http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/WorkerReport.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/WorkerReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/WorkerReport.java deleted file mode 100644 index 7a82eee..0000000 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/WorkerReport.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.vortex.common; - -import org.apache.reef.annotations.Unstable; -import org.apache.reef.annotations.audience.DriverSide; -import org.apache.reef.annotations.audience.Private; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; - -/** - * Worker-to-Master protocol. - * A report of Tasklet statuses sent form the {@link org.apache.reef.vortex.evaluator.VortexWorker} - * to the {@link org.apache.reef.vortex.driver.VortexMaster}. - */ -@Private -@Unstable -@DriverSide -public final class WorkerReport { - private ArrayList<TaskletReport> taskletReports; - - public WorkerReport(final Collection<TaskletReport> taskletReports) { - this.taskletReports = new ArrayList<>(taskletReports); - } - - /** - * @return the list of Tasklet reports. - */ - public List<TaskletReport> getTaskletReports() { - return Collections.unmodifiableList(taskletReports); - } -}
http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/package-info.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/package-info.java index 35cf59e..df2e161 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/package-info.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/package-info.java @@ -17,6 +17,6 @@ * under the License. */ /** - * Vortex Code used both in Vortex Driver and Vortex Evaluator. + * Code commonly used in VortexMaster and VortexWorker. */ package org.apache.reef.vortex.common; http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/AggregateFunctionRepository.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/AggregateFunctionRepository.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/AggregateFunctionRepository.java new file mode 100644 index 0000000..19d1dc8 --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/AggregateFunctionRepository.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.driver; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.io.Tuple; +import org.apache.reef.vortex.api.VortexAggregateFunction; +import org.apache.reef.vortex.api.VortexAggregatePolicy; + +import javax.annotation.concurrent.ThreadSafe; +import javax.inject.Inject; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * A repository for {@link VortexAggregateFunction} and its associated {@link VortexAggregatePolicy}. + */ +@ThreadSafe +@Unstable +@Private +final class AggregateFunctionRepository { + private final ConcurrentMap<Integer, Tuple<VortexAggregateFunction, VortexAggregatePolicy>> + aggregateFunctionMap = new ConcurrentHashMap<>(); + + @Inject + private AggregateFunctionRepository() { + } + + /** + * Associates an aggregate function ID with a {@link VortexAggregateFunction} and a {@link VortexAggregatePolicy}. + */ + public Tuple<VortexAggregateFunction, VortexAggregatePolicy> put( + final int aggregateFunctionId, + final VortexAggregateFunction aggregateFunction, + final VortexAggregatePolicy policy) { + return aggregateFunctionMap.put(aggregateFunctionId, new Tuple<>(aggregateFunction, policy)); + } + + /** + * Gets the {@link VortexAggregateFunction} associated with the aggregate function ID. + */ + public VortexAggregateFunction getAggregateFunction(final int aggregateFunctionId) { + return aggregateFunctionMap.get(aggregateFunctionId).getKey(); + } + + /** + * Gets the {@link VortexAggregatePolicy} associated with the aggregate function ID. + */ + public VortexAggregatePolicy getPolicy(final int aggregateFunctionId) { + return aggregateFunctionMap.get(aggregateFunctionId).getValue(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java index 21b1bd0..d508893 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java @@ -20,11 +20,10 @@ package org.apache.reef.vortex.driver; import net.jcip.annotations.ThreadSafe; import org.apache.reef.annotations.audience.DriverSide; -import org.apache.reef.io.serialization.Codec; import org.apache.reef.tang.annotations.Parameter; import org.apache.reef.util.Optional; import org.apache.reef.vortex.api.*; -import org.apache.reef.vortex.common.*; +import org.apache.reef.vortex.protocol.workertomaster.*; import javax.inject.Inject; import java.util.*; @@ -71,11 +70,10 @@ final class DefaultVortexMaster implements VortexMaster { // TODO[REEF-500]: Simple duplicate Vortex Tasklet launch. final VortexFuture<TOutput> vortexFuture; final int id = taskletIdCounter.getAndIncrement(); - final Codec<TOutput> outputCodec = function.getOutputCodec(); if (callback.isPresent()) { - vortexFuture = new VortexFuture<>(executor, this, id, outputCodec, callback.get()); + vortexFuture = new VortexFuture<>(executor, this, id, callback.get()); } else { - vortexFuture = new VortexFuture<>(executor, this, id, outputCodec); + vortexFuture = new VortexFuture<>(executor, this, id); } final Tasklet tasklet = new Tasklet<>(id, Optional.<Integer>empty(), function, input, vortexFuture); @@ -96,8 +94,7 @@ final class DefaultVortexMaster implements VortexMaster { final List<TInput> inputs, final Optional<FutureCallback<AggregateResult<TInput, TOutput>>> callback) { final int aggregateFunctionId = aggregateIdCounter.getAndIncrement(); - aggregateFunctionRepository.put(aggregateFunctionId, aggregateFunction, vortexFunction, policy); - final Codec<TOutput> aggOutputCodec = aggregateFunction.getOutputCodec(); + aggregateFunctionRepository.put(aggregateFunctionId, aggregateFunction, policy); final List<Tasklet> tasklets = new ArrayList<>(inputs.size()); final Map<Integer, TInput> taskletIdInputMap = new HashMap<>(inputs.size()); @@ -105,10 +102,12 @@ final class DefaultVortexMaster implements VortexMaster { taskletIdInputMap.put(taskletIdCounter.getAndIncrement(), input); } - final VortexAggregateFuture<TInput, TOutput> vortexAggregateFuture = - callback.isPresent() ? - new VortexAggregateFuture<>(executor, taskletIdInputMap, aggOutputCodec, callback.get()) : - new VortexAggregateFuture<>(executor, taskletIdInputMap, aggOutputCodec, null); + final VortexAggregateFuture<TInput, TOutput> vortexAggregateFuture; + if (callback.isPresent()) { + vortexAggregateFuture = new VortexAggregateFuture<>(executor, taskletIdInputMap, callback.get()); + } else { + vortexAggregateFuture = new VortexAggregateFuture<>(executor, taskletIdInputMap, null); + } for (final Map.Entry<Integer, TInput> taskletIdInputEntry : taskletIdInputMap.entrySet()) { final Tasklet tasklet = new Tasklet<>(taskletIdInputEntry.getKey(), Optional.of(aggregateFunctionId), @@ -151,37 +150,37 @@ final class DefaultVortexMaster implements VortexMaster { } @Override - public void workerReported(final String workerId, final WorkerReport workerReport) { - for (final TaskletReport taskletReport : workerReport.getTaskletReports()) { - switch (taskletReport.getType()) { + public void workerReported(final String workerId, final WorkerToMasterReports workerToMasterReports) { + for (final WorkerToMasterReport workerToMasterReport : workerToMasterReports.getReports()) { + switch (workerToMasterReport.getType()) { case TaskletResult: - final TaskletResultReport taskletResultReport = (TaskletResultReport) taskletReport; + final TaskletResultReport taskletResultReport = (TaskletResultReport) workerToMasterReport; final int resultTaskletId = taskletResultReport.getTaskletId(); final List<Integer> singletonResultTaskletId = Collections.singletonList(resultTaskletId); runningWorkers.doneTasklets(workerId, singletonResultTaskletId); - fetchDelegate(singletonResultTaskletId).completed(resultTaskletId, taskletResultReport.getSerializedResult()); + fetchDelegate(singletonResultTaskletId).completed(resultTaskletId, taskletResultReport.getResult()); break; case TaskletAggregationResult: final TaskletAggregationResultReport taskletAggregationResultReport = - (TaskletAggregationResultReport) taskletReport; + (TaskletAggregationResultReport) workerToMasterReport; final List<Integer> aggregatedTaskletIds = taskletAggregationResultReport.getTaskletIds(); runningWorkers.doneTasklets(workerId, aggregatedTaskletIds); fetchDelegate(aggregatedTaskletIds).aggregationCompleted( - aggregatedTaskletIds, taskletAggregationResultReport.getSerializedResult()); + aggregatedTaskletIds, taskletAggregationResultReport.getResult()); break; case TaskletCancelled: - final TaskletCancelledReport taskletCancelledReport = (TaskletCancelledReport) taskletReport; + final TaskletCancelledReport taskletCancelledReport = (TaskletCancelledReport) workerToMasterReport; final List<Integer> cancelledIdToList = Collections.singletonList(taskletCancelledReport.getTaskletId()); runningWorkers.doneTasklets(workerId, cancelledIdToList); fetchDelegate(cancelledIdToList).cancelled(taskletCancelledReport.getTaskletId()); break; case TaskletFailure: - final TaskletFailureReport taskletFailureReport = (TaskletFailureReport) taskletReport; + final TaskletFailureReport taskletFailureReport = (TaskletFailureReport) workerToMasterReport; final int failureTaskletId = taskletFailureReport.getTaskletId(); final List<Integer> singletonFailedTaskletId = Collections.singletonList(failureTaskletId); @@ -191,7 +190,7 @@ final class DefaultVortexMaster implements VortexMaster { break; case TaskletAggregationFailure: final TaskletAggregationFailureReport taskletAggregationFailureReport = - (TaskletAggregationFailureReport) taskletReport; + (TaskletAggregationFailureReport) workerToMasterReport; final List<Integer> aggregationFailedTaskletIds = taskletAggregationFailureReport.getTaskletIds(); runningWorkers.doneTasklets(workerId, aggregationFailedTaskletIds); http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java index f207ab5..d016891 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java @@ -22,7 +22,6 @@ import net.jcip.annotations.ThreadSafe; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.util.Optional; -import org.apache.reef.vortex.common.AggregateFunctionRepository; import javax.inject.Inject; http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java index 063835f..9f64bf0 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java @@ -21,7 +21,6 @@ package org.apache.reef.vortex.driver; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.util.Optional; import org.apache.reef.vortex.api.VortexFunction; -import org.apache.reef.vortex.common.VortexFutureDelegate; /** * Representation of user task in Driver. http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java index ca58174..5e796c9 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java @@ -28,8 +28,9 @@ import org.apache.reef.tang.Configurations; import org.apache.reef.tang.annotations.Parameter; import org.apache.reef.tang.annotations.Unit; import org.apache.reef.vortex.api.VortexStart; -import org.apache.reef.vortex.common.*; +import org.apache.reef.vortex.common.KryoUtils; import org.apache.reef.vortex.evaluator.VortexWorker; +import org.apache.reef.vortex.protocol.workertomaster.WorkerToMasterReports; import org.apache.reef.wake.EStage; import org.apache.reef.wake.EventHandler; import org.apache.reef.wake.impl.SingleThreadStage; @@ -64,7 +65,7 @@ final class VortexDriver { private final EStage<VortexStart> vortexStartEStage; private final VortexStart vortexStart; private final EStage<Integer> pendingTaskletSchedulerEStage; - private final VortexAvroUtils vortexAvroUtils; + private final KryoUtils kryoUtils; @Inject private VortexDriver(final EvaluatorRequestor evaluatorRequestor, @@ -73,7 +74,7 @@ final class VortexDriver { final VortexStart vortexStart, final VortexStartExecutor vortexStartExecutor, final PendingTaskletLauncher pendingTaskletLauncher, - final VortexAvroUtils vortexAvroUtils, + final KryoUtils kryoUtils, @Parameter(VortexMasterConf.WorkerMem.class) final int workerMem, @Parameter(VortexMasterConf.WorkerNum.class) final int workerNum, @Parameter(VortexMasterConf.WorkerCores.class) final int workerCores, @@ -81,7 +82,7 @@ final class VortexDriver { this.vortexStartEStage = new ThreadPoolStage<>(vortexStartExecutor, numOfStartThreads); this.vortexStart = vortexStart; this.pendingTaskletSchedulerEStage = new SingleThreadStage<>(pendingTaskletLauncher, 1); - this.vortexAvroUtils = vortexAvroUtils; + this.kryoUtils = kryoUtils; this.evaluatorRequestor = evaluatorRequestor; this.vortexMaster = vortexMaster; this.vortexRequestor = vortexRequestor; @@ -154,8 +155,9 @@ final class VortexDriver { @Override public void onNext(final TaskMessage taskMessage) { final String workerId = taskMessage.getId(); - final WorkerReport workerReport = vortexAvroUtils.toWorkerReport(taskMessage.get()); - vortexMaster.workerReported(workerId, workerReport); + final WorkerToMasterReports workerToMasterReports = + (WorkerToMasterReports)kryoUtils.deserialize(taskMessage.get()); + vortexMaster.workerReported(workerId, workerToMasterReports); } } http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexFutureDelegate.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexFutureDelegate.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexFutureDelegate.java new file mode 100644 index 0000000..e84840d --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexFutureDelegate.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.driver; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.Private; + +import java.util.List; + +/** + * Exposes functions to be called by the {@link org.apache.reef.vortex.driver.VortexMaster} + * to note that a list of Tasklets associated with a Future has completed. + */ +@Unstable +@DriverSide +@Private +public interface VortexFutureDelegate<TOutput> { + + /** + * A Tasklet associated with the future has completed with a result. + */ + void completed(final int taskletId, TOutput result); + + /** + * The list of aggregated Tasklets associated with the Future that have completed with a result. + */ + void aggregationCompleted(final List<Integer> taskletIds, final TOutput result); + + /** + * A Tasklet associated with the Future has thrown an Exception. + */ + void threwException(final int taskletId, final Exception exception); + + /** + * The list of Tasklets associated with the Future that have thrown an Exception. + */ + void aggregationThrewException(final List<Integer> taskletIds, final Exception exception); + + /** + * A Tasklet associated with the Future has been cancelled. + */ + void cancelled(final int taskletId); +} http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java index 95cec93..959ed5f 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java @@ -23,7 +23,7 @@ import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.tang.annotations.DefaultImplementation; import org.apache.reef.util.Optional; import org.apache.reef.vortex.api.*; -import org.apache.reef.vortex.common.WorkerReport; +import org.apache.reef.vortex.protocol.workertomaster.WorkerToMasterReports; import java.util.List; @@ -74,7 +74,7 @@ public interface VortexMaster { /** * Call this when a worker has reported back. */ - void workerReported(final String workerId, final WorkerReport workerReport); + void workerReported(final String workerId, final WorkerToMasterReports workerToMasterReports); /** * Release all resources and shut down. http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexRequestor.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexRequestor.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexRequestor.java index 4aabf32..0f93dd2 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexRequestor.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexRequestor.java @@ -20,8 +20,8 @@ package org.apache.reef.vortex.driver; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.driver.task.RunningTask; -import org.apache.reef.vortex.common.VortexAvroUtils; -import org.apache.reef.vortex.common.VortexRequest; +import org.apache.reef.vortex.common.KryoUtils; +import org.apache.reef.vortex.protocol.mastertoworker.MasterToWorkerRequest; import javax.inject.Inject; import java.util.concurrent.ExecutorService; @@ -33,30 +33,30 @@ import java.util.concurrent.Executors; @DriverSide class VortexRequestor { private final ExecutorService executorService = Executors.newCachedThreadPool(); - private final VortexAvroUtils vortexAvroUtils; + private final KryoUtils kryoUtils; @Inject - VortexRequestor(final VortexAvroUtils vortexAvroUtils) { - this.vortexAvroUtils = vortexAvroUtils; + VortexRequestor(final KryoUtils kryoUtils) { + this.kryoUtils = kryoUtils; } /** - * Sends a {@link VortexRequest} asynchronously to a {@link org.apache.reef.vortex.evaluator.VortexWorker}. + * Sends a {@link MasterToWorkerRequest} asynchronously to a {@link org.apache.reef.vortex.evaluator.VortexWorker}. */ - void sendAsync(final RunningTask reefTask, final VortexRequest vortexRequest) { + void sendAsync(final RunningTask reefTask, final MasterToWorkerRequest masterToWorkerRequest) { executorService.execute(new Runnable() { @Override public void run() { // Possible race condition with VortexWorkerManager#terminate is addressed by the global lock in VortexMaster - send(reefTask, vortexRequest); + send(reefTask, masterToWorkerRequest); } }); } /** - * Sends a {@link VortexRequest} synchronously to a {@link org.apache.reef.vortex.evaluator.VortexWorker}. + * Sends a {@link MasterToWorkerRequest} synchronously to a {@link org.apache.reef.vortex.evaluator.VortexWorker}. */ - void send(final RunningTask reefTask, final VortexRequest vortexRequest) { - reefTask.send(vortexAvroUtils.toBytes(vortexRequest)); + void send(final RunningTask reefTask, final MasterToWorkerRequest masterToWorkerRequest) { + reefTask.send(kryoUtils.serialize(masterToWorkerRequest)); } } http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java index 088e3cf..764d6a3 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java @@ -24,10 +24,10 @@ import org.apache.reef.driver.task.RunningTask; import org.apache.reef.vortex.api.VortexAggregateFunction; import org.apache.reef.vortex.api.VortexAggregatePolicy; import org.apache.reef.vortex.api.VortexFunction; -import org.apache.reef.vortex.common.TaskletAggregateExecutionRequest; -import org.apache.reef.vortex.common.TaskletAggregationRequest; -import org.apache.reef.vortex.common.TaskletCancellationRequest; -import org.apache.reef.vortex.common.TaskletExecutionRequest; +import org.apache.reef.vortex.protocol.mastertoworker.TaskletAggregateExecutionRequest; +import org.apache.reef.vortex.protocol.mastertoworker.TaskletAggregationRequest; +import org.apache.reef.vortex.protocol.mastertoworker.TaskletCancellationRequest; +import org.apache.reef.vortex.protocol.mastertoworker.TaskletExecutionRequest; import java.util.*; http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/AggregateContainer.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/AggregateContainer.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/AggregateContainer.java index 1ea3876..21f27d4 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/AggregateContainer.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/AggregateContainer.java @@ -24,7 +24,9 @@ import org.apache.reef.annotations.Unstable; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.annotations.audience.Private; import org.apache.reef.task.HeartBeatTriggerManager; -import org.apache.reef.vortex.common.*; +import org.apache.reef.vortex.common.KryoUtils; +import org.apache.reef.vortex.protocol.mastertoworker.TaskletAggregationRequest; +import org.apache.reef.vortex.protocol.workertomaster.*; import javax.annotation.concurrent.GuardedBy; import java.util.*; @@ -46,7 +48,7 @@ final class AggregateContainer { private final Object stateLock = new Object(); private final TaskletAggregationRequest taskletAggregationRequest; private final HeartBeatTriggerManager heartBeatTriggerManager; - private final VortexAvroUtils vortexAvroUtils; + private final KryoUtils kryoUtils; private final BlockingDeque<byte[]> workerReportsQueue; private final ScheduledExecutorService timer = Executors.newScheduledThreadPool(1); @@ -60,11 +62,11 @@ final class AggregateContainer { private final List<Pair<Integer, Exception>> failedTasklets = new ArrayList<>(); AggregateContainer(final HeartBeatTriggerManager heartBeatTriggerManager, - final VortexAvroUtils vortexAvroUtils, + final KryoUtils kryoUtils, final BlockingDeque<byte[]> workerReportsQueue, final TaskletAggregationRequest taskletAggregationRequest) { this.heartBeatTriggerManager = heartBeatTriggerManager; - this.vortexAvroUtils = vortexAvroUtils; + this.kryoUtils = kryoUtils; this.workerReportsQueue = workerReportsQueue; this.taskletAggregationRequest = taskletAggregationRequest; } @@ -74,7 +76,7 @@ final class AggregateContainer { } @GuardedBy("stateLock") - private void aggregateTasklets(final List<TaskletReport> taskletReports, + private void aggregateTasklets(final List<WorkerToMasterReport> workerToMasterReports, final List<Object> results, final List<Integer> aggregatedTasklets) { synchronized (stateLock) { @@ -86,7 +88,7 @@ final class AggregateContainer { // Add failed tasklets to worker report. for (final Pair<Integer, Exception> failedPair : failedTasklets) { - taskletReports.add(new TaskletFailureReport(failedPair.getLeft(), failedPair.getRight())); + workerToMasterReports.add(new TaskletFailureReport(failedPair.getLeft(), failedPair.getRight())); } // Drain the tasklets. @@ -96,11 +98,11 @@ final class AggregateContainer { } /** - * Performs the output aggregation and generates the {@link WorkerReport} to report back to the + * Performs the output aggregation and generates the {@link WorkerToMasterReports} to report back to the * {@link org.apache.reef.vortex.driver.VortexDriver}. */ private void aggregateTasklets(final AggregateTriggerType type) { - final List<TaskletReport> taskletReports = new ArrayList<>(); + final List<WorkerToMasterReport> workerToMasterReports = new ArrayList<>(); final List<Object> results = new ArrayList<>(); final List<Integer> aggregatedTasklets = new ArrayList<>(); @@ -108,14 +110,14 @@ final class AggregateContainer { synchronized (stateLock) { switch(type) { case ALARM: - aggregateTasklets(taskletReports, results, aggregatedTasklets); + aggregateTasklets(workerToMasterReports, results, aggregatedTasklets); break; case COUNT: if (!aggregateOnCount()) { return; } - aggregateTasklets(taskletReports, results, aggregatedTasklets); + aggregateTasklets(workerToMasterReports, results, aggregatedTasklets); break; default: throw new RuntimeException("Unexpected aggregate type."); @@ -125,16 +127,16 @@ final class AggregateContainer { if (!results.isEmpty()) { // Run the aggregation function. try { - final byte[] aggregationResult = taskletAggregationRequest.executeAggregation(results); - taskletReports.add(new TaskletAggregationResultReport(aggregatedTasklets, aggregationResult)); + final Object aggregationResult = taskletAggregationRequest.executeAggregation(results); + workerToMasterReports.add(new TaskletAggregationResultReport(aggregatedTasklets, aggregationResult)); } catch (final Exception e) { - taskletReports.add(new TaskletAggregationFailureReport(aggregatedTasklets, e)); + workerToMasterReports.add(new TaskletAggregationFailureReport(aggregatedTasklets, e)); } } // Add to worker report only if there is something to report back. - if (!taskletReports.isEmpty()) { - workerReportsQueue.addLast(vortexAvroUtils.toBytes(new WorkerReport(taskletReports))); + if (!workerToMasterReports.isEmpty()) { + workerReportsQueue.addLast(kryoUtils.serialize(new WorkerToMasterReports(workerToMasterReports))); heartBeatTriggerManager.triggerHeartBeat(); } } http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java index 3d53bc4..5b0d38d 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java @@ -29,8 +29,9 @@ import org.apache.reef.task.TaskMessageSource; import org.apache.reef.task.events.CloseEvent; import org.apache.reef.task.events.DriverMessage; import org.apache.reef.util.Optional; -import org.apache.reef.vortex.common.*; -import org.apache.reef.vortex.common.AggregateFunctionRepository; +import org.apache.reef.vortex.common.KryoUtils; +import org.apache.reef.vortex.protocol.mastertoworker.*; +import org.apache.reef.vortex.protocol.workertomaster.*; import org.apache.reef.vortex.driver.VortexWorkerConf; import org.apache.reef.wake.EventHandler; @@ -56,20 +57,17 @@ public final class VortexWorker implements Task, TaskMessageSource { private final BlockingDeque<byte[]> workerReports = new LinkedBlockingDeque<>(); private final ConcurrentMap<Integer, AggregateContainer> aggregates = new ConcurrentHashMap<>(); - private final AggregateFunctionRepository aggregateFunctionRepository; - private final VortexAvroUtils vortexAvroUtils; + private final KryoUtils kryoUtils; private final HeartBeatTriggerManager heartBeatTriggerManager; private final int numOfThreads; private final CountDownLatch terminated = new CountDownLatch(1); @Inject private VortexWorker(final HeartBeatTriggerManager heartBeatTriggerManager, - final AggregateFunctionRepository aggregateFunctionRepository, - final VortexAvroUtils vortexAvroUtils, + final KryoUtils kryoUtils, @Parameter(VortexWorkerConf.NumOfThreads.class) final int numOfThreads) { this.heartBeatTriggerManager = heartBeatTriggerManager; - this.aggregateFunctionRepository = aggregateFunctionRepository; - this.vortexAvroUtils = vortexAvroUtils; + this.kryoUtils = kryoUtils; this.numOfThreads = numOfThreads; } @@ -97,30 +95,24 @@ public final class VortexWorker implements Task, TaskMessageSource { } // Command Executor: Deserialize the command - final VortexRequest vortexRequest = vortexAvroUtils.toVortexRequest(message); + final MasterToWorkerRequest masterToWorkerRequest = (MasterToWorkerRequest)kryoUtils.deserialize(message); - switch (vortexRequest.getType()) { + switch (masterToWorkerRequest.getType()) { case AggregateTasklets: - final TaskletAggregationRequest taskletAggregationRequest = (TaskletAggregationRequest) vortexRequest; + final TaskletAggregationRequest taskletAggregationRequest = + (TaskletAggregationRequest) masterToWorkerRequest; aggregates.put(taskletAggregationRequest.getAggregateFunctionId(), - new AggregateContainer(heartBeatTriggerManager, vortexAvroUtils, workerReports, + new AggregateContainer(heartBeatTriggerManager, kryoUtils, workerReports, taskletAggregationRequest)); - - // VortexFunctions need to be put into the repository such that VortexAvroUtils will know how to - // convert inputs and functions into a VortexRequest on subsequent messages requesting to - // execute the aggregateable tasklets. - aggregateFunctionRepository.put(taskletAggregationRequest.getAggregateFunctionId(), - taskletAggregationRequest.getAggregateFunction(), taskletAggregationRequest.getFunction(), - taskletAggregationRequest.getPolicy()); break; case ExecuteAggregateTasklet: - executeAggregateTasklet(commandExecutor, vortexRequest); + executeAggregateTasklet(commandExecutor, masterToWorkerRequest); break; case ExecuteTasklet: - executeTasklet(commandExecutor, futures, vortexRequest); + executeTasklet(commandExecutor, futures, masterToWorkerRequest); break; case CancelTasklet: - final TaskletCancellationRequest cancellationRequest = (TaskletCancellationRequest) vortexRequest; + final TaskletCancellationRequest cancellationRequest = (TaskletCancellationRequest) masterToWorkerRequest; LOG.log(Level.FINE, "Cancelling Tasklet with ID {0}.", cancellationRequest.getTaskletId()); final Future future = futures.get(cancellationRequest.getTaskletId()); if (future != null) { @@ -143,9 +135,9 @@ public final class VortexWorker implements Task, TaskMessageSource { */ private void executeTasklet(final ExecutorService commandExecutor, final ConcurrentMap<Integer, Future> futures, - final VortexRequest vortexRequest) { + final MasterToWorkerRequest masterToWorkerRequest) { final CountDownLatch latch = new CountDownLatch(1); - final TaskletExecutionRequest taskletExecutionRequest = (TaskletExecutionRequest) vortexRequest; + final TaskletExecutionRequest taskletExecutionRequest = (TaskletExecutionRequest) masterToWorkerRequest; // Scheduler Thread: Pass the command to the worker thread pool to be executed // Record future to support cancellation. @@ -154,32 +146,30 @@ public final class VortexWorker implements Task, TaskMessageSource { commandExecutor.submit(new Runnable() { @Override public void run() { - final WorkerReport workerReport; - final List<TaskletReport> taskletReports = new ArrayList<>(); + final WorkerToMasterReports reports; + final List<WorkerToMasterReport> holder = new ArrayList<>(); try { // Command Executor: Execute the command - final TaskletReport taskletReport = - new TaskletResultReport(taskletExecutionRequest.getTaskletId(), - taskletExecutionRequest.execute()); - taskletReports.add(taskletReport); + final WorkerToMasterReport workerToMasterReport = + new TaskletResultReport(taskletExecutionRequest.getTaskletId(), taskletExecutionRequest.execute()); + holder.add(workerToMasterReport); } catch (final InterruptedException ex) { // Assumes that user's thread follows convention that cancelled Futures // should throw InterruptedException. - final TaskletReport taskletReport = + final WorkerToMasterReport workerToMasterReport = new TaskletCancelledReport(taskletExecutionRequest.getTaskletId()); - LOG.log(Level.WARNING, "Tasklet with ID {0} has been cancelled", - taskletExecutionRequest.getTaskletId()); - taskletReports.add(taskletReport); + LOG.log(Level.WARNING, "Tasklet with ID {0} has been cancelled", taskletExecutionRequest.getTaskletId()); + holder.add(workerToMasterReport); } catch (Exception e) { // Command Executor: Tasklet throws an exception - final TaskletReport taskletReport = + final WorkerToMasterReport workerToMasterReport = new TaskletFailureReport(taskletExecutionRequest.getTaskletId(), e); - taskletReports.add(taskletReport); + holder.add(workerToMasterReport); } - workerReport = new WorkerReport(taskletReports); - workerReports.addLast(vortexAvroUtils.toBytes(workerReport)); + reports = new WorkerToMasterReports(holder); + workerReports.addLast(kryoUtils.serialize(reports)); try { latch.await(); } catch (final InterruptedException e) { @@ -199,14 +189,14 @@ public final class VortexWorker implements Task, TaskMessageSource { * Executes an aggregation request from the {@link org.apache.reef.vortex.driver.VortexDriver}. */ private void executeAggregateTasklet(final ExecutorService commandExecutor, - final VortexRequest vortexRequest) { + final MasterToWorkerRequest masterToWorkerRequest) { final TaskletAggregateExecutionRequest taskletAggregateExecutionRequest = - (TaskletAggregateExecutionRequest) vortexRequest; + (TaskletAggregateExecutionRequest) masterToWorkerRequest; assert aggregates.containsKey(taskletAggregateExecutionRequest.getAggregateFunctionId()); - final AggregateContainer aggregateContainer = aggregates.get( - taskletAggregateExecutionRequest.getAggregateFunctionId()); + final AggregateContainer aggregateContainer = + aggregates.get(taskletAggregateExecutionRequest.getAggregateFunctionId()); final TaskletAggregationRequest aggregationRequest = aggregateContainer.getTaskletAggregationRequest(); commandExecutor.submit(new Runnable() { http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/addone/AddOneFunction.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/addone/AddOneFunction.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/addone/AddOneFunction.java index 5e32c8c..299a31a 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/addone/AddOneFunction.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/addone/AddOneFunction.java @@ -18,15 +18,12 @@ */ package org.apache.reef.vortex.examples.addone; -import org.apache.reef.io.serialization.Codec; -import org.apache.reef.io.serialization.SerializableCodec; import org.apache.reef.vortex.api.VortexFunction; /** * Outputs input + 1. */ final class AddOneFunction implements VortexFunction<Integer, Integer> { - private static final Codec<Integer> CODEC = new SerializableCodec<>(); /** * Outputs input + 1. */ @@ -34,14 +31,4 @@ final class AddOneFunction implements VortexFunction<Integer, Integer> { public Integer call(final Integer input) throws Exception { return input + 1; } - - @Override - public Codec<Integer> getInputCodec() { - return CODEC; - } - - @Override - public Codec<Integer> getOutputCodec() { - return CODEC; - } } http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/hello/HelloVortexFunction.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/hello/HelloVortexFunction.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/hello/HelloVortexFunction.java index 89c6918..0496f27 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/hello/HelloVortexFunction.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/hello/HelloVortexFunction.java @@ -18,15 +18,12 @@ */ package org.apache.reef.vortex.examples.hello; -import org.apache.reef.io.serialization.Codec; import org.apache.reef.vortex.api.VortexFunction; -import org.apache.reef.vortex.util.VoidCodec; /** * Prints to stdout. */ final class HelloVortexFunction implements VortexFunction<Void, Void> { - private static final Codec<Void> CODEC = new VoidCodec(); /** * Prints to stdout. */ @@ -35,14 +32,4 @@ final class HelloVortexFunction implements VortexFunction<Void, Void> { System.out.println("Hello, Vortex!"); return null; } - - @Override - public Codec<Void> getInputCodec() { - return CODEC; - } - - @Override - public Codec<Void> getOutputCodec() { - return CODEC; - } } http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulException.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulException.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulException.java index e07e977..e3e0a0d 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulException.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulException.java @@ -23,6 +23,11 @@ package org.apache.reef.vortex.examples.matmul; */ class MatMulException extends Exception { /** + * No-arg constructor required for Kryo to serialize/deserialize. + */ + MatMulException() {} + + /** * Constructor of MatMulException. * @param message Message to inform users. */ http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulFunction.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulFunction.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulFunction.java index db4a320..2a2374a 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulFunction.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulFunction.java @@ -18,16 +18,12 @@ */ package org.apache.reef.vortex.examples.matmul; -import org.apache.reef.io.serialization.Codec; import org.apache.reef.vortex.api.VortexFunction; /** * Computes multiplication of two matrices. */ final class MatMulFunction implements VortexFunction<MatMulInput, MatMulOutput> { - private static final Codec<MatMulInput> INPUT_CODEC = new MatMulInputCodec(); - private static final Codec<MatMulOutput> OUTPUT_CODEC = new MatMulOutputCodec(); - /** * Computes multiplication of two matrices. * @param input Input which contains two matrices to multiply, @@ -43,14 +39,4 @@ final class MatMulFunction implements VortexFunction<MatMulInput, MatMulOutput> final Matrix<Double> result = leftMatrix.multiply(rightMatrix); return new MatMulOutput(index, result); } - - @Override - public Codec<MatMulInput> getInputCodec() { - return INPUT_CODEC; - } - - @Override - public Codec<MatMulOutput> getOutputCodec() { - return OUTPUT_CODEC; - } } http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulInput.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulInput.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulInput.java index b35b402..baae96c 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulInput.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulInput.java @@ -23,9 +23,15 @@ package org.apache.reef.vortex.examples.matmul; * and index of the sub-matrix in the entire result. */ final class MatMulInput { - private final int index; - private final Matrix<Double> leftMatrix; - private final Matrix<Double> rightMatrix; + private int index; + private Matrix<Double> leftMatrix; + private Matrix<Double> rightMatrix; + + /** + * No-arg constructor required for Kryo to serialize/deserialize. + */ + MatMulInput() { + } /** * Constructor of MatMulInput which consists of two matrices. http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulInputCodec.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulInputCodec.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulInputCodec.java deleted file mode 100644 index 229ba0e..0000000 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulInputCodec.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.vortex.examples.matmul; - -import org.apache.reef.io.serialization.Codec; - -import java.io.*; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -/** - * Encodes/decodes {@link MatMulInput} to/from byte array. - */ -final class MatMulInputCodec implements Codec<MatMulInput> { - - @Override - public byte[] encode(final MatMulInput matMulInput) { - try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { - try (DataOutputStream daos = new DataOutputStream(baos)) { - final int index = matMulInput.getIndex(); - final Matrix<Double> leftMatrix = matMulInput.getLeftMatrix(); - final Matrix<Double> rightMatrix = matMulInput.getRightMatrix(); - - daos.writeInt(index); - encodeMatrixToStream(daos, leftMatrix); - encodeMatrixToStream(daos, rightMatrix); - - return baos.toByteArray(); - } - } catch (final IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public MatMulInput decode(final byte[] buf) { - try (ByteArrayInputStream bais = new ByteArrayInputStream(buf)) { - try (DataInputStream dais = new DataInputStream(bais)) { - final int index = dais.readInt(); - final Matrix leftMatrix = decodeMatrixFromStream(dais); - final Matrix rightMatrix = decodeMatrixFromStream(dais); - return new MatMulInput(index, leftMatrix, rightMatrix); - } - } catch (final IOException e) { - throw new RuntimeException(e); - } - } - - /** - * Encode a Matrix to output stream. - */ - private void encodeMatrixToStream(final DataOutputStream stream, final Matrix<Double> matrix) throws IOException { - final int numRow = matrix.getNumRows(); - final int numColumn = matrix.getNumColumns(); - - stream.writeInt(numRow); - stream.writeInt(numColumn); - - for (final List<Double> row : matrix.getRows()) { - for (final double element : row) { - stream.writeDouble(element); - } - } - } - - /** - * Decode a Matrix from input stream. - */ - private Matrix decodeMatrixFromStream(final DataInputStream stream) throws IOException { - final int numRow = stream.readInt(); - final int numColumn = stream.readInt(); - - final List<List<Double>> rows = new ArrayList<>(numRow); - for (int rowIndex = 0; rowIndex < numRow; rowIndex++) { - final List<Double> row = new ArrayList<>(numColumn); - for (int columnIndex = 0; columnIndex < numColumn; columnIndex++) { - row.add(stream.readDouble()); - } - rows.add(row); - } - return new RowMatrix(Collections.unmodifiableList(rows)); - } -} http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulOutput.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulOutput.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulOutput.java index 99ee5bb..8daed08 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulOutput.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulOutput.java @@ -22,8 +22,14 @@ package org.apache.reef.vortex.examples.matmul; * Output of {@link MatMulFunction} which contains the sub-matrix and index of it in the entire result. */ final class MatMulOutput { - private final int index; - private final Matrix<Double> result; + private int index; + private Matrix<Double> result; + + /** + * No-arg constructor required for Kryo to serialize/deserialize. + */ + MatMulOutput() { + } /** * Constructor of the output. http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulOutputCodec.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulOutputCodec.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulOutputCodec.java deleted file mode 100644 index 04d974a..0000000 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulOutputCodec.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.vortex.examples.matmul; - -import org.apache.reef.io.serialization.Codec; - -import java.io.*; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -/** - * Encodes/decodes {@link MatMulOutput} to/from byte array. - */ -final class MatMulOutputCodec implements Codec<MatMulOutput> { - - @Override - public byte[] encode(final MatMulOutput matMulOutput) { - try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { - try (DataOutputStream daos = new DataOutputStream(baos)) { - final int index = matMulOutput.getIndex(); - final Matrix<Double> result = matMulOutput.getResult(); - - daos.writeInt(index); - encodeMatrixToStream(daos, result); - - return baos.toByteArray(); - } - } catch (final IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public MatMulOutput decode(final byte[] buf) { - try (ByteArrayInputStream bais = new ByteArrayInputStream(buf)) { - try (DataInputStream dais = new DataInputStream(bais)) { - final int index = dais.readInt(); - final Matrix result = decodeMatrixFromStream(dais); - return new MatMulOutput(index, result); - } - } catch (final IOException e) { - throw new RuntimeException(e); - } - } - - /** - * Encode a Matrix to output stream. - */ - private void encodeMatrixToStream(final DataOutputStream stream, final Matrix<Double> matrix) throws IOException { - final int numRow = matrix.getNumRows(); - final int numColumn = matrix.getNumColumns(); - - stream.writeInt(numRow); - stream.writeInt(numColumn); - - for (final List<Double> row : matrix.getRows()) { - for (final double element : row) { - stream.writeDouble(element); - } - } - } - - /** - * Decode a Matrix from input stream. - */ - private Matrix decodeMatrixFromStream(final DataInputStream stream) throws IOException { - final int numRow = stream.readInt(); - final int numColumn = stream.readInt(); - - final List<List<Double>> rows = new ArrayList<>(numRow); - for (int rowIndex = 0; rowIndex < numRow; rowIndex++) { - final List<Double> row = new ArrayList<>(numColumn); - for (int columnIndex = 0; columnIndex < numColumn; columnIndex++) { - row.add(stream.readDouble()); - } - rows.add(row); - } - return new RowMatrix(Collections.unmodifiableList(rows)); - } -} http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/RowMatrix.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/RowMatrix.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/RowMatrix.java index a0ca817..9de71d8 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/RowMatrix.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/RowMatrix.java @@ -26,7 +26,13 @@ import java.util.List; * Row-oriented matrix implementation used in {@link MatMul} example. */ final class RowMatrix implements Matrix<Double> { - private final List<List<Double>> rows; + private List<List<Double>> rows; + + /** + * No-arg constructor required for Kryo to serialize/deserialize. + */ + RowMatrix() { + } /** * Constructor of matrix which creates an empty matrix of size (numRow x numColumn). http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/AdditionAggregateFunction.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/AdditionAggregateFunction.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/AdditionAggregateFunction.java index 391de6e..dde2b5a 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/AdditionAggregateFunction.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/AdditionAggregateFunction.java @@ -18,8 +18,6 @@ */ package org.apache.reef.vortex.examples.sumones; -import org.apache.reef.io.serialization.Codec; -import org.apache.reef.io.serialization.SerializableCodec; import org.apache.reef.vortex.api.VortexAggregateException; import org.apache.reef.vortex.api.VortexAggregateFunction; @@ -29,8 +27,6 @@ import java.util.List; * Aggregates and sums the outputs. */ public final class AdditionAggregateFunction implements VortexAggregateFunction<Integer> { - private static final Codec<Integer> CODEC = new SerializableCodec<>(); - @Override public Integer call(final List<Integer> taskletOutputs) throws VortexAggregateException { int sum = 0; @@ -40,9 +36,4 @@ public final class AdditionAggregateFunction implements VortexAggregateFunction< return sum; } - - @Override - public Codec<Integer> getOutputCodec() { - return CODEC; - } } http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/IdentityFunction.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/IdentityFunction.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/IdentityFunction.java index b7a69ef..890b9d9 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/IdentityFunction.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/IdentityFunction.java @@ -18,16 +18,12 @@ */ package org.apache.reef.vortex.examples.sumones; -import org.apache.reef.io.serialization.Codec; -import org.apache.reef.io.serialization.SerializableCodec; import org.apache.reef.vortex.api.VortexFunction; /** * Identity function. */ public final class IdentityFunction implements VortexFunction<Integer, Integer> { - private static final Codec<Integer> CODEC = new SerializableCodec<>(); - /** * Outputs input. */ @@ -35,14 +31,4 @@ public final class IdentityFunction implements VortexFunction<Integer, Integer> public Integer call(final Integer input) throws Exception { return input; } - - @Override - public Codec<Integer> getInputCodec() { - return CODEC; - } - - @Override - public Codec<Integer> getOutputCodec() { - return CODEC; - } } http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/mastertoworker/MasterToWorkerRequest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/mastertoworker/MasterToWorkerRequest.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/mastertoworker/MasterToWorkerRequest.java new file mode 100644 index 0000000..5b59ccc --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/mastertoworker/MasterToWorkerRequest.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.protocol.mastertoworker; + +import org.apache.reef.annotations.Unstable; + +/** + * Master-to-Worker protocol. + */ +@Unstable +public interface MasterToWorkerRequest { + /** + * Type of Request. + */ + enum Type { + AggregateTasklets, + ExecuteTasklet, + CancelTasklet, + ExecuteAggregateTasklet + } + + /** + * @return the type of this MasterToWorkerRequest. + */ + Type getType(); +} http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/mastertoworker/TaskletAggregateExecutionRequest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/mastertoworker/TaskletAggregateExecutionRequest.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/mastertoworker/TaskletAggregateExecutionRequest.java new file mode 100644 index 0000000..24527d4 --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/mastertoworker/TaskletAggregateExecutionRequest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.protocol.mastertoworker; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.Private; + +/** + * A request from the Vortex Driver to run an aggregate-able function. + */ +@Unstable +@Private +@DriverSide +public final class TaskletAggregateExecutionRequest<TInput> implements MasterToWorkerRequest { + private TInput input; + private int aggregateFunctionId; + private int taskletId; + + /** + * No-arg constructor required for Kryo to serialize/deserialize. + */ + TaskletAggregateExecutionRequest() { + } + + public TaskletAggregateExecutionRequest(final int taskletId, + final int aggregateFunctionId, + final TInput input) { + this.taskletId = taskletId; + this.input = input; + this.aggregateFunctionId = aggregateFunctionId; + } + + /** + * @return input of the request. + */ + public TInput getInput() { + return input; + } + + /** + * @return tasklet ID corresponding to the tasklet request. + */ + public int getTaskletId() { + return taskletId; + } + + /** + * @return the AggregateFunctionID of the request. + */ + public int getAggregateFunctionId() { + return aggregateFunctionId; + } + + @Override + public Type getType() { + return Type.ExecuteAggregateTasklet; + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/mastertoworker/TaskletAggregationRequest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/mastertoworker/TaskletAggregationRequest.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/mastertoworker/TaskletAggregationRequest.java new file mode 100644 index 0000000..bfa90d9 --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/mastertoworker/TaskletAggregationRequest.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.protocol.mastertoworker; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.vortex.api.VortexAggregateFunction; +import org.apache.reef.vortex.api.VortexAggregatePolicy; +import org.apache.reef.vortex.api.VortexFunction; + +import java.util.List; + +/** + * A request from the Vortex Driver for the {@link org.apache.reef.vortex.evaluator.VortexWorker} to + * record aggregate functions for later execution. + */ +@Unstable +@Private +@DriverSide +public final class TaskletAggregationRequest<TInput, TOutput> implements MasterToWorkerRequest { + private int aggregateFunctionId; + private VortexAggregateFunction<TOutput> userAggregateFunction; + private VortexFunction<TInput, TOutput> function; + private VortexAggregatePolicy policy; + + /** + * No-arg constructor required for Kryo to serialize/deserialize. + */ + TaskletAggregationRequest() { + } + + public TaskletAggregationRequest(final int aggregateFunctionId, + final VortexAggregateFunction<TOutput> aggregateFunction, + final VortexFunction<TInput, TOutput> function, + final VortexAggregatePolicy policy) { + this.aggregateFunctionId = aggregateFunctionId; + this.userAggregateFunction = aggregateFunction; + this.function = function; + this.policy = policy; + } + + @Override + public Type getType() { + return Type.AggregateTasklets; + } + + /** + * @return the AggregateFunctionID of the aggregate function. + */ + public int getAggregateFunctionId() { + return aggregateFunctionId; + } + + /** + * @return the aggregate function as specified by the user. + */ + public VortexAggregateFunction getAggregateFunction() { + return userAggregateFunction; + } + + /** + * @return the user specified function. + */ + public VortexFunction getFunction() { + return function; + } + + /** + * @return the aggregation policy. + */ + public VortexAggregatePolicy getPolicy() { + return policy; + } + + /** + * Execute the aggregate function using the list of outputs. + * @return Output of the function. + */ + public TOutput executeAggregation(final List<TOutput> outputs) throws Exception { + return userAggregateFunction.call(outputs); + } + + /** + * Execute the user specified function. + */ + public TOutput executeFunction(final TInput input) throws Exception { + return function.call(input); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/mastertoworker/TaskletCancellationRequest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/mastertoworker/TaskletCancellationRequest.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/mastertoworker/TaskletCancellationRequest.java new file mode 100644 index 0000000..36d0233 --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/mastertoworker/TaskletCancellationRequest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.protocol.mastertoworker; + +import org.apache.reef.annotations.Unstable; + +/** + * A {@link MasterToWorkerRequest} to cancel tasklets. + */ +@Unstable +public final class TaskletCancellationRequest implements MasterToWorkerRequest { + private int taskletId; + + /** + * No-arg constructor required for Kryo to serialize/deserialize. + */ + TaskletCancellationRequest() { + } + + public TaskletCancellationRequest(final int taskletId) { + this.taskletId = taskletId; + } + + /** + * @return the ID of the VortexTasklet associated with this MasterToWorkerRequest. + */ + public int getTaskletId() { + return taskletId; + } + + @Override + public Type getType() { + return Type.CancelTasklet; + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/mastertoworker/TaskletExecutionRequest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/mastertoworker/TaskletExecutionRequest.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/mastertoworker/TaskletExecutionRequest.java new file mode 100644 index 0000000..2963dc1 --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/mastertoworker/TaskletExecutionRequest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.protocol.mastertoworker; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.vortex.api.VortexFunction; + +/** + * Request to execute a tasklet. + */ +@Unstable +@Private +public final class TaskletExecutionRequest<TInput, TOutput> implements MasterToWorkerRequest { + private int taskletId; + private VortexFunction<TInput, TOutput> userFunction; + private TInput input; + + /** + * @return the type of this MasterToWorkerRequest. + */ + @Override + public Type getType() { + return Type.ExecuteTasklet; + } + + /** + * No-arg constructor required for Kryo to serialize/deserialize. + */ + TaskletExecutionRequest() { + } + + /** + * Request from Vortex Master to Vortex Worker to execute a tasklet. + */ + public TaskletExecutionRequest(final int taskletId, + final VortexFunction<TInput, TOutput> userFunction, + final TInput input) { + this.taskletId = taskletId; + this.userFunction = userFunction; + this.input = input; + } + + /** + * Execute the function using the input. + * @return Output of the function. + */ + public TOutput execute() throws Exception { + return userFunction.call(input); + } + + /** + * @return the ID of the VortexTasklet associated with this MasterToWorkerRequest. + */ + public int getTaskletId() { + return taskletId; + } + + /** + * Get function of the tasklet. + */ + public VortexFunction getFunction() { + return userFunction; + } + + /** + * Get input of the tasklet. + */ + public TInput getInput() { + return input; + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/mastertoworker/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/mastertoworker/package-info.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/mastertoworker/package-info.java new file mode 100644 index 0000000..6b106b6 --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/mastertoworker/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Protocol from VortexMaster to VortexWorker. + */ +package org.apache.reef.vortex.protocol.mastertoworker; http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/TaskletAggregationFailureReport.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/TaskletAggregationFailureReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/TaskletAggregationFailureReport.java new file mode 100644 index 0000000..bd5dabb --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/TaskletAggregationFailureReport.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.protocol.workertomaster; + +import org.apache.reef.annotations.Unstable; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * Report of a tasklet exception on aggregation. + */ +@Unstable +public final class TaskletAggregationFailureReport implements WorkerToMasterReport { + private List<Integer> taskletIds; + private Exception exception; + + /** + * No-arg constructor required for Kryo to serialize/deserialize. + */ + TaskletAggregationFailureReport() { + } + + /** + * @param taskletIds of the failed tasklet(s). + * @param exception that caused the tasklet failure. + */ + public TaskletAggregationFailureReport(final List<Integer> taskletIds, final Exception exception) { + this.taskletIds = Collections.unmodifiableList(new ArrayList<>(taskletIds)); + this.exception = exception; + } + + /** + * @return the type of this TaskletReport. + */ + @Override + public Type getType() { + return Type.TaskletAggregationFailure; + } + + /** + * @return the taskletIds that failed on aggregation. + */ + public List<Integer> getTaskletIds() { + return taskletIds; + } + + /** + * @return the exception that caused the tasklet aggregation failure. + */ + public Exception getException() { + return exception; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/TaskletAggregationResultReport.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/TaskletAggregationResultReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/TaskletAggregationResultReport.java new file mode 100644 index 0000000..52cfcfa --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/TaskletAggregationResultReport.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.protocol.workertomaster; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.Private; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * Report of a Tasklet aggregation execution result. + */ +@Private +@DriverSide +@Unstable +public final class TaskletAggregationResultReport implements WorkerToMasterReport { + private List<Integer> taskletIds; + private Object result; + + /** + * No-arg constructor required for Kryo to serialize/deserialize. + */ + TaskletAggregationResultReport() { + } + + /** + * @param taskletIds of the tasklets. + * @param result of the tasklet execution. + */ + public TaskletAggregationResultReport(final List<Integer> taskletIds, final Object result) { + this.taskletIds = Collections.unmodifiableList(new ArrayList<>(taskletIds)); + this.result = result; + } + + /** + * @return the type of this TaskletReport. + */ + @Override + public Type getType() { + return Type.TaskletAggregationResult; + } + + /** + * @return the TaskletId(s) of this TaskletReport + */ + public List<Integer> getTaskletIds() { + return taskletIds; + } + + /** + * @return the result of the Tasklet aggregation execution. + */ + public Object getResult() { + return result; + } + +} http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/TaskletCancelledReport.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/TaskletCancelledReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/TaskletCancelledReport.java new file mode 100644 index 0000000..53d531d --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/TaskletCancelledReport.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.protocol.workertomaster; + +import org.apache.reef.annotations.Unstable; + +/** + * The report of a cancelled Tasklet. + */ +@Unstable +public final class TaskletCancelledReport implements WorkerToMasterReport { + private int taskletId; + + /** + * No-arg constructor required for Kryo to serialize/deserialize. + */ + TaskletCancelledReport() { + } + + /** + * @param taskletId of the cancelled tasklet. + */ + public TaskletCancelledReport(final int taskletId) { + this.taskletId = taskletId; + } + + @Override + public Type getType() { + return Type.TaskletCancelled; + } + + /** + * @return the taskletId of this TaskletReport. + */ + public int getTaskletId() { + return taskletId; + } +}
