Repository: reef Updated Branches: refs/heads/master 7a890d88a -> 7ec6de9c2
[REEF-1132] Define VortexAggregateFuture and VortexAggregateFunction This addressed the issue by * Defines and outlines the basic functionalities of VortexAggregateFuture and VortexAggregateFunction. * Currently not used in other parts of the codebase. Will be tested as Driver and Worker code is completed, as marked by TODO comments. JIRA: [REEF-1132](https://issues.apache.org/jira/browse/REEF-1132) Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/7ec6de9c Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/7ec6de9c Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/7ec6de9c Branch: refs/heads/master Commit: 7ec6de9c229042e70a3908954db01b93b1c82c99 Parents: 7a890d8 Author: Andrew Chung <[email protected]> Authored: Wed Jan 13 16:33:40 2016 -0800 Committer: Yunseong Lee <[email protected]> Committed: Tue Jan 19 13:14:06 2016 +0800 ---------------------------------------------------------------------- .../apache/reef/vortex/api/AggregateResult.java | 100 +++++++++ .../vortex/api/VortexAggregateException.java | 58 +++++ .../vortex/api/VortexAggregateFunction.java | 58 +++++ .../reef/vortex/api/VortexAggregateFuture.java | 224 +++++++++++++++++++ 4 files changed, 440 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/7ec6de9c/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/AggregateResult.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/AggregateResult.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/AggregateResult.java new file mode 100644 index 0000000..22fb93f --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/AggregateResult.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.api; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.ClientSide; +import org.apache.reef.annotations.audience.Public; +import org.apache.reef.util.Optional; + +import java.util.Collections; +import java.util.List; + +/** + * The result of an aggregate. + */ +@Public +@ClientSide +@Unstable +public final class AggregateResult<TInput, TOutput> { + + private final Optional<TOutput> aggregatedOutput; + private final List<TInput> inputList; + private final boolean hasNext; + private final Optional<Exception> exception; + + AggregateResult(final Exception exception, + final List<TInput> inputList, + final boolean hasNext) { + this(Optional.<TOutput>empty(), Optional.of(exception), inputList, hasNext); + } + + AggregateResult(final TOutput aggregatedOutput, + final List<TInput> inputList, + final boolean hasNext) { + this(Optional.of(aggregatedOutput), Optional.<Exception>empty(), inputList, hasNext); + } + + private AggregateResult(final Optional<TOutput> aggregatedOutput, + final Optional<Exception> exception, + final List<TInput> inputList, + final boolean hasNext) { + this.aggregatedOutput = aggregatedOutput; + this.inputList = Collections.unmodifiableList(inputList); + this.hasNext = hasNext; + this.exception = exception; + } + + /** + * @return the output of an aggregation, throws the Exception if a Tasklet or an aggregation fails. + * If an aggregation fails, {@link VortexAggregateException} will be thrown, otherwise + * the Exception that caused the Tasklet to fail will be thrown directly. + * @throws Exception the Exception that caused the Tasklet or aggregation failure. + */ + public TOutput getAggregateResult() throws VortexAggregateException { + if (exception.isPresent()) { + throw new VortexAggregateException(exception.get(), inputList); + } + + return aggregatedOutput.get(); + } + + /** + * @return the associated inputs of an aggregation + */ + public List<TInput> getAggregatedInputs() { + return inputList; + } + + /** + * If an aggregation fails, {@link VortexAggregateException} will be thrown, otherwise + * the Exception that caused the Tasklet to fail will be thrown directly. + * @return the Exception that caused the Tasklet or aggregation failure, if any. + */ + public Optional<Exception> getException() { + return exception; + } + + /** + * @return true if more results will be available, false otherwise. + */ + public boolean hasNext() { + return hasNext; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/7ec6de9c/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateException.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateException.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateException.java new file mode 100644 index 0000000..2c13a56 --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateException.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.api; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.Private; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * Exception thrown when an aggregate function fails. + * Call {@link Exception#getCause()} to find the cause of failure in aggregation. + * Call {@link VortexAggregateException#getInputs()} to get the inputs that correlate + * with the failure. + */ +@Unstable +public final class VortexAggregateException extends Exception { + private final List<Object> inputList; + + @Private + public VortexAggregateException(final Throwable cause, final List<?> inputList) { + super(cause); + this.inputList = new ArrayList<>(inputList); + } + + @Private + public VortexAggregateException(final String message, + final Throwable cause, + final List<?> inputList) { + super(message, cause); + this.inputList = new ArrayList<>(inputList); + } + + /** + * @return Inputs that correlate with the aggregation failure. + */ + public List<Object> getInputs() { + return Collections.unmodifiableList(inputList); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/7ec6de9c/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFunction.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFunction.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFunction.java new file mode 100644 index 0000000..fe3b96a --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFunction.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.api; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.ClientSide; +import org.apache.reef.annotations.audience.Public; +import org.apache.reef.wake.remote.Codec; + +import java.io.Serializable; +import java.util.List; + +/** + * Typed user function for Local Aggregation. Implement your functions using this interface. + * TODO[REEF-504]: Clean up Serializable in Vortex. + * TODO[REEF-1003]: Use reflection instead of serialization when launching VortexFunction. + * + * @param <TOutput> output type of the aggregation function and the functions to-be-aggregated. + */ +@Public +@ClientSide +@Unstable +public interface VortexAggregateFunction<TOutput> extends Serializable { + + /** + * Runs a custom local aggregation function on Tasklets assigned to a VortexWorker. + * @param taskletOutputs the list of outputs from Tasklets on a Worker. + * @return the aggregated output of Tasklets. + * @throws Exception + */ + TOutput call(final List<TOutput> taskletOutputs) throws VortexAggregateException; + + /** + * Users must define codec for the AggregationOutput. + * {@link org.apache.reef.vortex.util.VoidCodec} can be used if the aggregation output is + * empty, and {@link org.apache.reef.io.serialization.SerializableCodec} can be used for ({@link Serializable} + * aggregation output. + * Custom aggregation output Codec can also be supplied. + * @return Codec used to serialize/deserialize the output. + */ + Codec<TOutput> getOutputCodec(); +} http://git-wip-us.apache.org/repos/asf/reef/blob/7ec6de9c/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFuture.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFuture.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFuture.java new file mode 100644 index 0000000..d958fac --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFuture.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.api; + +import org.apache.commons.lang3.NotImplementedException; +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.ClientSide; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.annotations.audience.Public; +import org.apache.reef.vortex.common.VortexFutureDelegate; +import org.apache.reef.wake.remote.Codec; + +import javax.annotation.concurrent.NotThreadSafe; +import java.util.*; +import java.util.concurrent.*; +import java.util.logging.Logger; + +/** + * The interface between user code and aggregation Tasklets. + * Thread safety: This class is not meant to be used in a multi-threaded fashion. + * TODO[JIRA REEF-1131]: Create and run tests once functional. + */ +@Public +@ClientSide +@NotThreadSafe +@Unstable +public final class VortexAggregateFuture<TInput, TOutput> implements VortexFutureDelegate { + private static final Logger LOG = Logger.getLogger(VortexAggregateFuture.class.getName()); + + private final Executor executor; + private final Codec<TOutput> aggOutputCodec; + private final BlockingQueue<AggregateResult> resultQueue; + private final Map<Integer, TInput> taskletIdInputMap; + private final FutureCallback<AggregateResult<TInput, TOutput>> callbackHandler; + + @Private + public VortexAggregateFuture(final Executor executor, + final Map<Integer, TInput> taskletIdInputMap, + final Codec<TOutput> aggOutputCodec, + final FutureCallback<AggregateResult<TInput, TOutput>> callbackHandler) { + this.executor = executor; + this.taskletIdInputMap = new HashMap<>(taskletIdInputMap); + this.resultQueue = new ArrayBlockingQueue<>(taskletIdInputMap.size()); + this.aggOutputCodec = aggOutputCodec; + this.callbackHandler = callbackHandler; + } + + /** + * @return the next aggregation result for the future, null if no more results. + */ + public synchronized AggregateResult get() throws InterruptedException { + if (taskletIdInputMap.isEmpty()) { + return null; + } + + return resultQueue.take(); + } + + /** + * @param timeout the timeout for the operation. + * @param timeUnit the time unit of the timeout. + * @return the next aggregation result for the future, within the user specified timeout, null if no more results. + * @throws TimeoutException if time out hits. + */ + public synchronized AggregateResult get(final long timeout, + final TimeUnit timeUnit) throws InterruptedException, TimeoutException { + if (taskletIdInputMap.isEmpty()) { + return null; + } + + final AggregateResult result = resultQueue.poll(timeout, timeUnit); + + if (result == null) { + throw new TimeoutException(); + } + + return result; + } + + /** + * @return true if there are no more results to poll. + */ + public synchronized boolean isDone() { + return taskletIdInputMap.size() == 0; + } + + /** + * A Tasklet associated with the aggregation has completed. + */ + @Private + @Override + public void completed(final int taskletId, final byte[] serializedResult) { + try { + // TODO[REEF-1113]: Handle serialization failure separately in Vortex + final TOutput result = aggOutputCodec.decode(serializedResult); + removeCompletedTasklets(result, Collections.singletonList(taskletId)); + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } + } + + /** + * Aggregation has completed for a list of Tasklets, with an aggregated result. + */ + @Private + @Override + public void aggregationCompleted(final List<Integer> taskletIds, final byte[] serializedResult) { + try { + // TODO[REEF-1113]: Handle serialization failure separately in Vortex + final TOutput result = aggOutputCodec.decode(serializedResult); + removeCompletedTasklets(result, taskletIds); + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } + } + + /** + * A Tasklet associated with the aggregation has failed. + */ + @Private + @Override + public void threwException(final int taskletId, final Exception exception) { + try { + removeFailedTasklets(exception, Collections.singletonList(taskletId)); + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } + } + + /** + * A list of Tasklets has failed during aggregation phase. + */ + @Private + @Override + public void aggregationThrewException(final List<Integer> taskletIds, final Exception exception) { + try { + removeFailedTasklets(exception, taskletIds); + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } + } + + /** + * Not implemented for local aggregation. + */ + @Private + @Override + public void cancelled(final int taskletId) { + throw new NotImplementedException("Tasklet cancellation not supported in aggregations."); + } + + /** + * Removes completed Tasklets from Tasklets that are expected and invoke callback. + */ + private synchronized void removeCompletedTasklets(final TOutput output, final List<Integer> taskletIds) + throws InterruptedException { + final AggregateResult result = + new AggregateResult(output, getInputs(taskletIds), taskletIdInputMap.size() > 0); + + if (callbackHandler != null) { + executor.execute(new Runnable() { + @Override + public void run() { + callbackHandler.onSuccess(result); + } + }); + } + + resultQueue.put(result); + } + + /** + * Removes failed Tasklets from Tasklets that are expected and invokes callback. + */ + private synchronized void removeFailedTasklets(final Exception exception, final List<Integer> taskletIds) + throws InterruptedException { + + final List<TInput> inputs = getInputs(taskletIds); + final AggregateResult failure = + new AggregateResult(exception, inputs, taskletIdInputMap.size() > 0); + + if (callbackHandler != null) { + executor.execute(new Runnable() { + @Override + public void run() { + // TODO[JIRA REEF-1129]: Add documentation in VortexThreadPool. + callbackHandler.onFailure(new VortexAggregateException(exception, inputs)); + } + }); + } + + resultQueue.put(failure); + } + + /** + * Gets the inputs on Tasklet aggregation completion. + */ + private synchronized List<TInput> getInputs(final List<Integer> taskletIds) { + + final List<TInput> inputList = new ArrayList<>(taskletIds.size()); + + for(final int taskletId : taskletIds) { + inputList.add(taskletIdInputMap.remove(taskletId)); + } + + return inputList; + } +} \ No newline at end of file
