Repository: reef Updated Branches: refs/heads/master 4c0207ed0 -> bf5caab94
[REEF-1396] Fix testFailureRestart to validate that the restarted Evaluators are received This addresses the issue by: * Adding a count of closed Evaluators and verifying it at the end of the tests. * Adding a test case with a single Evaluator failure and a single Evaluator restart. * Lowering the number of test Evaluators. JIRA: [REEF-1396](https://issues.apache.org/jira/browse/REEF-1396) Pull Request: This closes #1013 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/bf5caab9 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/bf5caab9 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/bf5caab9 Branch: refs/heads/master Commit: bf5caab94bf4614658b3e4071034c3ecae916660 Parents: 4c0207e Author: Andrew Chung <[email protected]> Authored: Tue May 24 11:29:13 2016 -0700 Committer: Markus Weimer <[email protected]> Committed: Tue May 31 09:02:53 2016 -0700 ---------------------------------------------------------------------- .../tests/evaluator/failure/FailureDriver.java | 147 +++++++++++++++++++ .../tests/evaluator/failure/FailureREEF.java | 142 ++++++++++++++++++ .../tests/evaluator/failure/package-info.java | 22 +++ .../failure/parameters/NumEvaluatorsToFail.java | 32 ++++ .../parameters/NumEvaluatorsToSubmit.java | 32 ++++ .../failure/parameters/package-info.java | 22 +++ .../reef/tests/yarn/failure/FailureDriver.java | 112 -------------- .../reef/tests/yarn/failure/FailureREEF.java | 134 ----------------- .../reef/tests/yarn/failure/package-info.java | 22 --- .../java/org/apache/reef/tests/FailureTest.java | 18 ++- 10 files changed, 412 insertions(+), 271 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/bf5caab9/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/FailureDriver.java ---------------------------------------------------------------------- diff --git 
a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/FailureDriver.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/FailureDriver.java new file mode 100644 index 0000000..65b4913 --- /dev/null +++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/FailureDriver.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.tests.evaluator.failure; + +import org.apache.commons.lang3.Validate; +import org.apache.reef.driver.context.ContextConfiguration; +import org.apache.reef.driver.evaluator.*; +import org.apache.reef.poison.PoisonedConfiguration; +import org.apache.reef.tang.Tang; +import org.apache.reef.tang.annotations.Parameter; +import org.apache.reef.tang.annotations.Unit; +import org.apache.reef.tests.evaluator.failure.parameters.NumEvaluatorsToFail; +import org.apache.reef.tests.evaluator.failure.parameters.NumEvaluatorsToSubmit; +import org.apache.reef.tests.library.exceptions.DriverSideFailure; +import org.apache.reef.wake.EventHandler; +import org.apache.reef.wake.time.event.StartTime; +import org.apache.reef.wake.time.event.StopTime; + +import javax.inject.Inject; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Driver for the Evaluator failure test; see NumEvaluatorsToSubmit and NumEvaluatorsToFail. + */ +@Unit +public class FailureDriver { + + private final int numEvaluatorsToSubmit; + private final int numEvaluatorsToFail; + private final AtomicInteger numEvaluatorsLeftToSubmit; + private final AtomicInteger numEvaluatorsLeftToClose; + private static final Logger LOG = Logger.getLogger(FailureDriver.class.getName()); + private final EvaluatorRequestor requestor; + + @Inject + public FailureDriver(@Parameter(NumEvaluatorsToSubmit.class) final int numEvaluatorsToSubmit, + @Parameter(NumEvaluatorsToFail.class) final int numEvaluatorsToFail, + final EvaluatorRequestor requestor) { + Validate.isTrue(numEvaluatorsToSubmit > 0, "The number of Evaluators to submit must be greater than 0."); + Validate.inclusiveBetween(1, numEvaluatorsToSubmit, numEvaluatorsToFail, + "The number of Evaluators to fail must be between 1 and numEvaluatorsToSubmit, inclusive."); + + this.numEvaluatorsToSubmit = numEvaluatorsToSubmit; + this.numEvaluatorsToFail = numEvaluatorsToFail; + + this.numEvaluatorsLeftToSubmit = new
AtomicInteger(numEvaluatorsToFail); + + // Every Evaluator is eventually closed: non-poisoned ones immediately, poisoned ones after resubmission. + this.numEvaluatorsLeftToClose = new AtomicInteger(numEvaluatorsToSubmit); + + this.requestor = requestor; + LOG.info("Driver instantiated"); + } + + /** + * Handles the StartTime event: Requests numEvaluatorsToSubmit Evaluators. + */ + final class StartHandler implements EventHandler<StartTime> { + @Override + public void onNext(final StartTime startTime) { + LOG.log(Level.FINE, "Request {0} Evaluators.", numEvaluatorsToSubmit); + FailureDriver.this.requestor.submit(EvaluatorRequest.newBuilder() + .setNumber(numEvaluatorsToSubmit) + .setMemory(64) + .setNumberOfCores(1) + .build()); + } + } + + /** + * Handles AllocatedEvaluator: Poisons the first numEvaluatorsToFail Evaluators, closes the rest. + */ + final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> { + @Override + public void onNext(final AllocatedEvaluator allocatedEvaluator) { + final String evalId = allocatedEvaluator.getId(); + LOG.log(Level.FINE, "Got allocated evaluator: {0}", evalId); + if (numEvaluatorsLeftToSubmit.getAndDecrement() > 0) { + LOG.log(Level.FINE, "Submitting poisoned context. {0} to go.", numEvaluatorsLeftToSubmit); + allocatedEvaluator.submitContext( + Tang.Factory.getTang() + .newConfigurationBuilder( + ContextConfiguration.CONF + .set(ContextConfiguration.IDENTIFIER, "Poisoned Context: " + evalId) + .build(), + PoisonedConfiguration.CONTEXT_CONF + .set(PoisonedConfiguration.CRASH_PROBABILITY, "1") + .set(PoisonedConfiguration.CRASH_TIMEOUT, "1") + .build()) + .build()); + } else { + LOG.log(Level.FINE, "Closing evaluator {0}", evalId); + allocatedEvaluator.close(); + FailureDriver.this.numEvaluatorsLeftToClose.decrementAndGet(); + } + } + } + + /** + * Handles FailedEvaluator: Requests one replacement Evaluator for the failed one.
+ */ + final class EvaluatorFailedHandler implements EventHandler<FailedEvaluator> { + @Override + public void onNext(final FailedEvaluator failedEvaluator) { + LOG.log(Level.FINE, "Got failed evaluator: {0} - re-request", failedEvaluator.getId()); + FailureDriver.this.requestor.submit(EvaluatorRequest.newBuilder() + .setNumber(1) + .setMemory(64) + .setNumberOfCores(1) + .build()); + } + } + + /** + * Handles StopTime: Throws DriverSideFailure unless all numEvaluatorsToSubmit Evaluators were closed. + */ + final class StopHandler implements EventHandler<StopTime> { + @Override + public void onNext(final StopTime stopTime) { + final int numEvaluatorsToClose = FailureDriver.this.numEvaluatorsLeftToClose.get(); + if (numEvaluatorsToClose != 0) { + final String message = "Got RuntimeStop Event. Expected to close " + numEvaluatorsToSubmit + " Evaluators " + + "but only " + (numEvaluatorsToSubmit - numEvaluatorsToClose) + " Evaluators were closed."; + LOG.log(Level.SEVERE, message); + throw new DriverSideFailure(message); + } + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/bf5caab9/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/FailureREEF.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/FailureREEF.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/FailureREEF.java new file mode 100644 index 0000000..c9828b4 --- /dev/null +++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/FailureREEF.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.tests.evaluator.failure; + +import org.apache.reef.client.DriverConfiguration; +import org.apache.reef.client.DriverLauncher; +import org.apache.reef.client.LauncherStatus; +import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration; +import org.apache.reef.runtime.yarn.client.YarnClientConfiguration; +import org.apache.reef.tang.*; +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.NamedParameter; +import org.apache.reef.tang.exceptions.BindException; +import org.apache.reef.tang.exceptions.InjectionException; +import org.apache.reef.tang.formats.CommandLine; +import org.apache.reef.tests.evaluator.failure.parameters.NumEvaluatorsToFail; +import org.apache.reef.tests.evaluator.failure.parameters.NumEvaluatorsToSubmit; +import org.apache.reef.util.EnvironmentUtils; + +import java.io.IOException; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Entry point class for REEF failure test. + */ +public final class FailureREEF { + /** + * The upper limit on the number of Evaluators that the local resourcemanager will hand out concurrently. 
+ */ + public static final int MAX_NUMBER_OF_EVALUATORS = 16; + + private static final Logger LOG = Logger.getLogger(FailureREEF.class.getName()); + + private static Configuration parseCommandLine(final String[] aArgs) { + final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder(); + try { + new CommandLine(cb) + .registerShortNameOfClass(Local.class) + .registerShortNameOfClass(TimeOut.class) + .processCommandLine(aArgs); + return cb.build(); + } catch (final BindException | IOException ex) { + final String msg = "Unable to parse command line"; + LOG.log(Level.SEVERE, msg, ex); + throw new RuntimeException(msg, ex); + } + } + + /** + * @return (immutable) TANG Configuration object. + * @throws BindException if configuration injector fails. + * @throws InjectionException if the Local.class parameter is not injected. + */ + private static Configuration getRunTimeConfiguration(final boolean isLocal) throws BindException { + + final Configuration runtimeConfiguration; + + if (isLocal) { + LOG.log(Level.INFO, "Running Failure demo on the local runtime"); + runtimeConfiguration = LocalRuntimeConfiguration.CONF + .set(LocalRuntimeConfiguration.MAX_NUMBER_OF_EVALUATORS, MAX_NUMBER_OF_EVALUATORS) + .build(); + } else { + LOG.log(Level.INFO, "Running Failure demo on YARN"); + runtimeConfiguration = YarnClientConfiguration.CONF.build(); + } + + return runtimeConfiguration; + } + + public static LauncherStatus runFailureReef( + final Configuration runtimeConfig, final int timeout, final int numEvaluatorsToSubmit, + final int numEvaluatorsToFail) throws InjectionException { + + final Configuration driverConf = DriverConfiguration.CONF + .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(FailureDriver.class)) + .set(DriverConfiguration.DRIVER_IDENTIFIER, "FailureREEF") + .set(DriverConfiguration.ON_DRIVER_STARTED, FailureDriver.StartHandler.class) + .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, 
FailureDriver.EvaluatorAllocatedHandler.class) + .set(DriverConfiguration.ON_EVALUATOR_FAILED, FailureDriver.EvaluatorFailedHandler.class) + .set(DriverConfiguration.ON_DRIVER_STOP, FailureDriver.StopHandler.class) + .build(); + + final Configuration namedParamsConf = Tang.Factory.getTang().newConfigurationBuilder() + .bindNamedParameter(NumEvaluatorsToSubmit.class, Integer.toString(numEvaluatorsToSubmit)) + .bindNamedParameter(NumEvaluatorsToFail.class, Integer.toString(numEvaluatorsToFail)) + .build(); + + final LauncherStatus state = DriverLauncher.getLauncher(runtimeConfig) + .run(Configurations.merge(driverConf, namedParamsConf), timeout); + + LOG.log(Level.INFO, "REEF job completed: {0}", state); + return state; + } + + public static void main(final String[] args) throws InjectionException { + final Configuration commandLineConf = parseCommandLine(args); + final Injector injector = Tang.Factory.getTang().newInjector(commandLineConf); + final boolean isLocal = injector.getNamedInstance(Local.class); + final int jobTimeout = injector.getNamedInstance(TimeOut.class) * 60 * 1000; + runFailureReef(getRunTimeConfiguration(isLocal), jobTimeout, 40, 10); + } + + /** + * Empty private constructor to prohibit instantiation of utility class. + */ + private FailureREEF() { + } + + /** + * Command line parameter = true to run locally, or false to run on YARN. + */ + @NamedParameter(doc = "Whether or not to run on the local runtime", + short_name = "local", default_value = "true") + public static final class Local implements Name<Boolean> { + } + + /** + * Number of minutes before timeout. 
+ */ + @NamedParameter(doc = "Number of minutes before timeout", + short_name = "timeout", default_value = "2") + public static final class TimeOut implements Name<Integer> { + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/bf5caab9/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/package-info.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/package-info.java new file mode 100644 index 0000000..ca60a12 --- /dev/null +++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Tests for Evaluator failures. 
+ */ +package org.apache.reef.tests.evaluator.failure; http://git-wip-us.apache.org/repos/asf/reef/blob/bf5caab9/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/parameters/NumEvaluatorsToFail.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/parameters/NumEvaluatorsToFail.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/parameters/NumEvaluatorsToFail.java new file mode 100644 index 0000000..c5123ad --- /dev/null +++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/parameters/NumEvaluatorsToFail.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.tests.evaluator.failure.parameters; + +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.NamedParameter; +import org.apache.reef.tests.evaluator.failure.FailureDriver; + +/** + * The number of Evaluators to fail and resubmit in {@link FailureDriver}. 
+ */ +@NamedParameter(doc = "The number of Evaluators to fail and resubmit in FailureDriver.") +public final class NumEvaluatorsToFail implements Name<Integer> { + private NumEvaluatorsToFail() { + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/bf5caab9/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/parameters/NumEvaluatorsToSubmit.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/parameters/NumEvaluatorsToSubmit.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/parameters/NumEvaluatorsToSubmit.java new file mode 100644 index 0000000..7572f49 --- /dev/null +++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/parameters/NumEvaluatorsToSubmit.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.tests.evaluator.failure.parameters; + +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.NamedParameter; +import org.apache.reef.tests.evaluator.failure.FailureDriver; + +/** + * The number of Evaluators to submit in {@link FailureDriver}. + */ +@NamedParameter(doc = "The number of Evaluators to submit in FailureDriver.") +public final class NumEvaluatorsToSubmit implements Name<Integer> { + private NumEvaluatorsToSubmit() { + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/bf5caab9/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/parameters/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/parameters/package-info.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/parameters/package-info.java new file mode 100644 index 0000000..72a5101 --- /dev/null +++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/parameters/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +/** + * Parameters for {@link org.apache.reef.tests.evaluator.failure.FailureDriver}. + */ +package org.apache.reef.tests.evaluator.failure.parameters; http://git-wip-us.apache.org/repos/asf/reef/blob/bf5caab9/lang/java/reef-tests/src/main/java/org/apache/reef/tests/yarn/failure/FailureDriver.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/yarn/failure/FailureDriver.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/yarn/failure/FailureDriver.java deleted file mode 100644 index 037b2c2..0000000 --- a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/yarn/failure/FailureDriver.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.reef.tests.yarn.failure; - -import org.apache.reef.driver.context.ContextConfiguration; -import org.apache.reef.driver.evaluator.AllocatedEvaluator; -import org.apache.reef.driver.evaluator.EvaluatorRequest; -import org.apache.reef.driver.evaluator.EvaluatorRequestor; -import org.apache.reef.driver.evaluator.FailedEvaluator; -import org.apache.reef.poison.PoisonedConfiguration; -import org.apache.reef.tang.Tang; -import org.apache.reef.tang.annotations.Unit; -import org.apache.reef.wake.EventHandler; -import org.apache.reef.wake.time.event.StartTime; - -import javax.inject.Inject; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * Driver for failure test. - */ -@Unit -public class FailureDriver { - - private static final int NUM_EVALUATORS = 40; - private static final int NUM_FAILURES = 10; - private final AtomicInteger toSubmit = new AtomicInteger(NUM_FAILURES); - private static final Logger LOG = Logger.getLogger(FailureDriver.class.getName()); - private final EvaluatorRequestor requestor; - - @Inject - public FailureDriver(final EvaluatorRequestor requestor) { - this.requestor = requestor; - LOG.info("Driver instantiated"); - } - - /** - * Handles the StartTime event: Request as single Evaluator. - */ - final class StartHandler implements EventHandler<StartTime> { - @Override - public void onNext(final StartTime startTime) { - LOG.log(Level.FINE, "Request {0} Evaluators.", NUM_EVALUATORS); - FailureDriver.this.requestor.submit(EvaluatorRequest.newBuilder() - .setNumber(NUM_EVALUATORS) - .setMemory(64) - .setNumberOfCores(1) - .build()); - } - } - - /** - * Handles AllocatedEvaluator: Submit a poisoned context. 
- */ - final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> { - @Override - public void onNext(final AllocatedEvaluator allocatedEvaluator) { - final String evalId = allocatedEvaluator.getId(); - LOG.log(Level.FINE, "Got allocated evaluator: {0}", evalId); - if (toSubmit.getAndDecrement() > 0) { - LOG.log(Level.FINE, "Submitting poisoned context. {0} to go.", toSubmit); - allocatedEvaluator.submitContext( - Tang.Factory.getTang() - .newConfigurationBuilder( - ContextConfiguration.CONF - .set(ContextConfiguration.IDENTIFIER, "Poisoned Context: " + evalId) - .build(), - PoisonedConfiguration.CONTEXT_CONF - .set(PoisonedConfiguration.CRASH_PROBABILITY, "1") - .set(PoisonedConfiguration.CRASH_TIMEOUT, "1") - .build()) - .build()); - } else { - LOG.log(Level.FINE, "Closing evaluator {0}", evalId); - allocatedEvaluator.close(); - } - } - } - - /** - * Handles FailedEvaluator: Resubmits the single Evaluator resource request. - */ - final class EvaluatorFailedHandler implements EventHandler<FailedEvaluator> { - @Override - public void onNext(final FailedEvaluator failedEvaluator) { - LOG.log(Level.FINE, "Got failed evaluator: {0} - re-request", failedEvaluator.getId()); - FailureDriver.this.requestor.submit(EvaluatorRequest.newBuilder() - .setNumber(1) - .setMemory(64) - .setNumberOfCores(1) - .build()); - } - } -} http://git-wip-us.apache.org/repos/asf/reef/blob/bf5caab9/lang/java/reef-tests/src/main/java/org/apache/reef/tests/yarn/failure/FailureREEF.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/yarn/failure/FailureREEF.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/yarn/failure/FailureREEF.java deleted file mode 100644 index d056dad..0000000 --- a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/yarn/failure/FailureREEF.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under 
one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.tests.yarn.failure; - -import org.apache.reef.client.DriverConfiguration; -import org.apache.reef.client.DriverLauncher; -import org.apache.reef.client.LauncherStatus; -import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration; -import org.apache.reef.runtime.yarn.client.YarnClientConfiguration; -import org.apache.reef.tang.Configuration; -import org.apache.reef.tang.Injector; -import org.apache.reef.tang.JavaConfigurationBuilder; -import org.apache.reef.tang.Tang; -import org.apache.reef.tang.annotations.Name; -import org.apache.reef.tang.annotations.NamedParameter; -import org.apache.reef.tang.exceptions.BindException; -import org.apache.reef.tang.exceptions.InjectionException; -import org.apache.reef.tang.formats.CommandLine; -import org.apache.reef.util.EnvironmentUtils; - -import java.io.IOException; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * Entry point class for REEF failure test. - */ -public final class FailureREEF { - /** - * The upper limit on the number of Evaluators that the local resourcemanager will hand out concurrently. 
- */ - public static final int MAX_NUMBER_OF_EVALUATORS = 16; - - private static final Logger LOG = Logger.getLogger(FailureREEF.class.getName()); - - private static Configuration parseCommandLine(final String[] aArgs) { - final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder(); - try { - new CommandLine(cb) - .registerShortNameOfClass(Local.class) - .registerShortNameOfClass(TimeOut.class) - .processCommandLine(aArgs); - return cb.build(); - } catch (final BindException | IOException ex) { - final String msg = "Unable to parse command line"; - LOG.log(Level.SEVERE, msg, ex); - throw new RuntimeException(msg, ex); - } - } - - /** - * @return (immutable) TANG Configuration object. - * @throws BindException if configuration injector fails. - * @throws InjectionException if the Local.class parameter is not injected. - */ - private static Configuration getRunTimeConfiguration(final boolean isLocal) throws BindException { - - final Configuration runtimeConfiguration; - - if (isLocal) { - LOG.log(Level.INFO, "Running Failure demo on the local runtime"); - runtimeConfiguration = LocalRuntimeConfiguration.CONF - .set(LocalRuntimeConfiguration.MAX_NUMBER_OF_EVALUATORS, MAX_NUMBER_OF_EVALUATORS) - .build(); - } else { - LOG.log(Level.INFO, "Running Failure demo on YARN"); - runtimeConfiguration = YarnClientConfiguration.CONF.build(); - } - - return runtimeConfiguration; - } - - public static LauncherStatus runFailureReef( - final Configuration runtimeConfig, final int timeout) throws InjectionException { - - final Configuration driverConf = DriverConfiguration.CONF - .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(FailureDriver.class)) - .set(DriverConfiguration.DRIVER_IDENTIFIER, "FailureREEF") - .set(DriverConfiguration.ON_DRIVER_STARTED, FailureDriver.StartHandler.class) - .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, FailureDriver.EvaluatorAllocatedHandler.class) - .set(DriverConfiguration.ON_EVALUATOR_FAILED, 
FailureDriver.EvaluatorFailedHandler.class) - .build(); - - final LauncherStatus state = DriverLauncher.getLauncher(runtimeConfig).run(driverConf, timeout); - LOG.log(Level.INFO, "REEF job completed: {0}", state); - return state; - } - - public static void main(final String[] args) throws InjectionException { - final Configuration commandLineConf = parseCommandLine(args); - final Injector injector = Tang.Factory.getTang().newInjector(commandLineConf); - final boolean isLocal = injector.getNamedInstance(Local.class); - final int jobTimeout = injector.getNamedInstance(TimeOut.class) * 60 * 1000; - runFailureReef(getRunTimeConfiguration(isLocal), jobTimeout); - } - - /** - * Empty private constructor to prohibit instantiation of utility class. - */ - private FailureREEF() { - } - - /** - * Command line parameter = true to run locally, or false to run on YARN. - */ - @NamedParameter(doc = "Whether or not to run on the local runtime", - short_name = "local", default_value = "true") - public static final class Local implements Name<Boolean> { - } - - /** - * Number of minutes before timeout. - */ - @NamedParameter(doc = "Number of minutes before timeout", - short_name = "timeout", default_value = "2") - public static final class TimeOut implements Name<Integer> { - } -} http://git-wip-us.apache.org/repos/asf/reef/blob/bf5caab9/lang/java/reef-tests/src/main/java/org/apache/reef/tests/yarn/failure/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/yarn/failure/package-info.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/yarn/failure/package-info.java deleted file mode 100644 index 816f866..0000000 --- a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/yarn/failure/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -/** - * Tests for YARN failures. - */ -package org.apache.reef.tests.yarn.failure; http://git-wip-us.apache.org/repos/asf/reef/blob/bf5caab9/lang/java/reef-tests/src/test/java/org/apache/reef/tests/FailureTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/FailureTest.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/FailureTest.java index a25a1bf..a2962c4 100644 --- a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/FailureTest.java +++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/FailureTest.java @@ -21,7 +21,7 @@ package org.apache.reef.tests; import org.apache.reef.client.LauncherStatus; import org.apache.reef.tang.Configuration; import org.apache.reef.tang.exceptions.InjectionException; -import org.apache.reef.tests.yarn.failure.FailureREEF; +import org.apache.reef.tests.evaluator.failure.FailureREEF; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -45,12 +45,24 @@ public class FailureTest { } @Test + public void testSingleEvaluatorFailureAndRestart() throws InjectionException { + runTestFailureReefWithParams(1, 1, "testSingleEvaluatorFailureAndRestart"); + } + + @Test public void testFailureRestart() 
throws InjectionException { + runTestFailureReefWithParams(30, 5, "testFailureRestart"); + } + + private void runTestFailureReefWithParams(final int numEvaluatorsToSubmit, + final int numEvaluatorsTofail, + final String testName) throws InjectionException { final Configuration runtimeConfiguration = this.testEnvironment.getRuntimeConfiguration(); final LauncherStatus status = - FailureREEF.runFailureReef(runtimeConfiguration, this.testEnvironment.getTestTimeout()); + FailureREEF.runFailureReef(runtimeConfiguration, this.testEnvironment.getTestTimeout(), + numEvaluatorsToSubmit, numEvaluatorsTofail); - Assert.assertTrue("FailureReef failed: " + status, status.isSuccess()); + Assert.assertTrue("FailureReef " + testName + " failed: " + status, status.isSuccess()); } }
