Repository: reef Updated Branches: refs/heads/master c67391a1d -> ac6785db4
[REEF-1250] Fix memory leak in Evaluators This change: * introduces a set of closed evaluator ids, * moves an evaluator to this set whenever the resource manager sends a resource status indicating that the evaluator is closed. JIRA: [REEF-1250](https://issues.apache.org/jira/browse/REEF-1250) Pull request: This closes #995 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/ac6785db Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/ac6785db Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/ac6785db Branch: refs/heads/master Commit: ac6785db44d9a1aecb819eba96558efdaae4e570 Parents: c67391a Author: Mariia Mykhailova <[email protected]> Authored: Mon May 9 15:35:24 2016 -0700 Committer: Markus Weimer <[email protected]> Committed: Wed May 11 10:21:54 2016 -0700 ---------------------------------------------------------------------- .../evaluator/EvaluatorHeartbeatHandler.java | 5 +++ .../EvaluatorResourceManagerErrorHandler.java | 6 ++- .../common/driver/evaluator/Evaluators.java | 44 +++++++++++++++++--- .../resourcemanager/ResourceStatusHandler.java | 16 ++++++- 4 files changed, 63 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/ac6785db/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java index 9011d7f..aedbc5a 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java +++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java @@ -71,6 +71,11 @@ public final class EvaluatorHeartbeatHandler return; } + if (this.evaluators.wasClosed(evaluatorId)) { + LOG.log(Level.WARNING, "Evaluator [" + evaluatorId + "] has reported back to the driver after it was closed."); + return; + } + if (driverRestartManager.isRestarting() && driverRestartManager.getEvaluatorRestartState(evaluatorId) == EvaluatorRestartState.EXPECTED) { http://git-wip-us.apache.org/repos/asf/reef/blob/ac6785db/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorResourceManagerErrorHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorResourceManagerErrorHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorResourceManagerErrorHandler.java index a55fec3..3a188e0 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorResourceManagerErrorHandler.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorResourceManagerErrorHandler.java @@ -62,7 +62,11 @@ public final class EvaluatorResourceManagerErrorHandler if (evaluatorManager.isPresent()) { evaluatorManager.get().onEvaluatorException(evaluatorException); } else { - LOG.log(Level.WARNING, "Unknown evaluator runtime error: " + error); + if (this.evaluators.wasClosed(evaluatorId)) { + LOG.log(Level.WARNING, "Evaluator [" + evaluatorId + "] has raised exception after it was closed."); + } else { + LOG.log(Level.WARNING, "Unknown evaluator runtime error: " + error); + } } } } http://git-wip-us.apache.org/repos/asf/reef/blob/ac6785db/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java 
---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java index 59ee6a8..5bc0645 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java @@ -23,12 +23,10 @@ import org.apache.reef.annotations.audience.Private; import org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEvent; import org.apache.reef.util.Optional; import org.apache.reef.util.SingletonAsserter; +import org.apache.reef.tang.util.MonotonicSet; import javax.inject.Inject; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.logging.Level; import java.util.logging.Logger; @@ -47,6 +45,10 @@ public final class Evaluators implements AutoCloseable { */ private final Map<String, EvaluatorManager> evaluators = new HashMap<>(); + /** + * A set of evaluatorIds for "closed" (failed and returned) evaluators. + */ + private final MonotonicSet<String> closedEvaluatorIds = new MonotonicSet<>(); @Inject Evaluators() { @@ -95,6 +97,14 @@ public final class Evaluators implements AutoCloseable { } /** + * @param evaluatorId + * @return true if evaluator with this id has already been closed. + */ + public synchronized boolean wasClosed(final String evaluatorId) { + return this.closedEvaluatorIds.contains(evaluatorId); + } + + /** * Create new EvaluatorManager and add it to the collection. 
* <p> * FIXME: This method is a temporary fix for the race condition @@ -118,11 +128,35 @@ public final class Evaluators implements AutoCloseable { */ public synchronized void put(final EvaluatorManager evaluatorManager) { final String evaluatorId = evaluatorManager.getId(); + if (this.wasClosed(evaluatorId)) { + throw new IllegalArgumentException( + "Trying to re-add an Evaluator that has already been closed: " + evaluatorId); + } final EvaluatorManager prev = this.evaluators.put(evaluatorId, evaluatorManager); LOG.log(Level.FINEST, "Adding: {0} previous: {1}", new Object[]{evaluatorId, prev}); if (prev != null) { throw new IllegalArgumentException( - "Trying to re-add an Evaluator that is already known: " + evaluatorId); + "Trying to re-add an Evaluator that is already known: " + evaluatorId); + } + } + + /** + * Moves evaluator from map of active evaluators to set of closed evaluators. + */ + public synchronized void removeClosedEvaluator(final EvaluatorManager evaluatorManager) { + final String evaluatorId = evaluatorManager.getId(); + if (!evaluatorManager.isClosed()) { + throw new IllegalArgumentException("Trying to remove evaluator " + evaluatorId + " which is not closed yet."); + } + if (!this.evaluators.containsKey(evaluatorId) && !this.closedEvaluatorIds.contains(evaluatorId)) { + throw new IllegalArgumentException("Trying to remove unknown evaluator " + evaluatorId + "."); + } + if (!this.evaluators.containsKey(evaluatorId) && this.closedEvaluatorIds.contains(evaluatorId)) { + LOG.log(Level.FINE, "Trying to remove closed evaluator " + evaluatorId + " which has already been removed."); + } else { + LOG.log(Level.FINE, "Removing closed evaluator " + evaluatorId + "."); + this.evaluators.remove(evaluatorId); + this.closedEvaluatorIds.add(evaluatorId); } } } http://git-wip-us.apache.org/repos/asf/reef/blob/ac6785db/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusHandler.java 
---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusHandler.java index 1a2cb38..50ee841 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusHandler.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusHandler.java @@ -28,6 +28,8 @@ import org.apache.reef.util.Optional; import org.apache.reef.wake.EventHandler; import javax.inject.Inject; +import java.util.logging.Level; +import java.util.logging.Logger; /** * A ResourceStatusProto message comes from the ResourceManager layer to indicate what it thinks @@ -35,6 +37,7 @@ import javax.inject.Inject; */ @Private public final class ResourceStatusHandler implements EventHandler<ResourceStatusEvent> { + private static final Logger LOG = Logger.getLogger(Evaluators.class.getName()); private final Evaluators evaluators; private final EvaluatorManagerFactory evaluatorManagerFactory; @@ -50,8 +53,8 @@ public final class ResourceStatusHandler implements EventHandler<ResourceStatusE } /** - * This resource status message comes from the ResourceManager layer; telling me what it thinks. - * about the state of the resource executing an Evaluator; This method simply passes the message + * This resource status message comes from the ResourceManager layer, telling me what it thinks + * about the state of the resource executing an Evaluator. 
This method simply passes the message * off to the referenced EvaluatorManager * * @param resourceStatusEvent resource status message from the ResourceManager @@ -61,7 +64,16 @@ public final class ResourceStatusHandler implements EventHandler<ResourceStatusE final Optional<EvaluatorManager> evaluatorManager = this.evaluators.get(resourceStatusEvent.getIdentifier()); if (evaluatorManager.isPresent()) { evaluatorManager.get().onResourceStatusMessage(resourceStatusEvent); + + if (evaluatorManager.get().isClosed()) { + this.evaluators.removeClosedEvaluator(evaluatorManager.get()); + } } else { + if (this.evaluators.wasClosed(resourceStatusEvent.getIdentifier())) { + LOG.log(Level.WARNING, "Unexpected resource status from closed evaluator " + + resourceStatusEvent.getIdentifier() + " with state " + resourceStatusEvent.getState()); + } + if (driverRestartManager.get().getEvaluatorRestartState(resourceStatusEvent.getIdentifier()) .isFailedOrExpired()) { final EvaluatorManager previousEvaluatorManager = this.evaluatorManagerFactory
