[REEF-1130] Worker side VortexAggregateFunction reception This addressed the issue by * Modified and added Avro classes to facilitate submission of Aggregate Functions to VortexWorkers. * Simple aggregation logic on VortexWorkers without AggregationPolicy. * Simple aggregation scenario and test.
JIRA: [REEF-1130](https://issues.apache.org/jira/browse/REEF-1130) Pull request: This closes #788 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/a1f62251 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/a1f62251 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/a1f62251 Branch: refs/heads/master Commit: a1f622519a986a3e2121cfea4f5e06f0410815cb Parents: 86654ac Author: Andrew Chung <[email protected]> Authored: Wed Jan 20 10:46:35 2016 -0800 Committer: Yunseong Lee <[email protected]> Committed: Mon Jan 25 20:56:04 2016 +0800 ---------------------------------------------------------------------- .../src/main/avro/VortexRequest.avsc | 30 +++- .../apache/reef/vortex/api/AggregateResult.java | 24 +-- .../vortex/api/AggregateResultSynchronous.java | 75 ++++++++ .../reef/vortex/api/VortexAggregateFuture.java | 71 ++++---- .../common/AggregateFunctionRepository.java | 71 ++++++++ .../TaskletAggregateExecutionRequest.java | 69 ++++++++ .../common/TaskletAggregationRequest.java | 94 ++++++++++ .../common/TaskletCancellationRequest.java | 4 +- .../vortex/common/TaskletExecutionRequest.java | 3 +- .../reef/vortex/common/VortexAvroUtils.java | 91 ++++++++-- .../reef/vortex/common/VortexRequest.java | 9 +- .../driver/AggregateFunctionRepository.java | 51 ------ .../reef/vortex/driver/DefaultVortexMaster.java | 2 +- .../reef/vortex/driver/RunningWorkers.java | 11 +- .../apache/reef/vortex/driver/VortexDriver.java | 5 +- .../reef/vortex/driver/VortexRequestor.java | 18 +- .../reef/vortex/driver/VortexWorkerManager.java | 44 ++++- .../vortex/evaluator/AggregateContainer.java | 117 +++++++++++++ .../reef/vortex/evaluator/VortexWorker.java | 175 +++++++++++++------ .../sumones/AdditionAggregateFunction.java | 48 +++++ .../examples/sumones/IdentityFunction.java | 48 +++++ .../reef/vortex/examples/sumones/SumOnes.java | 64 +++++++ .../examples/sumones/SumOnesAggregateStart.java | 82 +++++++++ .../vortex/examples/sumones/package-info.java | 22 +++ .../reef/vortex/driver/RunningWorkersTest.java | 1 + .../org/apache/reef/vortex/driver/TestUtil.java | 5 +- .../applications/vortex/addone/AddOneTest.java | 2 +- .../vortex/sumones/SumOnesTest.java | 76 ++++++++ .../vortex/sumones/SumOnesTestStart.java | 85 +++++++++ .../vortex/sumones/package-info.java | 22 +++ 30 files changed, 1222 insertions(+), 197 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/a1f62251/lang/java/reef-applications/reef-vortex/src/main/avro/VortexRequest.avsc ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/avro/VortexRequest.avsc b/lang/java/reef-applications/reef-vortex/src/main/avro/VortexRequest.avsc index bf4396e..d0a02fb 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/avro/VortexRequest.avsc +++ b/lang/java/reef-applications/reef-vortex/src/main/avro/VortexRequest.avsc @@ -20,6 +20,26 @@ { "namespace": "org.apache.reef.vortex.common.avro", "type": "record", + "name": "AvroTaskletAggregateExecutionRequest", + "fields": [ + {"name": "taskletId", "type": "int"}, + {"name": "aggregateFunctionId", "type": "int"}, + {"name": "serializedInput", "type": "bytes"} + ] + }, + { + "namespace": "org.apache.reef.vortex.common.avro", + "type": "record", + "name": "AvroTaskletAggregationRequest", + "fields": [ + {"name": "aggregateFunctionId", "type": "int"}, + {"name": "serializedUserFunction", "type": "bytes"}, + {"name": "serializedAggregateFunction", "type": "bytes"} + ] + }, + { + "namespace": "org.apache.reef.vortex.common.avro", + "type": "record", "name": "AvroTaskletExecutionRequest", "fields": [ {"name": "taskletId", "type": "int"}, @@ -41,11 +61,17 @@ { "name": "requestType", "type": {"type": "enum", "name": "AvroRequestType", - "symbols": ["ExecuteTasklet", "CancelTasklet"]} + "symbols": ["ExecuteTasklet", "CancelTasklet", "Aggregate", "AggregateExecute"]} }, { "name": "taskletRequest", - "type": ["null", "AvroTaskletExecutionRequest", "AvroTaskletCancellationRequest"], + "type": [ + "null", + "AvroTaskletAggregateExecutionRequest", + "AvroTaskletAggregationRequest", + "AvroTaskletExecutionRequest", + "AvroTaskletCancellationRequest" + ], "default": null } ] http://git-wip-us.apache.org/repos/asf/reef/blob/a1f62251/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/AggregateResult.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/AggregateResult.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/AggregateResult.java index 22fb93f..555ef89 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/AggregateResult.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/AggregateResult.java @@ -27,7 +27,7 @@ import java.util.Collections; import java.util.List; /** - * The result of an aggregate. + * The result of an aggregate. Registered to callbacks for {@link VortexAggregateFuture}. */ @Public @ClientSide @@ -36,28 +36,23 @@ public final class AggregateResult<TInput, TOutput> { private final Optional<TOutput> aggregatedOutput; private final List<TInput> inputList; - private final boolean hasNext; private final Optional<Exception> exception; AggregateResult(final Exception exception, - final List<TInput> inputList, - final boolean hasNext) { - this(Optional.<TOutput>empty(), Optional.of(exception), inputList, hasNext); + final List<TInput> inputList) { + this(Optional.<TOutput>empty(), Optional.of(exception), inputList); } AggregateResult(final TOutput aggregatedOutput, - final List<TInput> inputList, - final boolean hasNext) { - this(Optional.of(aggregatedOutput), Optional.<Exception>empty(), inputList, hasNext); + final List<TInput> inputList) { + this(Optional.of(aggregatedOutput), Optional.<Exception>empty(), inputList); } private AggregateResult(final Optional<TOutput> aggregatedOutput, final Optional<Exception> exception, - final List<TInput> inputList, - final boolean hasNext) { + final List<TInput> inputList) { this.aggregatedOutput = aggregatedOutput; this.inputList = Collections.unmodifiableList(inputList); - this.hasNext = hasNext; this.exception = exception; } @@ -90,11 +85,4 @@ public final class AggregateResult<TInput, TOutput> { public Optional<Exception> getException() { return exception; } - - /** - * @return true if more results will be available, false otherwise. - */ - public boolean hasNext() { - return hasNext; - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/a1f62251/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/AggregateResultSynchronous.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/AggregateResultSynchronous.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/AggregateResultSynchronous.java new file mode 100644 index 0000000..c19bafe --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/AggregateResultSynchronous.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.api; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.ClientSide; +import org.apache.reef.annotations.audience.Public; +import org.apache.reef.util.Optional; + +import java.util.List; + +/** + * The synchronous result of an aggregate, returned by {@link VortexAggregateFuture#get()}. + */ +@Public +@ClientSide +@Unstable +public final class AggregateResultSynchronous<TInput, TOutput> { + private final AggregateResult<TInput, TOutput> result; + private final boolean hasNext; + + AggregateResultSynchronous(final AggregateResult<TInput, TOutput> result, final boolean hasNext) { + this.result = result; + this.hasNext = hasNext; + } + + /** + * @return the output of an aggregation, throws the Exception if a Tasklet or an aggregation fails. + * If an aggregation fails, {@link VortexAggregateException} will be thrown, otherwise + * the Exception that caused the Tasklet to fail will be thrown directly. + * @throws VortexAggregateException the Exception that caused the Tasklet or aggregation failure. + */ + public TOutput getAggregateResult() throws VortexAggregateException { + return result.getAggregateResult(); + } + + /** + * @return the associated inputs of an aggregation + */ + public List<TInput> getAggregatedInputs() { + return result.getAggregatedInputs(); + } + + /** + * If an aggregation fails, {@link VortexAggregateException} will be thrown, otherwise + * the Exception that caused the Tasklet to fail will be thrown directly. + * @return the Exception that caused the Tasklet or aggregation failure, if any. + */ + public Optional<Exception> getException() { + return result.getException(); + } + + /** + * @return true if more results will be available, false otherwise. + */ + public boolean hasNext() { + return hasNext; + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/a1f62251/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFuture.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFuture.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFuture.java index 23017b7..6e74d22 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFuture.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFuture.java @@ -19,6 +19,8 @@ package org.apache.reef.vortex.api; import org.apache.commons.lang3.NotImplementedException; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; import org.apache.reef.annotations.Unstable; import org.apache.reef.annotations.audience.ClientSide; import org.apache.reef.annotations.audience.Private; @@ -29,7 +31,6 @@ import org.apache.reef.vortex.common.VortexFutureDelegate; import javax.annotation.concurrent.NotThreadSafe; import java.util.*; import java.util.concurrent.*; -import java.util.logging.Logger; /** * The interface between user code and aggregation Tasklets. @@ -41,12 +42,10 @@ import java.util.logging.Logger; @NotThreadSafe @Unstable public final class VortexAggregateFuture<TInput, TOutput> implements VortexFutureDelegate { - private static final Logger LOG = Logger.getLogger(VortexAggregateFuture.class.getName()); - private final Executor executor; private final Codec<TOutput> aggOutputCodec; - private final BlockingQueue<AggregateResult> resultQueue; - private final Map<Integer, TInput> taskletIdInputMap; + private final BlockingQueue<Pair<List<Integer>, AggregateResult>> resultQueue; + private final ConcurrentMap<Integer, TInput> taskletIdInputMap; private final FutureCallback<AggregateResult<TInput, TOutput>> callbackHandler; @Private @@ -55,7 +54,7 @@ public final class VortexAggregateFuture<TInput, TOutput> implements VortexFutur final Codec<TOutput> aggOutputCodec, final FutureCallback<AggregateResult<TInput, TOutput>> callbackHandler) { this.executor = executor; - this.taskletIdInputMap = new HashMap<>(taskletIdInputMap); + this.taskletIdInputMap = new ConcurrentHashMap<>(taskletIdInputMap); this.resultQueue = new ArrayBlockingQueue<>(taskletIdInputMap.size()); this.aggOutputCodec = aggOutputCodec; this.callbackHandler = callbackHandler; @@ -64,12 +63,15 @@ public final class VortexAggregateFuture<TInput, TOutput> implements VortexFutur /** * @return the next aggregation result for the future, null if no more results. */ - public synchronized AggregateResult get() throws InterruptedException { + public synchronized AggregateResultSynchronous<TInput, TOutput> get() throws InterruptedException { if (taskletIdInputMap.isEmpty()) { return null; } - return resultQueue.take(); + final Pair<List<Integer>, AggregateResult> resultPair = resultQueue.take(); + + removeFromTaskletIdInputMap(resultPair.getLeft()); + return new AggregateResultSynchronous<>(resultPair.getRight(), !taskletIdInputMap.isEmpty()); } /** @@ -78,26 +80,33 @@ public final class VortexAggregateFuture<TInput, TOutput> implements VortexFutur * @return the next aggregation result for the future, within the user specified timeout, null if no more results. * @throws TimeoutException if time out hits. */ - public synchronized AggregateResult get(final long timeout, - final TimeUnit timeUnit) throws InterruptedException, TimeoutException { + public synchronized AggregateResultSynchronous<TInput, TOutput> get(final long timeout, final TimeUnit timeUnit) + throws InterruptedException, TimeoutException { if (taskletIdInputMap.isEmpty()) { return null; } - final AggregateResult result = resultQueue.poll(timeout, timeUnit); + final Pair<List<Integer>, AggregateResult> resultPair = resultQueue.poll(timeout, timeUnit); - if (result == null) { + if (resultPair == null) { throw new TimeoutException(); } - return result; + removeFromTaskletIdInputMap(resultPair.getLeft()); + return new AggregateResultSynchronous<>(resultPair.getRight(), !taskletIdInputMap.isEmpty()); + } + + private void removeFromTaskletIdInputMap(final List<Integer> taskletIds) { + for (final int taskletId : taskletIds) { + taskletIdInputMap.remove(taskletId); + } } /** * @return true if there are no more results to poll. */ - public synchronized boolean isDone() { - return taskletIdInputMap.size() == 0; + public boolean isDone() { + return taskletIdInputMap.isEmpty(); } /** @@ -109,7 +118,7 @@ public final class VortexAggregateFuture<TInput, TOutput> implements VortexFutur try { // TODO[REEF-1113]: Handle serialization failure separately in Vortex final TOutput result = aggOutputCodec.decode(serializedResult); - removeCompletedTasklets(result, Collections.singletonList(taskletId)); + completedTasklets(result, Collections.singletonList(taskletId)); } catch (final InterruptedException e) { throw new RuntimeException(e); } @@ -124,7 +133,7 @@ public final class VortexAggregateFuture<TInput, TOutput> implements VortexFutur try { // TODO[REEF-1113]: Handle serialization failure separately in Vortex final TOutput result = aggOutputCodec.decode(serializedResult); - removeCompletedTasklets(result, taskletIds); + completedTasklets(result, taskletIds); } catch (final InterruptedException e) { throw new RuntimeException(e); } @@ -137,7 +146,7 @@ public final class VortexAggregateFuture<TInput, TOutput> implements VortexFutur @Override public void threwException(final int taskletId, final Exception exception) { try { - removeFailedTasklets(exception, Collections.singletonList(taskletId)); + failedTasklets(exception, Collections.singletonList(taskletId)); } catch (final InterruptedException e) { throw new RuntimeException(e); } @@ -150,7 +159,7 @@ public final class VortexAggregateFuture<TInput, TOutput> implements VortexFutur @Override public void aggregationThrewException(final List<Integer> taskletIds, final Exception exception) { try { - removeFailedTasklets(exception, taskletIds); + failedTasklets(exception, taskletIds); } catch (final InterruptedException e) { throw new RuntimeException(e); } @@ -166,12 +175,12 @@ public final class VortexAggregateFuture<TInput, TOutput> implements VortexFutur } /** - * Removes completed Tasklets from Tasklets that are expected and invoke callback. + * Create and queue result for Tasklets that are expected and invoke callback. */ - private synchronized void removeCompletedTasklets(final TOutput output, final List<Integer> taskletIds) + private void completedTasklets(final TOutput output, final List<Integer> taskletIds) throws InterruptedException { - final AggregateResult result = - new AggregateResult(output, getInputs(taskletIds), taskletIdInputMap.size() > 0); + final List<TInput> inputs = getInputs(taskletIds); + final AggregateResult result = new AggregateResult(output, inputs); if (callbackHandler != null) { executor.execute(new Runnable() { @@ -182,41 +191,39 @@ public final class VortexAggregateFuture<TInput, TOutput> implements VortexFutur }); } - resultQueue.put(result); + resultQueue.put(new ImmutablePair<>(taskletIds, result)); } /** - * Removes failed Tasklets from Tasklets that are expected and invokes callback. + * Create and queue result for failed Tasklets that are expected and invokes callback. */ - private synchronized void removeFailedTasklets(final Exception exception, final List<Integer> taskletIds) + private void failedTasklets(final Exception exception, final List<Integer> taskletIds) throws InterruptedException { final List<TInput> inputs = getInputs(taskletIds); - final AggregateResult failure = - new AggregateResult(exception, inputs, taskletIdInputMap.size() > 0); + final AggregateResult failure = new AggregateResult(exception, inputs); if (callbackHandler != null) { executor.execute(new Runnable() { @Override public void run() { - // TODO[JIRA REEF-1129]: Add documentation in VortexThreadPool. callbackHandler.onFailure(new VortexAggregateException(exception, inputs)); } }); } - resultQueue.put(failure); + resultQueue.put(new ImmutablePair<>(taskletIds, failure)); } /** * Gets the inputs on Tasklet aggregation completion. */ - private synchronized List<TInput> getInputs(final List<Integer> taskletIds) { + private List<TInput> getInputs(final List<Integer> taskletIds) { final List<TInput> inputList = new ArrayList<>(taskletIds.size()); for(final int taskletId : taskletIds) { - inputList.add(taskletIdInputMap.remove(taskletId)); + inputList.add(taskletIdInputMap.get(taskletId)); } return inputList; http://git-wip-us.apache.org/repos/asf/reef/blob/a1f62251/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/AggregateFunctionRepository.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/AggregateFunctionRepository.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/AggregateFunctionRepository.java new file mode 100644 index 0000000..e70ee1a --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/AggregateFunctionRepository.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.common; + +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.vortex.api.VortexAggregateFunction; +import org.apache.reef.vortex.api.VortexFunction; + +import javax.annotation.concurrent.ThreadSafe; +import javax.inject.Inject; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * A repository for {@link VortexAggregateFunction} and its associated {@link VortexFunction}, + * used to pass functions between VortexMaster and RunningWorkers, as well as used to cache functions + * for VortexWorkers on AggregateRequests and AggregateExecutionRequests. + */ +@ThreadSafe +@Unstable +@Private +public final class AggregateFunctionRepository { + private final ConcurrentMap<Integer, Pair<VortexAggregateFunction, VortexFunction>> + aggregateFunctionMap = new ConcurrentHashMap<>(); + + @Inject + private AggregateFunctionRepository() { + } + + /** + * Associates an aggregate function ID with a {@link VortexAggregateFunction} and a {@link VortexFunction}. + */ + public Pair<VortexAggregateFunction, VortexFunction> put(final int aggregateFunctionId, + final VortexAggregateFunction aggregateFunction, + final VortexFunction function) { + return aggregateFunctionMap.put(aggregateFunctionId, new ImmutablePair<>(aggregateFunction, function)); + } + + /** + * Gets the {@link VortexAggregateFunction} associated with the aggregate function ID. + */ + public VortexAggregateFunction getAggregateFunction(final int aggregateFunctionId) { + return aggregateFunctionMap.get(aggregateFunctionId).getLeft(); + } + + /** + * Gets the {@link VortexFunction} associated with the aggregate function ID. + */ + public VortexFunction getFunction(final int aggregateFunctionId) { + return aggregateFunctionMap.get(aggregateFunctionId).getRight(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/a1f62251/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregateExecutionRequest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregateExecutionRequest.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregateExecutionRequest.java new file mode 100644 index 0000000..db850fc --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregateExecutionRequest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.common; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.Private; + +/** + * A request from the Vortex Driver to run an aggregate-able function. + */ +@Unstable +@Private +@DriverSide +public final class TaskletAggregateExecutionRequest<TInput> implements VortexRequest { + private final TInput input; + private final int aggregateFunctionId; + private final int taskletId; + + public TaskletAggregateExecutionRequest(final int taskletId, + final int aggregateFunctionId, + final TInput input) { + this.taskletId = taskletId; + this.input = input; + this.aggregateFunctionId = aggregateFunctionId; + } + + /** + * @return input of the request. + */ + public TInput getInput() { + return input; + } + + /** + * @return tasklet ID corresponding to the tasklet request. + */ + public int getTaskletId() { + return taskletId; + } + + /** + * @return the AggregateFunctionID of the request. + */ + public int getAggregateFunctionId() { + return aggregateFunctionId; + } + + @Override + public RequestType getType() { + return RequestType.ExecuteAggregateTasklet; + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/a1f62251/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationRequest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationRequest.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationRequest.java new file mode 100644 index 0000000..6a7e289 --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationRequest.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.common; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.io.serialization.Codec; +import org.apache.reef.vortex.api.VortexAggregateFunction; +import org.apache.reef.vortex.api.VortexFunction; + +import java.util.List; + +/** + * A request from the Vortex Driver for the {@link org.apache.reef.vortex.evaluator.VortexWorker} to + * record aggregate functions for later execution. + */ +@Unstable +@Private +@DriverSide +public final class TaskletAggregationRequest<TInput, TOutput> implements VortexRequest { + private final int aggregateFunctionId; + private final VortexAggregateFunction<TOutput> userAggregateFunction; + private final VortexFunction<TInput, TOutput> function; + + public TaskletAggregationRequest(final int aggregateFunctionId, + final VortexAggregateFunction<TOutput> aggregateFunction, + final VortexFunction<TInput, TOutput> function) { + this.aggregateFunctionId = aggregateFunctionId; + this.userAggregateFunction = aggregateFunction; + this.function = function; + } + + @Override + public RequestType getType() { + return RequestType.AggregateTasklets; + } + + /** + * @return the AggregateFunctionID of the aggregate function. + */ + public int getAggregateFunctionId() { + return aggregateFunctionId; + } + + /** + * @return the aggregate function as specified by the user. + */ + public VortexAggregateFunction getAggregateFunction() { + return userAggregateFunction; + } + + /** + * @return the user specified function. + */ + public VortexFunction getFunction() { + return function; + } + + /** + * Execute the aggregate function using the list of outputs. + * @return Output of the function in a serialized form. + */ + public byte[] executeAggregation(final List<TOutput> outputs) throws Exception { + final TOutput output = userAggregateFunction.call(outputs); + final Codec<TOutput> codec = userAggregateFunction.getOutputCodec(); + + // TODO[REEF-1113]: Handle serialization failure separately in Vortex + return codec.encode(output); + } + + /** + * Execute the user specified function. + */ + public TOutput executeFunction(final TInput input) throws Exception { + return function.call(input); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/a1f62251/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancellationRequest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancellationRequest.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancellationRequest.java index 8f725a8..88f2d89 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancellationRequest.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancellationRequest.java @@ -31,7 +31,9 @@ public final class TaskletCancellationRequest implements VortexRequest { this.taskletId = taskletId; } - @Override + /** + * @return the ID of the VortexTasklet associated with this VortexRequest. + */ public int getTaskletId() { return taskletId; } http://git-wip-us.apache.org/repos/asf/reef/blob/a1f62251/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java index d85c69b..e850c9a 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java @@ -64,9 +64,8 @@ public final class TaskletExecutionRequest<TInput, TOutput> implements VortexReq } /** - * Get id of the tasklet. + * @return the ID of the VortexTasklet associated with this VortexRequest. */ - @Override public int getTaskletId() { return taskletId; } http://git-wip-us.apache.org/repos/asf/reef/blob/a1f62251/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java index 2200af3..ce6b0dd 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java @@ -25,9 +25,11 @@ import org.apache.commons.lang.SerializationUtils; import org.apache.reef.annotations.Unstable; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.annotations.audience.Private; +import org.apache.reef.vortex.api.VortexAggregateFunction; import org.apache.reef.vortex.api.VortexFunction; import org.apache.reef.vortex.common.avro.*; +import javax.inject.Inject; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.ByteBuffer; @@ -41,15 +43,56 @@ import java.util.List; @DriverSide @Unstable public final class VortexAvroUtils { + private final AggregateFunctionRepository aggregateFunctionRepository; + + @Inject + private VortexAvroUtils(final AggregateFunctionRepository aggregateFunctionRepository) { + this.aggregateFunctionRepository = aggregateFunctionRepository; + } + /** * Serialize VortexRequest to byte array. * @param vortexRequest Vortex request message to serialize. * @return Serialized byte array. */ - public static byte[] toBytes(final VortexRequest vortexRequest) { + public byte[] toBytes(final VortexRequest vortexRequest) { // Convert VortexRequest message to Avro message. final AvroVortexRequest avroVortexRequest; switch (vortexRequest.getType()) { + case ExecuteAggregateTasklet: + final TaskletAggregateExecutionRequest taskletAggregateExecutionRequest = + (TaskletAggregateExecutionRequest) vortexRequest; + // TODO[REEF-1113]: Handle serialization failure separately in Vortex + final byte[] serializedInputForAggregate = + aggregateFunctionRepository.getFunction(taskletAggregateExecutionRequest.getAggregateFunctionId()) + .getInputCodec().encode(taskletAggregateExecutionRequest.getInput()); + avroVortexRequest = AvroVortexRequest.newBuilder() + .setRequestType(AvroRequestType.AggregateExecute) + .setTaskletRequest( + AvroTaskletAggregateExecutionRequest.newBuilder() + .setAggregateFunctionId(taskletAggregateExecutionRequest.getAggregateFunctionId()) + .setSerializedInput(ByteBuffer.wrap(serializedInputForAggregate)) + .setTaskletId(taskletAggregateExecutionRequest.getTaskletId()) + .build()) + .build(); + break; + case AggregateTasklets: + final TaskletAggregationRequest taskletAggregationRequest = (TaskletAggregationRequest) vortexRequest; + + // TODO[REEF-1003]: Use reflection instead of serialization when launching VortexFunction + final byte[] serializedAggregateFunction = SerializationUtils.serialize( + taskletAggregationRequest.getAggregateFunction()); + final byte[] serializedFunctionForAggregation = SerializationUtils.serialize( + taskletAggregationRequest.getFunction()); + avroVortexRequest = AvroVortexRequest.newBuilder() + .setRequestType(AvroRequestType.Aggregate) + .setTaskletRequest(AvroTaskletAggregationRequest.newBuilder() + .setAggregateFunctionId(taskletAggregationRequest.getAggregateFunctionId()) + .setSerializedAggregateFunction(ByteBuffer.wrap(serializedAggregateFunction)) + .setSerializedUserFunction(ByteBuffer.wrap(serializedFunctionForAggregation)) + .build()) + .build(); + break; case ExecuteTasklet: final TaskletExecutionRequest taskletExecutionRequest = (TaskletExecutionRequest) vortexRequest; // The following TODOs are sub-issues of cleaning up Serializable in Vortex (REEF-504). @@ -93,7 +136,7 @@ public final class VortexAvroUtils { * @param workerReport Worker report message to serialize. * @return Serialized byte array. */ - public static byte[] toBytes(final WorkerReport workerReport) { + public byte[] toBytes(final WorkerReport workerReport) { final List<AvroTaskletReport> workerTaskletReports = new ArrayList<>(); for (final TaskletReport taskletReport : workerReport.getTaskletReports()) { @@ -179,11 +222,31 @@ public final class VortexAvroUtils { * @param bytes Byte array to deserialize. * @return De-serialized VortexRequest. */ - public static VortexRequest toVortexRequest(final byte[] bytes) { + public VortexRequest toVortexRequest(final byte[] bytes) { final AvroVortexRequest avroVortexRequest = toAvroObject(bytes, AvroVortexRequest.class); final VortexRequest vortexRequest; switch (avroVortexRequest.getRequestType()) { + case AggregateExecute: + final AvroTaskletAggregateExecutionRequest taskletAggregateExecutionRequest = + (AvroTaskletAggregateExecutionRequest)avroVortexRequest.getTaskletRequest(); + vortexRequest = new TaskletAggregateExecutionRequest<>(taskletAggregateExecutionRequest.getTaskletId(), + taskletAggregateExecutionRequest.getAggregateFunctionId(), + aggregateFunctionRepository.getFunction(taskletAggregateExecutionRequest.getAggregateFunctionId()) + .getInputCodec().decode(taskletAggregateExecutionRequest.getSerializedInput().array())); + break; + case Aggregate: + final AvroTaskletAggregationRequest taskletAggregationRequest = + (AvroTaskletAggregationRequest)avroVortexRequest.getTaskletRequest(); + final VortexAggregateFunction aggregateFunction = + (VortexAggregateFunction) SerializationUtils.deserialize( + taskletAggregationRequest.getSerializedAggregateFunction().array()); + final VortexFunction functionForAggregation = + (VortexFunction) SerializationUtils.deserialize( + taskletAggregationRequest.getSerializedUserFunction().array()); + vortexRequest = new TaskletAggregationRequest<>(taskletAggregationRequest.getAggregateFunctionId(), + aggregateFunction, functionForAggregation); + break; case ExecuteTasklet: final AvroTaskletExecutionRequest taskletExecutionRequest = (AvroTaskletExecutionRequest)avroVortexRequest.getTaskletRequest(); @@ -211,7 +274,7 @@ public final class VortexAvroUtils { * @param bytes Byte array to deserialize. * @return De-serialized WorkerReport. */ - public static WorkerReport toWorkerReport(final byte[] bytes) { + public WorkerReport toWorkerReport(final byte[] bytes) { final AvroWorkerReport avroWorkerReport = toAvroObject(bytes, AvroWorkerReport.class); final List<TaskletReport> workerTaskletReports = new ArrayList<>(); @@ -221,32 +284,32 @@ public final class VortexAvroUtils { switch (avroTaskletReport.getReportType()) { case TaskletResult: final AvroTaskletResultReport taskletResultReport = - (AvroTaskletResultReport)avroTaskletReport.getTaskletReport(); + (AvroTaskletResultReport) avroTaskletReport.getTaskletReport(); taskletReport = new TaskletResultReport(taskletResultReport.getTaskletId(), taskletResultReport.getSerializedOutput().array()); break; case TaskletAggregationResult: final AvroTaskletAggregationResultReport taskletAggregationResultReport = - (AvroTaskletAggregationResultReport)avroTaskletReport.getTaskletReport(); + (AvroTaskletAggregationResultReport) avroTaskletReport.getTaskletReport(); taskletReport = new TaskletAggregationResultReport(taskletAggregationResultReport.getTaskletIds(), taskletAggregationResultReport.getSerializedOutput().array()); break; case TaskletCancelled: final AvroTaskletCancelledReport taskletCancelledReport = - (AvroTaskletCancelledReport)avroTaskletReport.getTaskletReport(); + (AvroTaskletCancelledReport) avroTaskletReport.getTaskletReport(); taskletReport = new TaskletCancelledReport(taskletCancelledReport.getTaskletId()); break; case TaskletFailure: final AvroTaskletFailureReport taskletFailureReport = - (AvroTaskletFailureReport)avroTaskletReport.getTaskletReport(); + (AvroTaskletFailureReport) avroTaskletReport.getTaskletReport(); final Exception exception = (Exception) SerializationUtils.deserialize(taskletFailureReport.getSerializedException().array()); taskletReport = new TaskletFailureReport(taskletFailureReport.getTaskletId(), exception); break; case TaskletAggregationFailure: final AvroTaskletAggregationFailureReport taskletAggregationFailureReport = - (AvroTaskletAggregationFailureReport)avroTaskletReport.getTaskletReport(); + (AvroTaskletAggregationFailureReport) avroTaskletReport.getTaskletReport(); final Exception aggregationException = (Exception) SerializationUtils.deserialize( taskletAggregationFailureReport.getSerializedException().array()); @@ -270,7 +333,7 @@ public final class VortexAvroUtils { * @param <T> Type of the Avro object. * @return Serialized byte array. */ - private static <T> byte[] toBytes(final T avroObject, final Class<T> theClass) { + private <T> byte[] toBytes(final T avroObject, final Class<T> theClass) { final DatumWriter<T> reportWriter = new SpecificDatumWriter<>(theClass); final byte[] theBytes; try (final ByteArrayOutputStream out = new ByteArrayOutputStream()) { @@ -292,7 +355,7 @@ public final class VortexAvroUtils { * @param <T> Type of the Avro object. * @return Avro object de-serialized from byte array. */ - private static <T> T toAvroObject(final byte[] bytes, final Class<T> theClass) { + private <T> T toAvroObject(final byte[] bytes, final Class<T> theClass) { final BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null); final SpecificDatumReader<T> reader = new SpecificDatumReader<>(theClass); try { @@ -301,10 +364,4 @@ public final class VortexAvroUtils { throw new RuntimeException(e); } } - - /** - * Empty private constructor to prohibit instantiation of utility class. - */ - private VortexAvroUtils() { - } } http://git-wip-us.apache.org/repos/asf/reef/blob/a1f62251/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexRequest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexRequest.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexRequest.java index 133b007..18f44e0 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexRequest.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexRequest.java @@ -29,16 +29,13 @@ public interface VortexRequest { * Type of Request. */ enum RequestType { + AggregateTasklets, ExecuteTasklet, - CancelTasklet + CancelTasklet, + ExecuteAggregateTasklet } /** - * @return the ID of the VortexTasklet associated with this VortexRequest. - */ - int getTaskletId(); - - /** * @return the type of this VortexRequest. */ RequestType getType(); http://git-wip-us.apache.org/repos/asf/reef/blob/a1f62251/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/AggregateFunctionRepository.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/AggregateFunctionRepository.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/AggregateFunctionRepository.java deleted file mode 100644 index 9b1da7a..0000000 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/AggregateFunctionRepository.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.vortex.driver; - -import org.apache.reef.annotations.Unstable; -import org.apache.reef.annotations.audience.Private; -import org.apache.reef.vortex.api.VortexAggregateFunction; - -import javax.annotation.concurrent.ThreadSafe; -import javax.inject.Inject; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -/** - * A repository for {@link VortexAggregateFunction}, used to pass functions between {@link VortexMaster} and - * {@link RunningWorkers}. - */ -@ThreadSafe -@Unstable -@Private -public final class AggregateFunctionRepository { - private final ConcurrentMap<Integer, VortexAggregateFunction> aggregateFunctionMap = new ConcurrentHashMap<>(); - - @Inject - private AggregateFunctionRepository() { - } - - VortexAggregateFunction put(final int aggregateFunctionId, final VortexAggregateFunction function) { - return aggregateFunctionMap.put(aggregateFunctionId, function); - } - - VortexAggregateFunction get(final int aggregateFunctionId) { - return aggregateFunctionMap.get(aggregateFunctionId); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/a1f62251/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java index 55aeb5a..0ce2117 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java @@ -94,7 +94,7 @@ final class DefaultVortexMaster implements VortexMaster { final VortexFunction<TInput, TOutput> vortexFunction, final List<TInput> inputs, final Optional<FutureCallback<AggregateResult<TInput, TOutput>>> callback) { final int aggregateFunctionId = aggregateIdCounter.getAndIncrement(); - aggregateFunctionRepository.put(aggregateFunctionId, aggregateFunction); + aggregateFunctionRepository.put(aggregateFunctionId, aggregateFunction, vortexFunction); final Codec<TOutput> aggOutputCodec = aggregateFunction.getOutputCodec(); final List<Tasklet> tasklets = new ArrayList<>(inputs.size()); final Map<Integer, TInput> taskletIdInputMap = new HashMap<>(inputs.size()); http://git-wip-us.apache.org/repos/asf/reef/blob/a1f62251/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java index 2bddc18..226ee34 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java @@ -20,9 +20,9 @@ package org.apache.reef.vortex.driver; import net.jcip.annotations.ThreadSafe; -import org.apache.commons.lang3.NotImplementedException; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.util.Optional; +import org.apache.reef.vortex.common.AggregateFunctionRepository; import javax.inject.Inject; @@ -159,11 +159,14 @@ final class RunningWorkers { final Optional<Integer> taskletAggFunctionId = tasklet.getAggregateFunctionId(); final VortexWorkerManager vortexWorkerManager = runningWorkers.get(workerId.get()); + if (taskletAggFunctionId.isPresent() && !workerHasAggregateFunction(vortexWorkerManager.getId(), taskletAggFunctionId.get())) { - // TODO[JIRA REEF-1130]: fetch aggregate function from repo and send aggregate function to worker. - throw new NotImplementedException("Serialize aggregate function to worker if it doesn't have it. " + - "Complete in REEF-1130."); + + // This assumes that all aggregate tasklets share the same user function. + vortexWorkerManager.sendAggregateFunction(taskletAggFunctionId.get(), + aggregateFunctionRepository.getAggregateFunction(taskletAggFunctionId.get()), tasklet.getUserFunction()); + workerAggregateFunctionMap.get(vortexWorkerManager.getId()).add(taskletAggFunctionId.get()); } vortexWorkerManager.launchTasklet(tasklet); http://git-wip-us.apache.org/repos/asf/reef/blob/a1f62251/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java index f581f99..ca58174 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java @@ -64,6 +64,7 @@ final class VortexDriver { private final EStage<VortexStart> vortexStartEStage; private final VortexStart vortexStart; private final EStage<Integer> pendingTaskletSchedulerEStage; + private final VortexAvroUtils vortexAvroUtils; @Inject private VortexDriver(final EvaluatorRequestor evaluatorRequestor, @@ -72,6 +73,7 @@ final class VortexDriver { final VortexStart vortexStart, final VortexStartExecutor vortexStartExecutor, final PendingTaskletLauncher pendingTaskletLauncher, + final VortexAvroUtils vortexAvroUtils, @Parameter(VortexMasterConf.WorkerMem.class) final int workerMem, @Parameter(VortexMasterConf.WorkerNum.class) final int workerNum, @Parameter(VortexMasterConf.WorkerCores.class) final int workerCores, @@ -79,6 +81,7 @@ final class VortexDriver { this.vortexStartEStage = new ThreadPoolStage<>(vortexStartExecutor, numOfStartThreads); this.vortexStart = vortexStart; this.pendingTaskletSchedulerEStage = new SingleThreadStage<>(pendingTaskletLauncher, 1); + this.vortexAvroUtils = vortexAvroUtils; this.evaluatorRequestor = evaluatorRequestor; this.vortexMaster = vortexMaster; this.vortexRequestor = vortexRequestor; @@ -151,7 +154,7 @@ final class VortexDriver { @Override public void onNext(final TaskMessage taskMessage) { final String workerId = taskMessage.getId(); - final WorkerReport workerReport = VortexAvroUtils.toWorkerReport(taskMessage.get()); + final WorkerReport workerReport = vortexAvroUtils.toWorkerReport(taskMessage.get()); vortexMaster.workerReported(workerId, workerReport); } } http://git-wip-us.apache.org/repos/asf/reef/blob/a1f62251/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexRequestor.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexRequestor.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexRequestor.java index b94b5b0..4aabf32 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexRequestor.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexRequestor.java @@ -33,18 +33,30 @@ import java.util.concurrent.Executors; @DriverSide class VortexRequestor { private final ExecutorService executorService = Executors.newCachedThreadPool(); + private final VortexAvroUtils vortexAvroUtils; @Inject - VortexRequestor() { + VortexRequestor(final VortexAvroUtils vortexAvroUtils) { + this.vortexAvroUtils = vortexAvroUtils; } - void send(final RunningTask reefTask, final VortexRequest vortexRequest) { + /** + * Sends a {@link VortexRequest} asynchronously to a {@link org.apache.reef.vortex.evaluator.VortexWorker}. + */ + void sendAsync(final RunningTask reefTask, final VortexRequest vortexRequest) { executorService.execute(new Runnable() { @Override public void run() { // Possible race condition with VortexWorkerManager#terminate is addressed by the global lock in VortexMaster - reefTask.send(VortexAvroUtils.toBytes(vortexRequest)); + send(reefTask, vortexRequest); } }); } + + /** + * Sends a {@link VortexRequest} synchronously to a {@link org.apache.reef.vortex.evaluator.VortexWorker}. + */ + void send(final RunningTask reefTask, final VortexRequest vortexRequest) { + reefTask.send(vortexAvroUtils.toBytes(vortexRequest)); + } } http://git-wip-us.apache.org/repos/asf/reef/blob/a1f62251/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java index 88911e3..478bf81 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java @@ -21,6 +21,10 @@ package org.apache.reef.vortex.driver; import net.jcip.annotations.NotThreadSafe; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.driver.task.RunningTask; +import org.apache.reef.vortex.api.VortexAggregateFunction; +import org.apache.reef.vortex.api.VortexFunction; +import org.apache.reef.vortex.common.TaskletAggregateExecutionRequest; +import org.apache.reef.vortex.common.TaskletAggregationRequest; import org.apache.reef.vortex.common.TaskletCancellationRequest; import org.apache.reef.vortex.common.TaskletExecutionRequest; @@ -41,17 +45,49 @@ class VortexWorkerManager { this.reefTask = reefTask; } + /** + * Sends an {@link VortexAggregateFunction} and its {@link VortexFunction} to a + * {@link org.apache.reef.vortex.evaluator.VortexWorker}. + */ + <TInput, TOutput> void sendAggregateFunction(final int aggregateFunctionId, + final VortexAggregateFunction<TOutput> aggregateFunction, + final VortexFunction<TInput, TOutput> function) { + final TaskletAggregationRequest<TInput, TOutput> taskletAggregationRequest = + new TaskletAggregationRequest<>(aggregateFunctionId, aggregateFunction, function); + + // The send is synchronous such that we make sure that the aggregate function is sent to the + // target worker before attempting to launch an aggregateable tasklet on it. + vortexRequestor.send(reefTask, taskletAggregationRequest); + } + + + /** + * Sends a request to launch a Tasklet on a {@link org.apache.reef.vortex.evaluator.VortexWorker}. + */ <TInput, TOutput> void launchTasklet(final Tasklet<TInput, TOutput> tasklet) { assert !runningTasklets.containsKey(tasklet.getId()); runningTasklets.put(tasklet.getId(), tasklet); - final TaskletExecutionRequest<TInput, TOutput> taskletExecutionRequest - = new TaskletExecutionRequest<>(tasklet.getId(), tasklet.getUserFunction(), tasklet.getInput()); - vortexRequestor.send(reefTask, taskletExecutionRequest); + + if (tasklet.getAggregateFunctionId().isPresent()) { + // function is aggregateable. + final TaskletAggregateExecutionRequest<TInput> taskletAggregateExecutionRequest = + new TaskletAggregateExecutionRequest<>(tasklet.getId(), tasklet.getAggregateFunctionId().get(), + tasklet.getInput()); + vortexRequestor.sendAsync(reefTask, taskletAggregateExecutionRequest); + } else { + // function is not aggregateable. + final TaskletExecutionRequest<TInput, TOutput> taskletExecutionRequest + = new TaskletExecutionRequest<>(tasklet.getId(), tasklet.getUserFunction(), tasklet.getInput()); + vortexRequestor.sendAsync(reefTask, taskletExecutionRequest); + } } + /** + * Sends a request to cancel a Tasklet on a {@link org.apache.reef.vortex.evaluator.VortexWorker}. + */ void cancelTasklet(final int taskletId) { final TaskletCancellationRequest cancellationRequest = new TaskletCancellationRequest(taskletId); - vortexRequestor.send(reefTask, cancellationRequest); + vortexRequestor.sendAsync(reefTask, cancellationRequest); } List<Tasklet> taskletsDone(final List<Integer> taskletIds) { http://git-wip-us.apache.org/repos/asf/reef/blob/a1f62251/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/AggregateContainer.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/AggregateContainer.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/AggregateContainer.java new file mode 100644 index 0000000..5687e0b --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/AggregateContainer.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.evaluator; + +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.util.Optional; +import org.apache.reef.vortex.common.*; + +import javax.annotation.concurrent.GuardedBy; +import java.util.ArrayList; +import java.util.List; + +/** + * A container for tasklet aggregation, used to preserve output from individual + * {@link org.apache.reef.vortex.api.VortexFunction}s and to trigger + * {@link org.apache.reef.vortex.api.VortexAggregateFunction}s on the pooled outputs. + */ +@Private +@DriverSide +@Unstable +final class AggregateContainer { + + private final Object stateLock = new Object(); + private final TaskletAggregationRequest taskletAggregationRequest; + + @GuardedBy("stateLock") + private final List<Pair<Integer, Object>> completedTasklets = new ArrayList<>(); + + @GuardedBy("stateLock") + private final List<Pair<Integer, Exception>> failedTasklets = new ArrayList<>(); + + AggregateContainer(final TaskletAggregationRequest taskletAggregationRequest) { + this.taskletAggregationRequest = taskletAggregationRequest; + } + + public TaskletAggregationRequest getTaskletAggregationRequest() { + return taskletAggregationRequest; + } + + /** + * Performs the output aggregation and generates the {@link WorkerReport} to report back to the + * {@link org.apache.reef.vortex.driver.VortexDriver}. + */ + public Optional<WorkerReport> aggregateTasklets() { + final List<TaskletReport> taskletReports = new ArrayList<>(); + final List<Object> results = new ArrayList<>(); + final List<Integer> aggregatedTasklets = new ArrayList<>(); + + // Synchronization to prevent duplication of work on the same aggregation function on the same worker. + synchronized (stateLock) { + // Add the successful tasklets for aggregation. + for (final Pair<Integer, Object> resultPair : completedTasklets) { + aggregatedTasklets.add(resultPair.getLeft()); + results.add(resultPair.getRight()); + } + + // Add failed tasklets to worker report. + for (final Pair<Integer, Exception> failedPair : failedTasklets) { + taskletReports.add(new TaskletFailureReport(failedPair.getLeft(), failedPair.getRight())); + } + + // Drain the tasklets. + completedTasklets.clear(); + failedTasklets.clear(); + } + + if (!results.isEmpty()) { + // Run the aggregation function. + try { + final byte[] aggregationResult = taskletAggregationRequest.executeAggregation(results); + taskletReports.add(new TaskletAggregationResultReport(aggregatedTasklets, aggregationResult)); + } catch (final Exception e) { + taskletReports.add(new TaskletAggregationFailureReport(aggregatedTasklets, e)); + } + } + + return taskletReports.isEmpty() ? Optional.<WorkerReport>empty() : Optional.of(new WorkerReport(taskletReports)); + } + + /** + * Reported when an associated tasklet is complete and adds it to the completion pool. + */ + public void taskletComplete(final int taskletId, final Object result) { + synchronized (stateLock) { + completedTasklets.add(new ImmutablePair<>(taskletId, result)); + } + } + + /** + * Reported when an associated tasklet is complete and adds it to the failure pool. + */ + public void taskletFailed(final int taskletId, final Exception e) { + synchronized (stateLock) { + failedTasklets.add(new ImmutablePair<>(taskletId, e)); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/a1f62251/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java index 3390c22..897d6e9 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java @@ -30,6 +30,7 @@ import org.apache.reef.task.events.CloseEvent; import org.apache.reef.task.events.DriverMessage; import org.apache.reef.util.Optional; import org.apache.reef.vortex.common.*; +import org.apache.reef.vortex.common.AggregateFunctionRepository; import org.apache.reef.vortex.driver.VortexWorkerConf; import org.apache.reef.wake.EventHandler; @@ -53,15 +54,22 @@ public final class VortexWorker implements Task, TaskMessageSource { private final BlockingDeque<byte[]> pendingRequests = new LinkedBlockingDeque<>(); private final BlockingDeque<byte[]> workerReports = new LinkedBlockingDeque<>(); + private final ConcurrentMap<Integer, AggregateContainer> aggregates = new ConcurrentHashMap<>(); + private final AggregateFunctionRepository aggregateFunctionRepository; + private final VortexAvroUtils vortexAvroUtils; private final HeartBeatTriggerManager heartBeatTriggerManager; private final int numOfThreads; private final CountDownLatch terminated = new CountDownLatch(1); @Inject private VortexWorker(final HeartBeatTriggerManager heartBeatTriggerManager, + final AggregateFunctionRepository aggregateFunctionRepository, + final VortexAvroUtils vortexAvroUtils, @Parameter(VortexWorkerConf.NumOfThreads.class) final int numOfThreads) { this.heartBeatTriggerManager = heartBeatTriggerManager; + this.aggregateFunctionRepository = aggregateFunctionRepository; + this.vortexAvroUtils = vortexAvroUtils; this.numOfThreads = numOfThreads; } @@ -76,6 +84,7 @@ public final class VortexWorker implements Task, TaskMessageSource { // Scheduling thread starts schedulerThread.execute(new Runnable() { + @SuppressWarnings("InfiniteLoopStatement") // Scheduler is supposed to run forever. @Override public void run() { while (true) { @@ -88,63 +97,31 @@ public final class VortexWorker implements Task, TaskMessageSource { } // Command Executor: Deserialize the command - final VortexRequest vortexRequest = VortexAvroUtils.toVortexRequest(message); + final VortexRequest vortexRequest = vortexAvroUtils.toVortexRequest(message); switch (vortexRequest.getType()) { + case AggregateTasklets: + final TaskletAggregationRequest taskletAggregationRequest = (TaskletAggregationRequest) vortexRequest; + aggregates.put(taskletAggregationRequest.getAggregateFunctionId(), + new AggregateContainer(taskletAggregationRequest)); + + // VortexFunctions need to be put into the repository such that VortexAvroUtils will know how to + // convert inputs and functions into a VortexRequest on subsequent messages requesting to + // execute the aggregateable tasklets. + aggregateFunctionRepository.put(taskletAggregationRequest.getAggregateFunctionId(), + taskletAggregationRequest.getAggregateFunction(), taskletAggregationRequest.getFunction()); + + break; + case ExecuteAggregateTasklet: + executeAggregateTasklet(commandExecutor, vortexRequest); + break; case ExecuteTasklet: - final CountDownLatch latch = new CountDownLatch(1); - - // Scheduler Thread: Pass the command to the worker thread pool to be executed - // Record future to support cancellation. - futures.put( - vortexRequest.getTaskletId(), - commandExecutor.submit(new Runnable() { - @Override - public void run() { - final TaskletExecutionRequest taskletExecutionRequest = (TaskletExecutionRequest) vortexRequest; - final WorkerReport workerReport; - final List<TaskletReport> taskletReports = new ArrayList<>(); - - try { - // Command Executor: Execute the command - final TaskletReport taskletReport = - new TaskletResultReport(taskletExecutionRequest.getTaskletId(), - taskletExecutionRequest.execute()); - taskletReports.add(taskletReport); - } catch (final InterruptedException ex) { - // Assumes that user's thread follows convention that cancelled Futures - // should throw InterruptedException. - final TaskletReport taskletReport = - new TaskletCancelledReport(taskletExecutionRequest.getTaskletId()); - LOG.log(Level.WARNING, "Tasklet with ID {0} has been cancelled", vortexRequest.getTaskletId()); - taskletReports.add(taskletReport); - } catch (Exception e) { - // Command Executor: Tasklet throws an exception - final TaskletReport taskletReport = - new TaskletFailureReport(taskletExecutionRequest.getTaskletId(), e); - taskletReports.add(taskletReport); - } - - workerReport = new WorkerReport(taskletReports); - workerReports.addLast(VortexAvroUtils.toBytes(workerReport)); - try { - latch.await(); - } catch (final InterruptedException e) { - LOG.log(Level.SEVERE, "Cannot wait for Future to be put."); - throw new RuntimeException(e); - } - - futures.remove(vortexRequest.getTaskletId()); - heartBeatTriggerManager.triggerHeartBeat(); - } - })); - - // Signal that future is put. - latch.countDown(); + executeTasklet(commandExecutor, futures, vortexRequest); break; case CancelTasklet: - LOG.log(Level.FINE, "Cancelling Tasklet with ID {0}.", vortexRequest.getTaskletId()); - final Future future = futures.get(vortexRequest.getTaskletId()); + final TaskletCancellationRequest cancellationRequest = (TaskletCancellationRequest) vortexRequest; + LOG.log(Level.FINE, "Cancelling Tasklet with ID {0}.", cancellationRequest.getTaskletId()); + final Future future = futures.get(cancellationRequest.getTaskletId()); if (future != null) { future.cancel(true); } @@ -161,6 +138,100 @@ public final class VortexWorker implements Task, TaskMessageSource { } /** + * Executes an tasklet request from the {@link org.apache.reef.vortex.driver.VortexDriver}. + */ + private void executeTasklet(final ExecutorService commandExecutor, + final ConcurrentMap<Integer, Future> futures, + final VortexRequest vortexRequest) { + final CountDownLatch latch = new CountDownLatch(1); + final TaskletExecutionRequest taskletExecutionRequest = (TaskletExecutionRequest) vortexRequest; + + // Scheduler Thread: Pass the command to the worker thread pool to be executed + // Record future to support cancellation. + futures.put( + taskletExecutionRequest.getTaskletId(), + commandExecutor.submit(new Runnable() { + @Override + public void run() { + final WorkerReport workerReport; + final List<TaskletReport> taskletReports = new ArrayList<>(); + + try { + // Command Executor: Execute the command + final TaskletReport taskletReport = + new TaskletResultReport(taskletExecutionRequest.getTaskletId(), + taskletExecutionRequest.execute()); + taskletReports.add(taskletReport); + } catch (final InterruptedException ex) { + // Assumes that user's thread follows convention that cancelled Futures + // should throw InterruptedException. + final TaskletReport taskletReport = + new TaskletCancelledReport(taskletExecutionRequest.getTaskletId()); + LOG.log(Level.WARNING, "Tasklet with ID {0} has been cancelled", + taskletExecutionRequest.getTaskletId()); + taskletReports.add(taskletReport); + } catch (Exception e) { + // Command Executor: Tasklet throws an exception + final TaskletReport taskletReport = + new TaskletFailureReport(taskletExecutionRequest.getTaskletId(), e); + taskletReports.add(taskletReport); + } + + workerReport = new WorkerReport(taskletReports); + workerReports.addLast(vortexAvroUtils.toBytes(workerReport)); + try { + latch.await(); + } catch (final InterruptedException e) { + LOG.log(Level.SEVERE, "Cannot wait for Future to be put."); + throw new RuntimeException(e); + } + + futures.remove(taskletExecutionRequest.getTaskletId()); + heartBeatTriggerManager.triggerHeartBeat(); + } + })); + + // Signal that future is put. + latch.countDown(); + } + + /** + * Executes an aggregation request from the {@link org.apache.reef.vortex.driver.VortexDriver}. + */ + private void executeAggregateTasklet(final ExecutorService commandExecutor, + final VortexRequest vortexRequest) { + final TaskletAggregateExecutionRequest taskletAggregateExecutionRequest = + (TaskletAggregateExecutionRequest) vortexRequest; + + assert aggregates.containsKey(taskletAggregateExecutionRequest.getAggregateFunctionId()); + + final AggregateContainer aggregateContainer = aggregates.get( + taskletAggregateExecutionRequest.getAggregateFunctionId()); + final TaskletAggregationRequest aggregationRequest = aggregateContainer.getTaskletAggregationRequest(); + + commandExecutor.submit(new Runnable() { + @Override + public void run() { + try { + final Object result = aggregationRequest.executeFunction(taskletAggregateExecutionRequest.getInput()); + aggregateContainer.taskletComplete(taskletAggregateExecutionRequest.getTaskletId(), result); + } catch (final Exception e) { + aggregateContainer.taskletFailed(taskletAggregateExecutionRequest.getTaskletId(), e); + } + + // TODO[JIRA REEF-1131]: Call according to aggregate policies. + final Optional<WorkerReport> workerReport = aggregateContainer.aggregateTasklets(); + + // Add to worker report only if there is something to report back. + if (workerReport.isPresent()) { + workerReports.addLast(vortexAvroUtils.toBytes(workerReport.get())); + heartBeatTriggerManager.triggerHeartBeat(); + } + } + }); + } + + /** * @return the workerReport the worker wishes to send. */ @Override http://git-wip-us.apache.org/repos/asf/reef/blob/a1f62251/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/AdditionAggregateFunction.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/AdditionAggregateFunction.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/AdditionAggregateFunction.java new file mode 100644 index 0000000..391de6e --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/AdditionAggregateFunction.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.examples.sumones; + +import org.apache.reef.io.serialization.Codec; +import org.apache.reef.io.serialization.SerializableCodec; +import org.apache.reef.vortex.api.VortexAggregateException; +import org.apache.reef.vortex.api.VortexAggregateFunction; + +import java.util.List; + +/** + * Aggregates and sums the outputs. + */ +public final class AdditionAggregateFunction implements VortexAggregateFunction<Integer> { + private static final Codec<Integer> CODEC = new SerializableCodec<>(); + + @Override + public Integer call(final List<Integer> taskletOutputs) throws VortexAggregateException { + int sum = 0; + for (final int output : taskletOutputs) { + sum += output; + } + + return sum; + } + + @Override + public Codec<Integer> getOutputCodec() { + return CODEC; + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/a1f62251/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/IdentityFunction.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/IdentityFunction.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/IdentityFunction.java new file mode 100644 index 0000000..b7a69ef --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/IdentityFunction.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.examples.sumones; + +import org.apache.reef.io.serialization.Codec; +import org.apache.reef.io.serialization.SerializableCodec; +import org.apache.reef.vortex.api.VortexFunction; + +/** + * Identity function. + */ +public final class IdentityFunction implements VortexFunction<Integer, Integer> { + private static final Codec<Integer> CODEC = new SerializableCodec<>(); + + /** + * Outputs input. + */ + @Override + public Integer call(final Integer input) throws Exception { + return input; + } + + @Override + public Codec<Integer> getInputCodec() { + return CODEC; + } + + @Override + public Codec<Integer> getOutputCodec() { + return CODEC; + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/a1f62251/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/SumOnes.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/SumOnes.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/SumOnes.java new file mode 100644 index 0000000..7ba8b10 --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/SumOnes.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.examples.sumones; + +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.Tang; +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.NamedParameter; +import org.apache.reef.vortex.driver.VortexJobConf; +import org.apache.reef.vortex.driver.VortexLauncher; +import org.apache.reef.vortex.driver.VortexMasterConf; + +/** + * User's main function. + */ +final class SumOnes { + private SumOnes() { + } + + /** + * Launch the vortex job, passing appropriate arguments. + */ + public static void main(final String[] args) { + final Configuration vortexMasterConf = VortexMasterConf.CONF + .set(VortexMasterConf.WORKER_NUM, 2) + .set(VortexMasterConf.WORKER_MEM, 1024) + .set(VortexMasterConf.WORKER_CORES, 4) + .set(VortexMasterConf.WORKER_CAPACITY, 2000) + .set(VortexMasterConf.VORTEX_START, SumOnesAggregateStart.class) + .build(); + + final Configuration userConf = Tang.Factory.getTang().newConfigurationBuilder() + .bindNamedParameter(NumberOfOnes.class, "1000") + .build(); + + final VortexJobConf vortexJobConf = VortexJobConf.newBuilder() + .setJobName("Vortex_Example_SumOnes") + .setVortexMasterConf(vortexMasterConf) + .setUserConf(userConf) + .build(); + + VortexLauncher.launchLocal(vortexJobConf); + } + + @NamedParameter(doc = "numbers of ones to sum") + public static class NumberOfOnes implements Name<Integer> { + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/a1f62251/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/SumOnesAggregateStart.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/SumOnesAggregateStart.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/SumOnesAggregateStart.java new file mode 100644 index 0000000..bd09565 --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/SumOnesAggregateStart.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.examples.sumones; + +import org.apache.reef.tang.annotations.Parameter; +import org.apache.reef.vortex.api.*; + +import javax.inject.Inject; +import java.util.Vector; + +/** + * SumOnes User Code Example. + */ +final class SumOnesAggregateStart implements VortexStart { + + private final int numbers; + + @Inject + private SumOnesAggregateStart(@Parameter(SumOnes.NumberOfOnes.class) final int numbers) { + this.numbers = numbers; + } + + /** + * Perform a simple sum and aggregation of ones on Vortex. + */ + @Override + public void start(final VortexThreadPool vortexThreadPool) { + final Vector<Integer> inputVector = new Vector<>(); + for (int i = 0; i < numbers; i++) { + inputVector.add(1); + } + + final VortexAggregateFuture<Integer, Integer> future = + vortexThreadPool.submit(new AdditionAggregateFunction(), new IdentityFunction(), inputVector); + + try { + AggregateResultSynchronous<Integer, Integer> result; + result = future.get(); + int allSum = 0; + while (result.hasNext()) { + result = future.get(); + final int sumResult; + + try { + sumResult = result.getAggregateResult(); + } catch (final VortexAggregateException e) { + throw new RuntimeException(e); + } + + int sumInputs = 0; + for (int i : result.getAggregatedInputs()) { + sumInputs += i; + } + + assert sumResult == sumInputs; + + allSum += sumResult; + } + + assert allSum == numbers; + + } catch (final InterruptedException ie) { + throw new RuntimeException(ie); + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/a1f62251/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/package-info.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/package-info.java new file mode 100644 index 0000000..001e5e9 --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * A Simple Vortex addition and aggregation example. + */ +package org.apache.reef.vortex.examples.sumones; \ No newline at end of file
