Repository: reef
Updated Branches:
  refs/heads/master 7a890d88a -> 7ec6de9c2


[REEF-1132] Define VortexAggregateFuture and VortexAggregateFunction

This addressed the issue by
  * Defines and outlines the basic functionalities of VortexAggregateFuture and 
VortexAggregateFunction.
  * Currently not used in other parts of the codebase. Will be tested as Driver 
and Worker code is completed, as marked by TODO comments.

JIRA:
  [REEF-1132](https://issues.apache.org/jira/browse/REEF-1132)


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/7ec6de9c
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/7ec6de9c
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/7ec6de9c

Branch: refs/heads/master
Commit: 7ec6de9c229042e70a3908954db01b93b1c82c99
Parents: 7a890d8
Author: Andrew Chung <[email protected]>
Authored: Wed Jan 13 16:33:40 2016 -0800
Committer: Yunseong Lee <[email protected]>
Committed: Tue Jan 19 13:14:06 2016 +0800

----------------------------------------------------------------------
 .../apache/reef/vortex/api/AggregateResult.java | 100 +++++++++
 .../vortex/api/VortexAggregateException.java    |  58 +++++
 .../vortex/api/VortexAggregateFunction.java     |  58 +++++
 .../reef/vortex/api/VortexAggregateFuture.java  | 224 +++++++++++++++++++
 4 files changed, 440 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/7ec6de9c/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/AggregateResult.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/AggregateResult.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/AggregateResult.java
new file mode 100644
index 0000000..22fb93f
--- /dev/null
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/AggregateResult.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.api;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.util.Optional;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * The result of an aggregate.
+ */
+@Public
+@ClientSide
+@Unstable
+public final class AggregateResult<TInput, TOutput> {
+
+  private final Optional<TOutput> aggregatedOutput;
+  private final List<TInput> inputList;
+  private final boolean hasNext;
+  private final Optional<Exception> exception;
+
+  AggregateResult(final Exception exception,
+                  final List<TInput> inputList,
+                  final boolean hasNext) {
+    this(Optional.<TOutput>empty(), Optional.of(exception), inputList, 
hasNext);
+  }
+
+  AggregateResult(final TOutput aggregatedOutput,
+                  final List<TInput> inputList,
+                  final boolean hasNext) {
+    this(Optional.of(aggregatedOutput), Optional.<Exception>empty(), 
inputList, hasNext);
+  }
+
+  private AggregateResult(final Optional<TOutput> aggregatedOutput,
+                          final Optional<Exception> exception,
+                          final List<TInput> inputList,
+                          final boolean hasNext) {
+    this.aggregatedOutput = aggregatedOutput;
+    this.inputList = Collections.unmodifiableList(inputList);
+    this.hasNext = hasNext;
+    this.exception = exception;
+  }
+
+  /**
+   * @return the output of an aggregation, throws the Exception if a Tasklet 
or an aggregation fails.
+   * If an aggregation fails, {@link VortexAggregateException} will be thrown, 
otherwise
+   * the Exception that caused the Tasklet to fail will be thrown directly.
+   * @throws Exception the Exception that caused the Tasklet or aggregation 
failure.
+   */
+  public TOutput getAggregateResult() throws VortexAggregateException {
+    if (exception.isPresent()) {
+      throw new VortexAggregateException(exception.get(), inputList);
+    }
+
+    return aggregatedOutput.get();
+  }
+
+  /**
+   * @return the associated inputs of an aggregation
+   */
+  public List<TInput> getAggregatedInputs() {
+    return inputList;
+  }
+
+  /**
+   * If an aggregation fails, {@link VortexAggregateException} will be thrown, 
otherwise
+   * the Exception that caused the Tasklet to fail will be thrown directly.
+   * @return the Exception that caused the Tasklet or aggregation failure, if 
any.
+   */
+  public Optional<Exception> getException() {
+    return exception;
+  }
+
+  /**
+   * @return true if more results will be available, false otherwise.
+   */
+  public boolean hasNext() {
+    return hasNext;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/7ec6de9c/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateException.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateException.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateException.java
new file mode 100644
index 0000000..2c13a56
--- /dev/null
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateException.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.api;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Exception thrown when an aggregate function fails.
+ * Call {@link Exception#getCause()} to find the cause of failure in 
aggregation.
+ * Call {@link VortexAggregateException#getInputs()} to get the inputs that 
correlate
+ * with the failure.
+ */
+@Unstable
+public final class VortexAggregateException extends Exception {
+  private final List<Object> inputList;
+
+  @Private
+  public VortexAggregateException(final Throwable cause, final List<?> 
inputList) {
+    super(cause);
+    this.inputList = new ArrayList<>(inputList);
+  }
+
+  @Private
+  public VortexAggregateException(final String message,
+                                  final Throwable cause,
+                                  final List<?> inputList) {
+    super(message, cause);
+    this.inputList = new ArrayList<>(inputList);
+  }
+
+  /**
+   * @return Inputs that correlate with the aggregation failure.
+   */
+  public List<Object> getInputs() {
+    return Collections.unmodifiableList(inputList);
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/7ec6de9c/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFunction.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFunction.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFunction.java
new file mode 100644
index 0000000..fe3b96a
--- /dev/null
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFunction.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.api;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.wake.remote.Codec;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Typed user function for Local Aggregation. Implement your functions using 
this interface.
+ * TODO[REEF-504]: Clean up Serializable in Vortex.
+ * TODO[REEF-1003]: Use reflection instead of serialization when launching 
VortexFunction.
+ *
+ * @param <TOutput> output type of the aggregation function and the functions 
to-be-aggregated.
+ */
+@Public
+@ClientSide
+@Unstable
+public interface VortexAggregateFunction<TOutput> extends Serializable {
+
+  /**
+   * Runs a custom local aggregation function on Tasklets assigned to a 
VortexWorker.
+   * @param taskletOutputs the list of outputs from Tasklets on a Worker.
+   * @return the aggregated output of Tasklets.
+   * @throws Exception
+   */
+  TOutput call(final List<TOutput> taskletOutputs) throws 
VortexAggregateException;
+
+  /**
+   * Users must define codec for the AggregationOutput.
+   * {@link org.apache.reef.vortex.util.VoidCodec} can be used if the 
aggregation output is
+   * empty, and {@link org.apache.reef.io.serialization.SerializableCodec} can 
be used for ({@link Serializable}
+   * aggregation output.
+   * Custom aggregation output Codec can also be supplied.
+   * @return Codec used to serialize/deserialize the output.
+   */
+  Codec<TOutput> getOutputCodec();
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/7ec6de9c/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFuture.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFuture.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFuture.java
new file mode 100644
index 0000000..d958fac
--- /dev/null
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFuture.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.api;
+
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.vortex.common.VortexFutureDelegate;
+import org.apache.reef.wake.remote.Codec;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.logging.Logger;
+
+/**
+ * The interface between user code and aggregation Tasklets.
+ * Thread safety: This class is not meant to be used in a multi-threaded 
fashion.
+ * TODO[JIRA REEF-1131]: Create and run tests once functional.
+ */
+@Public
+@ClientSide
+@NotThreadSafe
+@Unstable
+public final class VortexAggregateFuture<TInput, TOutput> implements 
VortexFutureDelegate {
+  private static final Logger LOG = 
Logger.getLogger(VortexAggregateFuture.class.getName());
+
+  private final Executor executor;
+  private final Codec<TOutput> aggOutputCodec;
+  private final BlockingQueue<AggregateResult> resultQueue;
+  private final Map<Integer, TInput> taskletIdInputMap;
+  private final FutureCallback<AggregateResult<TInput, TOutput>> 
callbackHandler;
+
+  @Private
+  public VortexAggregateFuture(final Executor executor,
+                               final Map<Integer, TInput> taskletIdInputMap,
+                               final Codec<TOutput> aggOutputCodec,
+                               final FutureCallback<AggregateResult<TInput, 
TOutput>> callbackHandler) {
+    this.executor = executor;
+    this.taskletIdInputMap = new HashMap<>(taskletIdInputMap);
+    this.resultQueue = new ArrayBlockingQueue<>(taskletIdInputMap.size());
+    this.aggOutputCodec = aggOutputCodec;
+    this.callbackHandler = callbackHandler;
+  }
+
+  /**
+   * @return the next aggregation result for the future, null if no more 
results.
+   */
+  public synchronized AggregateResult get() throws InterruptedException {
+    if (taskletIdInputMap.isEmpty()) {
+      return null;
+    }
+
+    return resultQueue.take();
+  }
+
+  /**
+   * @param timeout the timeout for the operation.
+   * @param timeUnit the time unit of the timeout.
+   * @return the next aggregation result for the future, within the user 
specified timeout, null if no more results.
+   * @throws TimeoutException if time out hits.
+   */
+  public synchronized AggregateResult get(final long timeout,
+                                          final TimeUnit timeUnit) throws 
InterruptedException, TimeoutException {
+    if (taskletIdInputMap.isEmpty()) {
+      return null;
+    }
+
+    final AggregateResult result = resultQueue.poll(timeout, timeUnit);
+
+    if (result == null) {
+      throw new TimeoutException();
+    }
+
+    return result;
+  }
+
+  /**
+   * @return true if there are no more results to poll.
+   */
+  public synchronized boolean isDone() {
+    return taskletIdInputMap.size() == 0;
+  }
+
+  /**
+   * A Tasklet associated with the aggregation has completed.
+   */
+  @Private
+  @Override
+  public void completed(final int taskletId, final byte[] serializedResult) {
+    try {
+      // TODO[REEF-1113]: Handle serialization failure separately in Vortex
+      final TOutput result = aggOutputCodec.decode(serializedResult);
+      removeCompletedTasklets(result, Collections.singletonList(taskletId));
+    } catch (final InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Aggregation has completed for a list of Tasklets, with an aggregated 
result.
+   */
+  @Private
+  @Override
+  public void aggregationCompleted(final List<Integer> taskletIds, final 
byte[] serializedResult) {
+    try {
+      // TODO[REEF-1113]: Handle serialization failure separately in Vortex
+      final TOutput result = aggOutputCodec.decode(serializedResult);
+      removeCompletedTasklets(result, taskletIds);
+    } catch (final InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * A Tasklet associated with the aggregation has failed.
+   */
+  @Private
+  @Override
+  public void threwException(final int taskletId, final Exception exception) {
+    try {
+      removeFailedTasklets(exception, Collections.singletonList(taskletId));
+    } catch (final InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * A list of Tasklets has failed during aggregation phase.
+   */
+  @Private
+  @Override
+  public void aggregationThrewException(final List<Integer> taskletIds, final 
Exception exception) {
+    try {
+      removeFailedTasklets(exception, taskletIds);
+    } catch (final InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Not implemented for local aggregation.
+   */
+  @Private
+  @Override
+  public void cancelled(final int taskletId) {
+    throw new NotImplementedException("Tasklet cancellation not supported in 
aggregations.");
+  }
+
+  /**
+   * Removes completed Tasklets from Tasklets that are expected and invoke 
callback.
+   */
+  private synchronized void removeCompletedTasklets(final TOutput output, 
final List<Integer> taskletIds)
+      throws InterruptedException {
+    final AggregateResult result =
+        new AggregateResult(output, getInputs(taskletIds), 
taskletIdInputMap.size() > 0);
+
+    if (callbackHandler != null) {
+      executor.execute(new Runnable() {
+        @Override
+        public void run() {
+          callbackHandler.onSuccess(result);
+        }
+      });
+    }
+
+    resultQueue.put(result);
+  }
+
+  /**
+   * Removes failed Tasklets from Tasklets that are expected and invokes 
callback.
+   */
+  private synchronized void removeFailedTasklets(final Exception exception, 
final List<Integer> taskletIds)
+      throws InterruptedException {
+
+    final List<TInput> inputs = getInputs(taskletIds);
+    final AggregateResult failure =
+        new AggregateResult(exception, inputs, taskletIdInputMap.size() > 0);
+
+    if (callbackHandler != null) {
+      executor.execute(new Runnable() {
+        @Override
+        public void run() {
+          // TODO[JIRA REEF-1129]: Add documentation in VortexThreadPool.
+          callbackHandler.onFailure(new VortexAggregateException(exception, 
inputs));
+        }
+      });
+    }
+
+    resultQueue.put(failure);
+  }
+
+  /**
+   * Gets the inputs on Tasklet aggregation completion.
+   */
+  private synchronized List<TInput> getInputs(final List<Integer> taskletIds) {
+
+    final List<TInput> inputList = new ArrayList<>(taskletIds.size());
+
+    for(final int taskletId : taskletIds) {
+      inputList.add(taskletIdInputMap.remove(taskletId));
+    }
+
+    return inputList;
+  }
+}
\ No newline at end of file

Reply via email to