Repository: reef Updated Branches: refs/heads/master 120327fea -> 48bde0c0e
http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/TaskletFailureReport.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/TaskletFailureReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/TaskletFailureReport.java new file mode 100644 index 0000000..2974beb --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/TaskletFailureReport.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.protocol.workertomaster; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.Private; + +/** + * Report of a Tasklet exception. + */ +@Unstable +@Private +@DriverSide +public final class TaskletFailureReport implements WorkerToMasterReport { + private int taskletId; + private Exception exception; + + /** + * No-arg constructor required for Kryo to serialize/deserialize. + */ + TaskletFailureReport() { + } + + /** + * @param taskletId of the failed Tasklet. + * @param exception that caused the tasklet failure. + */ + public TaskletFailureReport(final int taskletId, final Exception exception) { + this.taskletId = taskletId; + this.exception = exception; + } + + /** + * @return the type of this TaskletReport. + */ + @Override + public Type getType() { + return Type.TaskletFailure; + } + + /** + * @return the taskletId of this TaskletReport. + */ + public int getTaskletId() { + return taskletId; + } + + /** + * @return the exception that caused the Tasklet failure. + */ + public Exception getException() { + return exception; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/TaskletResultReport.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/TaskletResultReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/TaskletResultReport.java new file mode 100644 index 0000000..dec4a59 --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/TaskletResultReport.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.protocol.workertomaster; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.Private; + +/** + * Report of a tasklet execution result. + */ +@Unstable +@Private +@DriverSide +public final class TaskletResultReport implements WorkerToMasterReport { + private int taskletId; + private Object result; + + /** + * No-arg constructor required for Kryo to serialize/deserialize. + */ + TaskletResultReport() { + } + + /** + * @param taskletId of the Tasklet. + * @param result of the tasklet execution. + */ + public TaskletResultReport(final int taskletId, final Object result) { + this.taskletId = taskletId; + this.result = result; + } + + /** + * @return the type of this TaskletReport. + */ + @Override + public Type getType() { + return Type.TaskletResult; + } + + /** + * @return the TaskletId of this TaskletReport + */ + public int getTaskletId() { + return taskletId; + } + + /** + * @return the result of the tasklet execution. + */ + public Object getResult() { + return result; + } + +} http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/WorkerToMasterReport.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/WorkerToMasterReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/WorkerToMasterReport.java new file mode 100644 index 0000000..eb1238a --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/WorkerToMasterReport.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.protocol.workertomaster; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.Private; + +/** + * The interface for a status report from the {@link org.apache.reef.vortex.evaluator.VortexWorker}. + */ +@Unstable +@Private +@DriverSide +public interface WorkerToMasterReport { + /** + * Type of WorkerToMasterReport. + */ + enum Type { + TaskletResult, + TaskletAggregationResult, + TaskletCancelled, + TaskletFailure, + TaskletAggregationFailure + } + + /** + * @return the type of this WorkerToMasterReport. + */ + Type getType(); +} http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/WorkerToMasterReports.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/WorkerToMasterReports.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/WorkerToMasterReports.java new file mode 100644 index 0000000..cb0cdbf --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/WorkerToMasterReports.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.protocol.workertomaster; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.Private; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +/** + * Container for multiple WorkerToMasterReports. + */ +@Private +@Unstable +@DriverSide +public final class WorkerToMasterReports { + private ArrayList<WorkerToMasterReport> workerToMasterReports; + + /** + * No-arg constructor required for Kryo to serialize/deserialize. + */ + WorkerToMasterReports() { + } + + public WorkerToMasterReports(final Collection<WorkerToMasterReport> workerToMasterReports) { + this.workerToMasterReports = new ArrayList<>(workerToMasterReports); + } + + /** + * @return the list of Tasklet reports. + */ + public List<WorkerToMasterReport> getReports() { + return Collections.unmodifiableList(workerToMasterReports); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/package-info.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/package-info.java new file mode 100644 index 0000000..584847f --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Protocol from VortexWorker to VortexMaster. + */ +package org.apache.reef.vortex.protocol.workertomaster; http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/util/VoidCodec.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/util/VoidCodec.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/util/VoidCodec.java deleted file mode 100644 index 7e21066..0000000 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/util/VoidCodec.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.vortex.util; - -import org.apache.reef.io.serialization.Codec; - -/** - * Codec for empty input/output. - */ -public final class VoidCodec implements Codec<Void> { - - @Override - public byte[] encode(final Void obj) { - return new byte[0]; - } - - @Override - public Void decode(final byte[] buf) { - return null; - } -} http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/util/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/util/package-info.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/util/package-info.java deleted file mode 100644 index fcbb9f5..0000000 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/util/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -/** - * Utilities used in Vortex. - */ -package org.apache.reef.vortex.util; http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java index 0770f77..026453d 100644 --- a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java +++ b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java @@ -18,16 +18,15 @@ */ package org.apache.reef.vortex.driver; -import org.apache.reef.io.serialization.SerializableCodec; import org.apache.reef.tang.exceptions.InjectionException; import org.apache.reef.util.Optional; import org.apache.reef.vortex.api.FutureCallback; import org.apache.reef.vortex.api.VortexFunction; import org.apache.reef.vortex.api.VortexFuture; -import org.apache.reef.vortex.common.TaskletFailureReport; -import org.apache.reef.vortex.common.TaskletReport; -import org.apache.reef.vortex.common.TaskletResultReport; -import org.apache.reef.vortex.common.WorkerReport; +import org.apache.reef.vortex.protocol.workertomaster.TaskletFailureReport; +import org.apache.reef.vortex.protocol.workertomaster.WorkerToMasterReport; +import org.apache.reef.vortex.protocol.workertomaster.TaskletResultReport; +import org.apache.reef.vortex.protocol.workertomaster.WorkerToMasterReports; import org.junit.Test; import java.util.*; @@ -42,8 +41,8 @@ import static org.junit.Assert.*; * Test whether DefaultVortexMaster correctly handles (simulated) events. */ public class DefaultVortexMasterTest { - private static final byte[] EMPTY_RESULT = new byte[0]; - private static final byte[] INTEGER_RESULT = new SerializableCodec<Integer>().encode(1); + private static final Object EMPTY_RESULT = null; + private static final int INTEGER_RESULT = 1; private TestUtil testUtil = new TestUtil(); /** @@ -81,9 +80,9 @@ public class DefaultVortexMasterTest { final ArrayList<Integer> taskletIds = launchTasklets(runningWorkers, pendingTasklets, 1); for (final int taskletId : taskletIds) { - final TaskletReport taskletReport = new TaskletResultReport(taskletId, INTEGER_RESULT); + final WorkerToMasterReport workerToMasterReport = new TaskletResultReport(taskletId, INTEGER_RESULT); vortexMaster.workerReported( - vortexWorkerManager1.getId(), new WorkerReport(Collections.singletonList(taskletReport))); + vortexWorkerManager1.getId(), new WorkerToMasterReports(Collections.singletonList(workerToMasterReport))); } assertTrue("The VortexFuture should be done", future.isDone()); @@ -123,9 +122,9 @@ public class DefaultVortexMasterTest { // Completed? for (final int taskletId : taskletIds2) { - final TaskletReport taskletReport = new TaskletResultReport(taskletId, EMPTY_RESULT); + final WorkerToMasterReport workerToMasterReport = new TaskletResultReport(taskletId, EMPTY_RESULT); vortexMaster.workerReported( - vortexWorkerManager2.getId(), new WorkerReport(Collections.singletonList(taskletReport))); + vortexWorkerManager2.getId(), new WorkerToMasterReports(Collections.singletonList(workerToMasterReport))); } assertTrue("The VortexFuture should be done", future.isDone()); } @@ -176,9 +175,9 @@ public class DefaultVortexMasterTest { for (final int taskletId : taskletIds2) { final String workerId = runningWorkers.getWhereTaskletWasScheduledTo(taskletId); assertNotNull("The tasklet must have been scheduled", workerId); - final TaskletReport taskletReport = new TaskletResultReport(taskletId, EMPTY_RESULT); + final WorkerToMasterReport workerToMasterReport = new TaskletResultReport(taskletId, EMPTY_RESULT); vortexMaster.workerReported( - workerId, new WorkerReport(Collections.singletonList(taskletReport))); + workerId, new WorkerToMasterReports(Collections.singletonList(workerToMasterReport))); } for (final VortexFuture vortexFuture : vortexFutures) { assertTrue("The VortexFuture should be done", vortexFuture.isDone()); @@ -221,9 +220,10 @@ public class DefaultVortexMasterTest { final ArrayList<Integer> taskletIds = launchTasklets(runningWorkers, pendingTasklets, 1); for (final int taskletId : taskletIds) { - final TaskletReport taskletReport = new TaskletFailureReport(taskletId, new RuntimeException("Test exception.")); + final WorkerToMasterReport workerToMasterReport = + new TaskletFailureReport(taskletId, new RuntimeException("Test exception.")); vortexMaster.workerReported( - vortexWorkerManager1.getId(), new WorkerReport(Collections.singletonList(taskletReport))); + vortexWorkerManager1.getId(), new WorkerToMasterReports(Collections.singletonList(workerToMasterReport))); } assertTrue("The VortexFuture should be done", future.isDone()); http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/RunningWorkersTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/RunningWorkersTest.java b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/RunningWorkersTest.java index 110143f..2828ccb 100644 --- a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/RunningWorkersTest.java +++ b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/RunningWorkersTest.java @@ -20,7 +20,6 @@ package org.apache.reef.vortex.driver; import org.apache.reef.tang.Tang; import org.apache.reef.tang.exceptions.InjectionException; -import org.apache.reef.vortex.common.AggregateFunctionRepository; import org.junit.Test; import java.util.ArrayList; http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/SchedulingPolicyTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/SchedulingPolicyTest.java b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/SchedulingPolicyTest.java index fea4235..b349231 100644 --- a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/SchedulingPolicyTest.java +++ b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/SchedulingPolicyTest.java @@ -32,7 +32,7 @@ public class SchedulingPolicyTest { private final TestUtil testUtil = new TestUtil(); /** - * Test common traits of different scheduling policies. + * Test protocol traits of different scheduling policies. */ @Test public void testCommon() throws Exception { @@ -102,7 +102,7 @@ public class SchedulingPolicyTest { } /** - * Simple common tests. + * Simple protocol tests. */ private void commonPolicyTests(final SchedulingPolicy policy) { // Initial state http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java index 1656f80..c8d2b9e 100644 --- a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java +++ b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java @@ -19,15 +19,16 @@ package org.apache.reef.vortex.driver; import org.apache.reef.driver.task.RunningTask; -import org.apache.reef.io.serialization.Codec; -import org.apache.reef.io.serialization.SerializableCodec; import org.apache.reef.tang.Tang; import org.apache.reef.tang.exceptions.InjectionException; -import org.apache.reef.vortex.util.VoidCodec; import org.apache.reef.util.Optional; import org.apache.reef.vortex.api.VortexFunction; import org.apache.reef.vortex.api.VortexFuture; -import org.apache.reef.vortex.common.*; +import org.apache.reef.vortex.protocol.mastertoworker.MasterToWorkerRequest; +import org.apache.reef.vortex.protocol.mastertoworker.TaskletCancellationRequest; +import org.apache.reef.vortex.protocol.workertomaster.TaskletCancelledReport; +import org.apache.reef.vortex.protocol.workertomaster.WorkerToMasterReport; +import org.apache.reef.vortex.protocol.workertomaster.WorkerToMasterReports; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -48,9 +49,6 @@ import static org.mockito.Mockito.when; * Utility methods for tests. */ public final class TestUtil { - private static final Codec<Void> VOID_CODEC = new VoidCodec(); - private static final Codec<Integer> INTEGER_CODEC = new SerializableCodec<>(); - private final AtomicInteger taskletId = new AtomicInteger(0); private final AtomicInteger workerId = new AtomicInteger(0); private final Executor executor = Executors.newFixedThreadPool(5); @@ -74,16 +72,16 @@ public final class TestUtil { doAnswer(new Answer() { @Override public Object answer(final InvocationOnMock invocation) throws Throwable { - final VortexRequest request = (VortexRequest)invocation.getArguments()[1]; + final MasterToWorkerRequest request = (MasterToWorkerRequest)invocation.getArguments()[1]; if (request instanceof TaskletCancellationRequest) { - final TaskletReport cancelReport = new TaskletCancelledReport( + final WorkerToMasterReport cancelReport = new TaskletCancelledReport( ((TaskletCancellationRequest)request).getTaskletId()); - master.workerReported(workerManager.getId(), new WorkerReport(Collections.singleton(cancelReport))); + master.workerReported(workerManager.getId(), new WorkerToMasterReports(Collections.singleton(cancelReport))); } return null; } - }).when(vortexRequestor).sendAsync(any(RunningTask.class), any(VortexRequest.class)); + }).when(vortexRequestor).sendAsync(any(RunningTask.class), any(MasterToWorkerRequest.class)); return workerManager; } @@ -93,7 +91,7 @@ public final class TestUtil { */ public Tasklet newTasklet() { final int id = taskletId.getAndIncrement(); - return new Tasklet(id, Optional.empty(), null, null, new VortexFuture(executor, vortexMaster, id, VOID_CODEC)); + return new Tasklet(id, Optional.empty(), null, null, new VortexFuture(executor, vortexMaster, id)); } /** @@ -112,16 +110,6 @@ public final class TestUtil { public Void call(final Void input) throws Exception { return null; } - - @Override - public Codec getInputCodec() { - return VOID_CODEC; - } - - @Override - public Codec getOutputCodec() { - return VOID_CODEC; - } }; } @@ -146,16 +134,6 @@ public final class TestUtil { } } } - - @Override - public Codec getInputCodec() { - return VOID_CODEC; - } - - @Override - public Codec getOutputCodec() { - return VOID_CODEC; - } }; } @@ -168,16 +146,6 @@ public final class TestUtil { public Integer call(final Integer input) throws Exception { return 1; } - - @Override - public Codec<Integer> getInputCodec() { - return INTEGER_CODEC; - } - - @Override - public Codec<Integer> getOutputCodec() { - return INTEGER_CODEC; - } }; } http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/VortexTestSuite.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/VortexTestSuite.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/VortexTestSuite.java index f97d559..3cce93c 100644 --- a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/VortexTestSuite.java +++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/VortexTestSuite.java @@ -20,7 +20,7 @@ package org.apache.reef.tests.applications.vortex; import org.apache.reef.tests.applications.vortex.addone.AddOneTest; import org.apache.reef.tests.applications.vortex.exception.VortexExceptionTest; -import org.apache.reef.tests.applications.vortex.cancellation.TaskletCancellationTest; +import org.apache.reef.tests.applications.vortex.cancellation.TaskletCancellationRequestTest; import org.junit.runner.RunWith; import org.junit.runners.Suite; @@ -31,7 +31,7 @@ import org.junit.runners.Suite; @Suite.SuiteClasses({ AddOneTest.class, VortexExceptionTest.class, - TaskletCancellationTest.class + TaskletCancellationRequestTest.class }) public final class VortexTestSuite { } http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneFunction.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneFunction.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneFunction.java index 8db6827..e064976 100644 --- a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneFunction.java +++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneFunction.java @@ -18,15 +18,12 @@ */ package org.apache.reef.tests.applications.vortex.addone; -import org.apache.reef.io.serialization.Codec; -import org.apache.reef.io.serialization.SerializableCodec; import org.apache.reef.vortex.api.VortexFunction; /** * Outputs Input+1. */ public final class AddOneFunction implements VortexFunction<Integer, Integer> { - private static final Codec<Integer> CODEC = new SerializableCodec<>(); /** * Outputs Input+1. */ @@ -34,14 +31,4 @@ public final class AddOneFunction implements VortexFunction<Integer, Integer> { public Integer call(final Integer input) throws Exception { return input + 1; } - - @Override - public Codec<Integer> getInputCodec() { - return CODEC; - } - - @Override - public Codec<Integer> getOutputCodec() { - return CODEC; - } } http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/InfiniteLoopWithCancellationFunction.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/InfiniteLoopWithCancellationFunction.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/InfiniteLoopWithCancellationFunction.java index b885c82..3225c53 100644 --- a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/InfiniteLoopWithCancellationFunction.java +++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/InfiniteLoopWithCancellationFunction.java @@ -19,9 +19,7 @@ package org.apache.reef.tests.applications.vortex.cancellation; -import org.apache.reef.io.serialization.Codec; import org.apache.reef.vortex.api.VortexFunction; -import org.apache.reef.vortex.util.VoidCodec; import java.util.logging.Level; import java.util.logging.Logger; @@ -31,7 +29,6 @@ import java.util.logging.Logger; */ public final class InfiniteLoopWithCancellationFunction implements VortexFunction<Void, Void> { private static final Logger LOG = Logger.getLogger(InfiniteLoopWithCancellationFunction.class.getName()); - private static final Codec<Void> CODEC = new VoidCodec(); @Override public Void call(final Void input) throws Exception { @@ -43,14 +40,4 @@ public final class InfiniteLoopWithCancellationFunction implements VortexFunctio } } } - - @Override - public Codec<Void> getInputCodec() { - return CODEC; - } - - @Override - public Codec<Void> getOutputCodec() { - return CODEC; - } } http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/TaskletCancellationRequestTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/TaskletCancellationRequestTest.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/TaskletCancellationRequestTest.java new file mode 100644 index 0000000..fa572fa --- /dev/null +++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/TaskletCancellationRequestTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.tests.applications.vortex.cancellation; + +import org.apache.reef.client.LauncherStatus; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tests.TestEnvironment; +import org.apache.reef.tests.TestEnvironmentFactory; +import org.apache.reef.vortex.driver.VortexJobConf; +import org.apache.reef.vortex.driver.VortexMasterConf; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Tests the cancellation of a tasklet. + */ +public final class TaskletCancellationRequestTest { + private final TestEnvironment testEnvironment = TestEnvironmentFactory.getNewTestEnvironment(); + + /** + * Set up the test environment. + */ + @Before + public void setUp() throws Exception { + this.testEnvironment.setUp(); + } + + /** + * Tear down the test environment. + */ + @After + public void tearDown() throws Exception { + this.testEnvironment.tearDown(); + } + + @Test + public void testVortexTaskletCancellation() { + final Configuration vortexMasterConf = VortexMasterConf.CONF + .set(VortexMasterConf.WORKER_NUM, 2) + .set(VortexMasterConf.WORKER_MEM, 64) + .set(VortexMasterConf.WORKER_CORES, 4) + .set(VortexMasterConf.WORKER_CAPACITY, 2000) + .set(VortexMasterConf.VORTEX_START, TaskletCancellationTestStart.class) + .build(); + + final VortexJobConf vortexJobConf = VortexJobConf.newBuilder() + .setJobName("TEST_Vortex_TaskletCancellationTest") + .setVortexMasterConf(vortexMasterConf) + .build(); + + final LauncherStatus status = this.testEnvironment.run(vortexJobConf.getConfiguration()); + Assert.assertTrue("Job state after execution: " + status, status.isSuccess()); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/TaskletCancellationTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/TaskletCancellationTest.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/TaskletCancellationTest.java deleted file mode 100644 index 0d86154..0000000 --- a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/TaskletCancellationTest.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.reef.tests.applications.vortex.cancellation; - -import org.apache.reef.client.LauncherStatus; -import org.apache.reef.tang.Configuration; -import org.apache.reef.tests.TestEnvironment; -import org.apache.reef.tests.TestEnvironmentFactory; -import org.apache.reef.vortex.driver.VortexJobConf; -import org.apache.reef.vortex.driver.VortexMasterConf; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -/** - * Tests the cancellation of a tasklet. - */ -public final class TaskletCancellationTest { - private final TestEnvironment testEnvironment = TestEnvironmentFactory.getNewTestEnvironment(); - - /** - * Set up the test environment. - */ - @Before - public void setUp() throws Exception { - this.testEnvironment.setUp(); - } - - /** - * Tear down the test environment. - */ - @After - public void tearDown() throws Exception { - this.testEnvironment.tearDown(); - } - - @Test - public void testVortexTaskletCancellation() { - final Configuration vortexMasterConf = VortexMasterConf.CONF - .set(VortexMasterConf.WORKER_NUM, 2) - .set(VortexMasterConf.WORKER_MEM, 64) - .set(VortexMasterConf.WORKER_CORES, 4) - .set(VortexMasterConf.WORKER_CAPACITY, 2000) - .set(VortexMasterConf.VORTEX_START, TaskletCancellationTestStart.class) - .build(); - - final VortexJobConf vortexJobConf = VortexJobConf.newBuilder() - .setJobName("TEST_Vortex_TaskletCancellationTest") - .setVortexMasterConf(vortexMasterConf) - .build(); - - final LauncherStatus status = this.testEnvironment.run(vortexJobConf.getConfiguration()); - Assert.assertTrue("Job state after execution: " + status, status.isSuccess()); - } -} http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/exception/ExceptionFunction.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/exception/ExceptionFunction.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/exception/ExceptionFunction.java index aeff37b..e78eb75 100644 --- a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/exception/ExceptionFunction.java +++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/exception/ExceptionFunction.java @@ -18,27 +18,14 @@ */ package org.apache.reef.tests.applications.vortex.exception; -import org.apache.reef.io.serialization.Codec; -import org.apache.reef.io.serialization.SerializableCodec; import org.apache.reef.vortex.api.VortexFunction; /** * A test Vortex function that throws an Exception. */ public final class ExceptionFunction implements VortexFunction<Integer, Integer> { - private static final Codec<Integer> CODEC = new SerializableCodec<>(); @Override public Integer call(final Integer input) throws Exception { throw new RuntimeException("Expected test exception."); } - - @Override - public Codec<Integer> getInputCodec() { - return CODEC; - } - - @Override - public Codec<Integer> getOutputCodec() { - return CODEC; - } } http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index f277566..d3c5fc4 100644 --- a/pom.xml +++ b/pom.xml @@ -59,6 +59,8 @@ under the License. <findbugs.version>3.0.2</findbugs.version> <reflections.version>0.9.9-RC1</reflections.version> <jsr305.version>3.0.1</jsr305.version> + <kryo.version>3.0.3</kryo.version> + <kryo-serializers.version>0.37</kryo-serializers.version> <rootPath>${user.dir}</rootPath> </properties>
