Repository: reef Updated Branches: refs/heads/master 220d75cac -> 55e0cfc85
[REEF-1658] Gracefully shut down all threads at the end of the REEF job This work is part of "REEF as a library" project [REEF-1561](https://issues.apache.org/jira/browse/REEF-1561) Summary of changes: * Close the `EvaluatorIdlenessThreadPool` in `DriverRuntimeStopHandler` * Make `EvaluatorIdlenessThreadPool` `AutoCloseable` to close the local thread pool * Implement `EvaluatorManager.shutdown()` method and call it when removing an evaluator from `Evaluators` * Make sure `.close()` methods never throw in `DispatchingEStage` and `EvaluatorMessageDispatcher` * Give better thread names in `EvaluatorIdlenessThreadPool` * Improve logging and error handling in `.close()` methods in all paths of the driver shutdown JIRA: [REEF-1658](https://issues.apache.org/jira/browse/REEF-1658) Pull Request: This closes #1175 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/55e0cfc8 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/55e0cfc8 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/55e0cfc8 Branch: refs/heads/master Commit: 55e0cfc856c4f3940bedd6539de67a709e723c00 Parents: 220d75c Author: Sergiy Matusevych <[email protected]> Authored: Mon Oct 31 22:34:15 2016 -0700 Committer: Markus Weimer <[email protected]> Committed: Wed Nov 2 12:48:44 2016 -0700 ---------------------------------------------------------------------- .../common/driver/DriverRuntimeStopHandler.java | 20 +++++-- .../evaluator/EvaluatorIdlenessThreadPool.java | 63 ++++++++++++++++++-- .../driver/evaluator/EvaluatorManager.java | 13 +++- .../evaluator/EvaluatorMessageDispatcher.java | 2 +- .../common/driver/evaluator/Evaluators.java | 25 ++++++-- .../runtime/common/utils/DispatchingEStage.java | 4 +- 6 files changed, 106 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/reef/blob/55e0cfc8/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStopHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStopHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStopHandler.java index d344cf3..0919b77 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStopHandler.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStopHandler.java @@ -24,6 +24,7 @@ import org.apache.reef.driver.parameters.ResourceManagerPreserveEvaluators; import org.apache.reef.driver.restart.DriverRestartManager; import org.apache.reef.exception.DriverFatalRuntimeException; import org.apache.reef.runtime.common.driver.api.ResourceManagerStopHandler; +import org.apache.reef.runtime.common.driver.evaluator.EvaluatorIdlenessThreadPool; import org.apache.reef.runtime.common.driver.evaluator.Evaluators; import org.apache.reef.runtime.common.utils.RemoteManager; import org.apache.reef.tang.annotations.Parameter; @@ -50,6 +51,7 @@ final class DriverRuntimeStopHandler implements EventHandler<RuntimeStop> { private final ResourceManagerStopHandler resourceManagerStopHandler; private final RemoteManager remoteManager; private final Evaluators evaluators; + private final EvaluatorIdlenessThreadPool idlenessChecker; private final boolean preserveEvaluatorsAcrossRestarts; @Inject @@ -59,20 +61,22 @@ final class DriverRuntimeStopHandler implements EventHandler<RuntimeStop> { final DriverStatusManager driverStatusManager, final ResourceManagerStopHandler resourceManagerStopHandler, final RemoteManager remoteManager, - final Evaluators evaluators) { + final Evaluators evaluators, + final EvaluatorIdlenessThreadPool idlenessChecker) { 
this.driverRestartManager = driverRestartManager; this.driverStatusManager = driverStatusManager; this.resourceManagerStopHandler = resourceManagerStopHandler; this.remoteManager = remoteManager; this.evaluators = evaluators; + this.idlenessChecker = idlenessChecker; this.preserveEvaluatorsAcrossRestarts = preserveEvaluatorsAcrossRestarts; } @Override public synchronized void onNext(final RuntimeStop runtimeStop) { - LOG.log(Level.FINEST, "RuntimeStop: {0}", runtimeStop); + LOG.log(Level.FINE, "Driver shutdown: start {0}", runtimeStop); final Throwable runtimeException = runtimeStop.getException(); @@ -81,23 +85,29 @@ final class DriverRuntimeStopHandler implements EventHandler<RuntimeStop> { if (runtimeException == null || runtimeException instanceof DriverFatalRuntimeException || !this.preserveEvaluatorsAcrossRestarts) { + LOG.log(Level.FINER, "Driver shutdown: close the evaluators"); this.evaluators.close(); } this.resourceManagerStopHandler.onNext(runtimeStop); - // Inform the client of the shutdown. + LOG.log(Level.FINER, "Driver shutdown: notify the client"); this.driverStatusManager.onRuntimeStop(Optional.ofNullable(runtimeException)); - // Close the remoteManager. 
try { + LOG.log(Level.FINER, "Driver shutdown: close the remote manager"); this.remoteManager.close(); - LOG.log(Level.INFO, "Driver shutdown complete"); } catch (final Exception e) { LOG.log(Level.WARNING, "Error when closing the RemoteManager", e); throw new RuntimeException("Unable to close the RemoteManager.", e); } + LOG.log(Level.FINER, "Driver shutdown: close the restart manager"); this.driverRestartManager.close(); + + LOG.log(Level.FINER, "Driver shutdown: close the idleness checker"); + this.idlenessChecker.close(); + + LOG.log(Level.INFO, "Driver shutdown complete"); } } http://git-wip-us.apache.org/repos/asf/reef/blob/55e0cfc8/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorIdlenessThreadPool.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorIdlenessThreadPool.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorIdlenessThreadPool.java index e94ab4c..e689504 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorIdlenessThreadPool.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorIdlenessThreadPool.java @@ -26,8 +26,10 @@ import org.apache.reef.tang.annotations.Parameter; import org.apache.reef.wake.impl.DefaultThreadFactory; import javax.inject.Inject; +import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; @@ -36,22 +38,25 @@ import java.util.logging.Logger; * of an {@link EvaluatorManager} in order to trigger Evaluator idleness checks. 
*/ @Private -public final class EvaluatorIdlenessThreadPool { +public final class EvaluatorIdlenessThreadPool implements AutoCloseable { + private static final Logger LOG = Logger.getLogger(EvaluatorIdlenessThreadPool.class.getName()); private final ExecutorService executor; private final long waitInMillis; @Inject - private EvaluatorIdlenessThreadPool(@Parameter(EvaluatorIdlenessThreadPoolSize.class) final int numThreads, - @Parameter(EvaluatorIdlenessWaitInMilliseconds.class) final long waitInMillis) { + private EvaluatorIdlenessThreadPool( + @Parameter(EvaluatorIdlenessThreadPoolSize.class) final int numThreads, + @Parameter(EvaluatorIdlenessWaitInMilliseconds.class) final long waitInMillis) { Validate.isTrue(waitInMillis >= 0, "EvaluatorIdlenessWaitInMilliseconds must be configured to be >= 0"); Validate.isTrue(numThreads > 0, "EvaluatorIdlenessThreadPoolSize must be configured to be > 0"); this.waitInMillis = waitInMillis; + this.executor = Executors.newFixedThreadPool( - numThreads, new DefaultThreadFactory(EvaluatorIdlenessThreadPool.class.getName())); + numThreads, new DefaultThreadFactory(this.getClass().getSimpleName())); } /** @@ -60,12 +65,26 @@ public final class EvaluatorIdlenessThreadPool { * @param manager the {@link EvaluatorManager} */ void runCheckAsync(final EvaluatorManager manager) { - executor.submit(new Runnable() { + + final String evaluatorId = manager.getId(); + LOG.log(Level.FINEST, "Idle check for Evaluator: {0}", manager); + + this.executor.submit(new Runnable() { + @Override public void run() { + + LOG.log(Level.FINEST, "Idle check for Evaluator {0} - begin", evaluatorId); + while (!manager.isClosed()) { try { + + LOG.log(Level.FINEST, + "Waiting for Evaluator {0} to close: Sleep for {1} ms", + new Object[] {evaluatorId, waitInMillis}); + Thread.sleep(waitInMillis); + } catch (final InterruptedException e) { LOG.log(Level.SEVERE, "Thread interrupted while waiting for Evaluator to finish."); throw new RuntimeException(e); @@ -73,8 
+92,40 @@ public final class EvaluatorIdlenessThreadPool { } manager.checkIdlenessSource(); - LOG.log(Level.FINE, "Evaluator " + manager.getId() + " has finished."); + + LOG.log(Level.FINEST, "Idle check for Evaluator {0} - end", evaluatorId); + } + + @Override + public String toString() { + return "CheckIdle: " + evaluatorId; } }); } + + /** + * Shutdown the thread pool of idleness checkers. + */ + @Override + public void close() { + + LOG.log(Level.FINE, "EvaluatorIdlenessThreadPool shutdown: begin"); + + this.executor.shutdown(); + + boolean isTerminated = false; + try { + isTerminated = this.executor.awaitTermination(this.waitInMillis, TimeUnit.MILLISECONDS); + } catch (final InterruptedException ex) { + LOG.log(Level.WARNING, "EvaluatorIdlenessThreadPool shutdown: Interrupted", ex); + } + + if (isTerminated) { + LOG.log(Level.FINE, "EvaluatorIdlenessThreadPool shutdown: Terminated successfully"); + } else { + final List<Runnable> pendingJobs = this.executor.shutdownNow(); + LOG.log(Level.SEVERE, "EvaluatorIdlenessThreadPool shutdown: {0} jobs after timeout", pendingJobs.size()); + LOG.log(Level.FINE, "EvaluatorIdlenessThreadPool shutdown: pending jobs: {0}", pendingJobs); + } + } } http://git-wip-us.apache.org/repos/asf/reef/blob/55e0cfc8/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java index fc77380..26af25f 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java @@ -219,6 +219,8 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable { @Override public void close() { + LOG.log(Level.FINER, "Close EvaluatorManager {0} - begin", this.evaluatorId); + synchronized (this.evaluatorDescriptor) { if (this.stateManager.isAvailable()) { @@ -275,6 +277,15 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable { } this.idlenessThreadPool.runCheckAsync(this); + + LOG.log(Level.FINER, "Close EvaluatorManager {0} - end", this.evaluatorId); + } + + /** + * Close message dispatcher for the evaluator. + */ + public void shutdown() { + this.messageDispatcher.close(); } /** @@ -357,7 +368,7 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable { LOG.log(Level.SEVERE, "Exception while handling FailedEvaluator", e); } finally { this.stateManager.setFailed(); - close(); + this.close(); } } } http://git-wip-us.apache.org/repos/asf/reef/blob/55e0cfc8/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java index 18e868a..ce879da 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java @@ -286,7 +286,7 @@ public final class EvaluatorMessageDispatcher implements AutoCloseable { } @Override - public void close() throws Exception { + public void close() { LOG.log(Level.FINER, "Closing message dispatcher for {0}", this.evaluatorIdentifier); // This effectively closes all dispatchers as they share the same stage. 
this.serviceDispatcher.close(); http://git-wip-us.apache.org/repos/asf/reef/blob/55e0cfc8/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java index 68a122d..2e6974b 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java @@ -65,16 +65,22 @@ public final class Evaluators implements AutoCloseable { */ @Override public void close() { + + LOG.log(Level.FINER, "Closing the evaluators - begin"); + final List<EvaluatorManager> evaluatorsCopy; synchronized (this) { evaluatorsCopy = new ArrayList<>(this.evaluators.values()); } + for (final EvaluatorManager evaluatorManager : evaluatorsCopy) { if (!evaluatorManager.isClosedOrClosing()) { LOG.log(Level.WARNING, "Unclean shutdown of evaluator {0}", evaluatorManager.getId()); evaluatorManager.close(); } } + + LOG.log(Level.FINER, "Closing the evaluators - end"); } /** @@ -148,19 +154,28 @@ public final class Evaluators implements AutoCloseable { * Moves evaluator from map of active evaluators to set of closed evaluators. 
*/ public synchronized void removeClosedEvaluator(final EvaluatorManager evaluatorManager) { + final String evaluatorId = evaluatorManager.getId(); + if (!evaluatorManager.isClosed()) { throw new IllegalArgumentException("Trying to remove evaluator " + evaluatorId + " which is not closed yet."); } + if (!this.evaluators.containsKey(evaluatorId) && !this.closedEvaluatorIds.contains(evaluatorId)) { throw new IllegalArgumentException("Trying to remove unknown evaluator " + evaluatorId + "."); } + if (!this.evaluators.containsKey(evaluatorId) && this.closedEvaluatorIds.contains(evaluatorId)) { - LOG.log(Level.FINE, "Trying to remove closed evaluator " + evaluatorId + " which has already been removed."); - } else { - LOG.log(Level.FINE, "Removing closed evaluator " + evaluatorId + "."); - this.evaluators.remove(evaluatorId); - this.closedEvaluatorIds.add(evaluatorId); + LOG.log(Level.FINE, "Trying to remove closed evaluator {0} which has already been removed.", evaluatorId); + return; } + + LOG.log(Level.FINE, "Removing closed evaluator {0}", evaluatorId); + + evaluatorManager.shutdown(); + this.evaluators.remove(evaluatorId); + this.closedEvaluatorIds.add(evaluatorId); + + LOG.log(Level.FINEST, "Evaluator {0} removed", evaluatorId); } } http://git-wip-us.apache.org/repos/asf/reef/blob/55e0cfc8/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java index 03c1463..3a65df9 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java @@ -119,11 +119,9 @@ public final class DispatchingEStage implements 
AutoCloseable { /** * Close the internal thread pool. - * - * @throws Exception forwarded from EStage.close() call. */ @Override - public void close() throws Exception { + public void close() { this.stage.close(); }
