Repository: reef
Updated Branches:
refs/heads/master 67fe83587 -> d25412e9c
[REEF-1555] Refactor REEF Driver logic for code readability and add more
javadocs
Summary of changes:
* Cosmetic changes + extra javadocs and comments in DriverRuntimeStartHandler
* add javadocs to HandlerContainer class description
* refactor DriverStatusManager for readability.
* Add javadocs and deprecate the sendJobEndingMessageToClient() method. No
changes in the logic.
* cosmetic changes in DriverRuntimeStopHandler for readability
* add public method DriverStatusManager.onRuntimeStop(), to be used instead of
the deprecated sendJobEndingMessageToClient()
* cosmetic changes for readability + javadocs in ResourceManagerStatus class
* cosmetic changes for code readability in YarnContainerManager
* add javadocs to YarnContainerManager
* refactor local ContainerManager for readability + use more idiomatic Java.
No changes in functionality.
* more cosmetic changes to ContainerManager
* cosmetic cleanups in RuntimesHost code. No changes in logic
JIRA:
[REEF-1555](https://issues.apache.org/jira/browse/REEF-1555)
Pull Request:
This closes #1111
Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/d25412e9
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/d25412e9
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/d25412e9
Branch: refs/heads/master
Commit: d25412e9cb17de688c0db9809dcc1ad314d8337d
Parents: 67fe835
Author: Sergiy Matusevych <[email protected]>
Authored: Tue Aug 30 19:10:28 2016 -0700
Committer: Markus Weimer <[email protected]>
Committed: Wed Sep 7 18:31:00 2016 -0700
----------------------------------------------------------------------
.../driver/DriverRuntimeStartHandler.java | 47 ++-
.../common/driver/DriverRuntimeStopHandler.java | 27 +-
.../common/driver/DriverStatusManager.java | 161 ++++++----
.../common/driver/idle/DriverIdleManager.java | 63 ++--
.../resourcemanager/ResourceManagerStatus.java | 106 +++----
.../runtime/local/driver/ContainerManager.java | 188 +++++------
.../reef/runtime/multi/driver/RuntimesHost.java | 129 ++++----
.../yarn/driver/YarnContainerManager.java | 314 ++++++++++++-------
8 files changed, 587 insertions(+), 448 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/reef/blob/d25412e9/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStartHandler.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStartHandler.java
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStartHandler.java
index 612283f..a3ef519 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStartHandler.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStartHandler.java
@@ -35,11 +35,14 @@ import java.util.logging.Logger;
/**
* The RuntimeStart handler of the Driver.
* <p>
- * This instantiates the DriverSingletons upon construction. Upon onNext(), it
sets the resource manager status and
- * wires up the remote event handler connections to the client and the
evaluators.
+ * This instantiates the DriverSingletons upon construction. Upon onNext(),
+ * it sets the resource manager status and wires up the remote event handler
+ * connections to the client and the evaluators.
*/
final class DriverRuntimeStartHandler implements EventHandler<RuntimeStart> {
+
private static final Logger LOG =
Logger.getLogger(DriverRuntimeStartHandler.class.getName());
+
private final RemoteManager remoteManager;
private final EvaluatorResourceManagerErrorHandler
evaluatorResourceManagerErrorHandler;
private final EvaluatorHeartbeatHandler evaluatorHeartbeatHandler;
@@ -57,13 +60,15 @@ final class DriverRuntimeStartHandler implements
EventHandler<RuntimeStart> {
* @param driverStatusManager will be set to RUNNING in
onNext()
*/
@Inject
- DriverRuntimeStartHandler(final DriverSingletons singletons,
- final RemoteManager remoteManager,
- final EvaluatorResourceManagerErrorHandler
evaluatorResourceManagerErrorHandler,
- final EvaluatorHeartbeatHandler
evaluatorHeartbeatHandler,
- final ResourceManagerStatus resourceManagerStatus,
- final ResourceManagerStartHandler
resourceManagerStartHandler,
- final DriverStatusManager driverStatusManager) {
+ private DriverRuntimeStartHandler(
+ final DriverSingletons singletons,
+ final RemoteManager remoteManager,
+ final EvaluatorResourceManagerErrorHandler
evaluatorResourceManagerErrorHandler,
+ final EvaluatorHeartbeatHandler evaluatorHeartbeatHandler,
+ final ResourceManagerStatus resourceManagerStatus,
+ final ResourceManagerStartHandler resourceManagerStartHandler,
+ final DriverStatusManager driverStatusManager) {
+
this.remoteManager = remoteManager;
this.evaluatorResourceManagerErrorHandler =
evaluatorResourceManagerErrorHandler;
this.evaluatorHeartbeatHandler = evaluatorHeartbeatHandler;
@@ -72,16 +77,34 @@ final class DriverRuntimeStartHandler implements
EventHandler<RuntimeStart> {
this.driverStatusManager = driverStatusManager;
}
+ /**
+ * This method is called on start of the REEF Driver runtime event loop.
+ * It contains startup logic for REEF Driver that is independent from a
+ * runtime framework (e.g. Mesos, YARN, Local, etc).
+ * Platform-specific logic is then handled in ResourceManagerStartHandler.
+ * @param runtimeStart An event that signals start of the Driver runtime.
+ * Contains a timestamp and can be pretty printed.
+ */
@Override
public synchronized void onNext(final RuntimeStart runtimeStart) {
+
LOG.log(Level.FINEST, "RuntimeStart: {0}", runtimeStart);
-
this.remoteManager.registerHandler(EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto.class,
- evaluatorHeartbeatHandler);
-
this.remoteManager.registerHandler(ReefServiceProtos.RuntimeErrorProto.class,
evaluatorResourceManagerErrorHandler);
+ // Register for heartbeats and error messages from the Evaluators.
+ this.remoteManager.registerHandler(
+ EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto.class,
+ this.evaluatorHeartbeatHandler);
+
+ this.remoteManager.registerHandler(
+ ReefServiceProtos.RuntimeErrorProto.class,
+ this.evaluatorResourceManagerErrorHandler);
+
this.resourceManagerStatus.setRunning();
this.driverStatusManager.onRunning();
+
+ // Forward start event to the runtime-specific handler (e.g. YARN, Local,
etc.)
this.resourceManagerStartHandler.onNext(runtimeStart);
+
LOG.log(Level.FINEST, "DriverRuntimeStartHandler complete.");
}
}
http://git-wip-us.apache.org/repos/asf/reef/blob/d25412e9/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStopHandler.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStopHandler.java
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStopHandler.java
index e57ba0a..678a596 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStopHandler.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStopHandler.java
@@ -41,6 +41,7 @@ import java.util.logging.Logger;
@Private
@DriverSide
final class DriverRuntimeStopHandler implements EventHandler<RuntimeStop> {
+
private static final Logger LOG =
Logger.getLogger(DriverRuntimeStopHandler.class.getName());
private final DriverStatusManager driverStatusManager;
@@ -50,12 +51,13 @@ final class DriverRuntimeStopHandler implements
EventHandler<RuntimeStop> {
private final boolean preserveEvaluatorsAcrossRestarts;
@Inject
- DriverRuntimeStopHandler(final DriverStatusManager driverStatusManager,
- final ResourceManagerStopHandler
resourceManagerStopHandler,
- final RemoteManager remoteManager,
- final Evaluators evaluators,
- @Parameter(ResourceManagerPreserveEvaluators.class)
- final boolean preserveEvaluatorsAcrossRestarts) {
+ DriverRuntimeStopHandler(
+ final DriverStatusManager driverStatusManager,
+ final ResourceManagerStopHandler resourceManagerStopHandler,
+ final RemoteManager remoteManager,
+ final Evaluators evaluators,
+ @Parameter(ResourceManagerPreserveEvaluators.class) final boolean
preserveEvaluatorsAcrossRestarts) {
+
this.driverStatusManager = driverStatusManager;
this.resourceManagerStopHandler = resourceManagerStopHandler;
this.remoteManager = remoteManager;
@@ -65,25 +67,30 @@ final class DriverRuntimeStopHandler implements
EventHandler<RuntimeStop> {
@Override
public synchronized void onNext(final RuntimeStop runtimeStop) {
+
LOG.log(Level.FINEST, "RuntimeStop: {0}", runtimeStop);
+ final Throwable runtimeException = runtimeStop.getException();
+
// Shut down evaluators if there are no exceptions, the driver is
forcefully
// shut down by a non-recoverable exception, or restart is not enabled.
- if (runtimeStop.getException() == null ||
- runtimeStop.getException() instanceof DriverFatalRuntimeException ||
+ if (runtimeException == null ||
+ runtimeException instanceof DriverFatalRuntimeException ||
!this.preserveEvaluatorsAcrossRestarts) {
this.evaluators.close();
}
this.resourceManagerStopHandler.onNext(runtimeStop);
+
// Inform the client of the shutdown.
- final Optional<Throwable> exception =
Optional.<Throwable>ofNullable(runtimeStop.getException());
- this.driverStatusManager.sendJobEndingMessageToClient(exception);
+
this.driverStatusManager.onRuntimeStop(Optional.ofNullable(runtimeException));
+
// Close the remoteManager.
try {
this.remoteManager.close();
LOG.log(Level.INFO, "Driver shutdown complete");
} catch (final Exception e) {
+ LOG.log(Level.WARNING, "Error when closing the RemoteManager", e);
throw new RuntimeException("Unable to close the RemoteManager.", e);
}
}
http://git-wip-us.apache.org/repos/asf/reef/blob/d25412e9/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatusManager.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatusManager.java
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatusManager.java
index 86a4429..129e702 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatusManager.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatusManager.java
@@ -33,44 +33,53 @@ import java.util.logging.Logger;
/**
* Manages the Driver's status.
+ * Communicates status changes to the client and shuts down the runtime clock
on shutdown.
*/
public final class DriverStatusManager {
+
private static final Logger LOG =
Logger.getLogger(DriverStatusManager.class.getName());
+ private static final String CLASS_NAME =
DriverStatusManager.class.getCanonicalName();
+
private final Clock clock;
private final ClientConnection clientConnection;
private final String jobIdentifier;
private final ExceptionCodec exceptionCodec;
+
private DriverStatus driverStatus = DriverStatus.PRE_INIT;
private Optional<Throwable> shutdownCause = Optional.empty();
private boolean driverTerminationHasBeenCommunicatedToClient = false;
-
/**
- * @param clock
- * @param clientConnection
- * @param jobIdentifier
- * @param exceptionCodec
+ * Build a new status manager. This is done automatically by Tang.
+ * @param clock runtime event loop to shut down on completion or error.
+ * @param clientConnection Connection to the job client. Send init, running,
and job ending messages.
+ * @param jobIdentifier String job ID.
+ * @param exceptionCodec codec to serialize the exception when sending job
ending message to the client.
*/
@Inject
- DriverStatusManager(final Clock clock,
- final ClientConnection clientConnection,
- @Parameter(JobIdentifier.class) final String
jobIdentifier,
- final ExceptionCodec exceptionCodec) {
- LOG.entering(DriverStatusManager.class.getCanonicalName(), "<init>");
+ private DriverStatusManager(
+ @Parameter(JobIdentifier.class) final String jobIdentifier,
+ final Clock clock,
+ final ClientConnection clientConnection,
+ final ExceptionCodec exceptionCodec) {
+
+ LOG.entering(CLASS_NAME, "<init>");
+
this.clock = clock;
this.clientConnection = clientConnection;
this.jobIdentifier = jobIdentifier;
this.exceptionCodec = exceptionCodec;
+
LOG.log(Level.FINE, "Instantiated 'DriverStatusManager'");
- LOG.exiting(DriverStatusManager.class.getCanonicalName(), "<init>");
+
+ LOG.exiting(CLASS_NAME, "<init>");
}
/**
* Check whether a state transition 'from->to' is legal.
- *
- * @param from
- * @param to
- * @return
+ * @param from Source state.
+ * @param to Destination state.
+ * @return true if transition is valid, false otherwise.
*/
private static boolean isLegalTransition(final DriverStatus from, final
DriverStatus to) {
switch (from) {
@@ -108,10 +117,13 @@ public final class DriverStatusManager {
* Changes the driver status to INIT and sends message to the client about
the transition.
*/
public synchronized void onInit() {
- LOG.entering(DriverStatusManager.class.getCanonicalName(), "onInit");
+
+ LOG.entering(CLASS_NAME, "onInit");
+
this.clientConnection.send(this.getInitMessage());
this.setStatus(DriverStatus.INIT);
- LOG.exiting(DriverStatusManager.class.getCanonicalName(), "onInit");
+
+ LOG.exiting(CLASS_NAME, "onInit");
}
/**
@@ -119,22 +131,27 @@ public final class DriverStatusManager {
* If the driver is in status 'PRE_INIT', this first calls onInit();
*/
public synchronized void onRunning() {
- LOG.entering(DriverStatusManager.class.getCanonicalName(), "onRunning");
+
+ LOG.entering(CLASS_NAME, "onRunning");
+
if (this.driverStatus.equals(DriverStatus.PRE_INIT)) {
this.onInit();
}
+
this.clientConnection.send(this.getRunningMessage());
this.setStatus(DriverStatus.RUNNING);
- LOG.exiting(DriverStatusManager.class.getCanonicalName(), "onRunning");
+
+ LOG.exiting(CLASS_NAME, "onRunning");
}
/**
* End the Driver with an exception.
- *
- * @param exception
+ * @param exception Exception that causes the driver shutdown.
*/
public synchronized void onError(final Throwable exception) {
- LOG.entering(DriverStatusManager.class.getCanonicalName(), "onError", new
Object[]{exception});
+
+ LOG.entering(CLASS_NAME, "onError", exception);
+
if (this.isShuttingDownOrFailing()) {
LOG.log(Level.WARNING, "Received an exception while already in
shutdown.", exception);
} else {
@@ -143,63 +160,78 @@ public final class DriverStatusManager {
this.clock.stop(exception);
this.setStatus(DriverStatus.FAILING);
}
- LOG.exiting(DriverStatusManager.class.getCanonicalName(), "onError", new
Object[]{exception});
+
+ LOG.exiting(CLASS_NAME, "onError", exception);
}
/**
* Perform a clean shutdown of the Driver.
*/
- @SuppressWarnings("checkstyle:constructorwithoutparams") // Exception() here
captures the callstack
public synchronized void onComplete() {
- LOG.entering(DriverStatusManager.class.getCanonicalName(), "onComplete");
+
+ LOG.entering(CLASS_NAME, "onComplete");
+
if (this.isShuttingDownOrFailing()) {
- LOG.log(Level.WARNING, "Ignoring second call to onComplete()");
+ LOG.log(Level.WARNING, "Ignoring second call to onComplete()",
+ new Exception("Dummy exception to get the call stack"));
} else {
LOG.log(Level.INFO, "Clean shutdown of the Driver.");
if (LOG.isLoggable(Level.FINEST)) {
- LOG.log(Level.FINEST, "Callstack: ", new Exception());
+ LOG.log(Level.FINEST, "Call stack: ",
+ new Exception("Dummy exception to get the call stack"));
}
this.clock.close();
this.setStatus(DriverStatus.SHUTTING_DOWN);
}
- LOG.exiting(DriverStatusManager.class.getCanonicalName(), "onComplete");
+ LOG.exiting(CLASS_NAME, "onComplete");
+ }
+
+ /**
+ * Sends the final message to the client. This is used by
DriverRuntimeStopHandler.onNext().
+ * @param exception Exception that caused the job to end (optional).
+ */
+ public synchronized void onRuntimeStop(final Optional<Throwable> exception) {
+ this.sendJobEndingMessageToClient(exception);
}
/**
* Sends the final message to the Driver. This is used by
DriverRuntimeStopHandler.onNext().
- *
- * @param exception
+ * @param exception Exception that caused the job to end (can be absent).
+ * @deprecated TODO[JIRA REEF-1548] Do not use DriverStatusManager as a
proxy to the job client.
+ * After release 0.16, make this method private and use it inside
onRuntimeStop() method instead.
*/
public synchronized void sendJobEndingMessageToClient(final
Optional<Throwable> exception) {
- if (this.isNotShuttingDownOrFailing()) {
+
+ if (!this.isShuttingDownOrFailing()) {
LOG.log(Level.SEVERE, "Sending message in a state different that
SHUTTING_DOWN or FAILING. " +
- "This is likely a illegal call to clock.close() at play. Current
state: " + this.driverStatus);
+ "This is likely a illegal call to clock.close() at play. Current
state: {0}", this.driverStatus);
}
+
if (this.driverTerminationHasBeenCommunicatedToClient) {
LOG.log(Level.SEVERE, ".sendJobEndingMessageToClient() called twice.
Ignoring the second call");
- } else {
- // Log the shutdown situation
- if (this.shutdownCause.isPresent()) {
- LOG.log(Level.WARNING, "Sending message about an unclean driver
shutdown.", this.shutdownCause.get());
- }
- if (exception.isPresent()) {
- LOG.log(Level.WARNING, "There was an exception during clock.close().",
exception.get());
- }
- if (this.shutdownCause.isPresent() && exception.isPresent()) {
- LOG.log(Level.WARNING, "The driver is shutdown because of an exception
(see above) and there was " +
- "an exception during clock.close(). Only the first exception will
be sent to the client");
- }
+ return;
+ }
- if (this.shutdownCause.isPresent()) {
- // Send the earlier exception, if there was one
- this.clientConnection.send(getJobEndingMessage(this.shutdownCause));
- } else {
- // Send the exception passed, if there was one.
- this.clientConnection.send(getJobEndingMessage(exception));
- }
- this.driverTerminationHasBeenCommunicatedToClient = true;
+ // Log the shutdown situation
+ if (this.shutdownCause.isPresent()) {
+ LOG.log(Level.WARNING, "Sending message about an unclean driver
shutdown.", this.shutdownCause.get());
+ }
+
+ if (exception.isPresent()) {
+ LOG.log(Level.WARNING, "There was an exception during clock.close().",
exception.get());
+ }
+
+ if (this.shutdownCause.isPresent() && exception.isPresent()) {
+ LOG.log(Level.WARNING, "The driver is shutdown because of an exception
(see above) and there was " +
+ "an exception during clock.close(). Only the first exception will be
sent to the client");
}
+
+ // Send the earlier exception, if there was one. Otherwise, send the
exception passed.
+ this.clientConnection.send(getJobEndingMessage(
+ this.shutdownCause.isPresent() ? this.shutdownCause : exception));
+
+ this.driverTerminationHasBeenCommunicatedToClient = true;
}
public synchronized boolean isShuttingDownOrFailing() {
@@ -207,21 +239,16 @@ public final class DriverStatusManager {
|| DriverStatus.FAILING.equals(this.driverStatus);
}
- private synchronized boolean isNotShuttingDownOrFailing() {
- return !isShuttingDownOrFailing();
- }
-
/**
- * Helper method to set the status. This also checks whether the transition
from the current status to the new one is
- * legal.
- *
- * @param newStatus
+ * Helper method to set the status.
+ * This also checks whether the transition from the current status to the
new one is legal.
+ * @param newStatus Driver status to transition to.
*/
private synchronized void setStatus(final DriverStatus newStatus) {
if (isLegalTransition(this.driverStatus, newStatus)) {
this.driverStatus = newStatus;
} else {
- LOG.log(Level.WARNING, "Illegal state transiton: '" + this.driverStatus
+ "'->'" + newStatus + "'");
+ LOG.log(Level.WARNING, "Illegal state transition: {0} -> {1}", new
Object[] {this.driverStatus, newStatus});
}
}
@@ -229,27 +256,25 @@ public final class DriverStatusManager {
* @param exception the exception that ended the Driver, if any.
* @return message to be sent to the client at the end of the job.
*/
- private synchronized ReefServiceProtos.JobStatusProto
getJobEndingMessage(final Optional<Throwable> exception) {
- final ReefServiceProtos.JobStatusProto message;
+ private ReefServiceProtos.JobStatusProto getJobEndingMessage(final
Optional<Throwable> exception) {
if (exception.isPresent()) {
- message = ReefServiceProtos.JobStatusProto.newBuilder()
+ return ReefServiceProtos.JobStatusProto.newBuilder()
.setIdentifier(this.jobIdentifier)
.setState(ReefServiceProtos.State.FAILED)
.setException(ByteString.copyFrom(this.exceptionCodec.toBytes(exception.get())))
.build();
} else {
- message = ReefServiceProtos.JobStatusProto.newBuilder()
+ return ReefServiceProtos.JobStatusProto.newBuilder()
.setIdentifier(this.jobIdentifier)
.setState(ReefServiceProtos.State.DONE)
.build();
}
- return message;
}
/**
* @return The message to be sent through the ClientConnection when in state
INIT.
*/
- private synchronized ReefServiceProtos.JobStatusProto getInitMessage() {
+ private ReefServiceProtos.JobStatusProto getInitMessage() {
return ReefServiceProtos.JobStatusProto.newBuilder()
.setIdentifier(this.jobIdentifier)
.setState(ReefServiceProtos.State.INIT)
@@ -259,7 +284,7 @@ public final class DriverStatusManager {
/**
* @return The message to be sent through the ClientConnection when in state
RUNNING.
*/
- private synchronized ReefServiceProtos.JobStatusProto getRunningMessage() {
+ private ReefServiceProtos.JobStatusProto getRunningMessage() {
return ReefServiceProtos.JobStatusProto.newBuilder()
.setIdentifier(this.jobIdentifier)
.setState(ReefServiceProtos.State.RUNNING)
http://git-wip-us.apache.org/repos/asf/reef/blob/d25412e9/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/DriverIdleManager.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/DriverIdleManager.java
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/DriverIdleManager.java
index cdc71dd..285a1b1 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/DriverIdleManager.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/DriverIdleManager.java
@@ -32,43 +32,54 @@ import java.util.logging.Logger;
* Handles the various sources for driver idleness and forwards decisions to
DriverStatusManager.
*/
public final class DriverIdleManager {
+
private static final Logger LOG =
Logger.getLogger(DriverIdleManager.class.getName());
private static final Level IDLE_REASONS_LEVEL = Level.FINEST;
+
private final Set<DriverIdlenessSource> idlenessSources;
private final InjectionFuture<DriverStatusManager> driverStatusManager;
@Inject
- DriverIdleManager(@Parameter(DriverIdleSources.class) final
Set<DriverIdlenessSource> idlenessSources,
- final InjectionFuture<DriverStatusManager>
driverStatusManager) {
+ private DriverIdleManager(
+ @Parameter(DriverIdleSources.class) final Set<DriverIdlenessSource>
idlenessSources,
+ final InjectionFuture<DriverStatusManager> driverStatusManager) {
+
this.idlenessSources = idlenessSources;
this.driverStatusManager = driverStatusManager;
}
+ /**
+ * Check whether all Driver components are idle, and initiate driver
shutdown if they are.
+ * @param reason An indication whether the component is idle, along with a
descriptive message.
+ */
public synchronized void onPotentiallyIdle(final IdleMessage reason) {
- synchronized (driverStatusManager.get()) {
- if (this.driverStatusManager.get().isShuttingDownOrFailing()) {
- LOG.log(IDLE_REASONS_LEVEL, "Ignoring idle call from [{0}] for reason
[{1}]",
- new Object[]{reason.getComponentName(), reason.getReason()});
- } else {
- boolean isIdle = true;
- LOG.log(IDLE_REASONS_LEVEL, "Checking for idle because {0} reported
idleness for reason [{1}]",
- new Object[]{reason.getComponentName(), reason.getReason()});
-
-
- for (final DriverIdlenessSource idlenessSource : this.idlenessSources)
{
- final IdleMessage idleMessage = idlenessSource.getIdleStatus();
- LOG.log(IDLE_REASONS_LEVEL, "[{0}] is reporting {1} because [{2}].",
- new Object[]{idleMessage.getComponentName(),
idleMessage.isIdle() ? "idle" : "not idle",
- idleMessage.getReason()}
- );
- isIdle &= idleMessage.isIdle();
- }
-
- if (isIdle) {
- LOG.log(Level.INFO, "All components indicated idle. Initiating
Driver shutdown.");
- this.driverStatusManager.get().onComplete();
- }
- }
+
+ final DriverStatusManager driverStatusManagerImpl =
this.driverStatusManager.get();
+
+ if (driverStatusManagerImpl.isShuttingDownOrFailing()) {
+ LOG.log(IDLE_REASONS_LEVEL, "Ignoring idle call from [{0}] for reason
[{1}]",
+ new Object[] {reason.getComponentName(), reason.getReason()});
+ return;
+ }
+
+ LOG.log(IDLE_REASONS_LEVEL, "Checking for idle because {0} reported
idleness for reason [{1}]",
+ new Object[] {reason.getComponentName(), reason.getReason()});
+
+ boolean isIdle = true;
+ for (final DriverIdlenessSource idlenessSource : this.idlenessSources) {
+
+ final IdleMessage idleMessage = idlenessSource.getIdleStatus();
+
+ LOG.log(IDLE_REASONS_LEVEL, "[{0}] is reporting {1} because [{2}].",
+ new Object[] {idleMessage.getComponentName(),
+ idleMessage.isIdle() ? "idle" : "not idle",
idleMessage.getReason()});
+
+ isIdle &= idleMessage.isIdle();
+ }
+
+ if (isIdle) {
+ LOG.log(Level.INFO, "All components indicated idle. Initiating Driver
shutdown.");
+ driverStatusManagerImpl.onComplete();
}
}
}
http://git-wip-us.apache.org/repos/asf/reef/blob/d25412e9/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java
index 273649f..f9a526d 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java
@@ -33,15 +33,16 @@ import java.util.logging.Level;
import java.util.logging.Logger;
/**
- * Manages the status of the Resource Manager.
+ * Manages the status of the Resource Manager and tracks whether it is idle.
*/
@DriverSide
@Private
-public final class ResourceManagerStatus implements
EventHandler<RuntimeStatusEvent>,
- DriverIdlenessSource {
+public final class ResourceManagerStatus implements
EventHandler<RuntimeStatusEvent>, DriverIdlenessSource {
+
private static final Logger LOG =
Logger.getLogger(ResourceManagerStatus.class.getName());
private static final String COMPONENT_NAME = "ResourceManager";
+
private static final IdleMessage IDLE_MESSAGE =
new IdleMessage(COMPONENT_NAME, "No outstanding requests or
allocations", true);
@@ -49,15 +50,21 @@ public final class ResourceManagerStatus implements
EventHandler<RuntimeStatusEv
private final DriverStatusManager driverStatusManager;
private final InjectionFuture<DriverIdleManager> driverIdleManager;
- // Mutable state.
- private State state = State.INIT;
+ /** Mutable RM state. */
+ private State state = State.INIT;
+
+ /** Number of container requests outstanding with the RM, as per latest
RuntimeStatusEvent message. */
private int outstandingContainerRequests = 0;
+
+ /** Number of containers currently allocated, as per latest
RuntimeStatusEvent message. */
private int containerAllocationCount = 0;
@Inject
- ResourceManagerStatus(final ResourceManagerErrorHandler
resourceManagerErrorHandler,
- final DriverStatusManager driverStatusManager,
- final InjectionFuture<DriverIdleManager>
driverIdleManager) {
+ private ResourceManagerStatus(
+ final ResourceManagerErrorHandler resourceManagerErrorHandler,
+ final DriverStatusManager driverStatusManager,
+ final InjectionFuture<DriverIdleManager> driverIdleManager) {
+
this.resourceManagerErrorHandler = resourceManagerErrorHandler;
this.driverStatusManager = driverStatusManager;
this.driverIdleManager = driverIdleManager;
@@ -65,11 +72,15 @@ public final class ResourceManagerStatus implements
EventHandler<RuntimeStatusEv
@Override
public synchronized void onNext(final RuntimeStatusEvent runtimeStatusEvent)
{
+
final State newState = runtimeStatusEvent.getState();
- LOG.log(Level.FINEST, "Runtime status " + runtimeStatusEvent);
+
+ LOG.log(Level.FINEST, "Runtime status: {0}", runtimeStatusEvent);
+
this.outstandingContainerRequests =
runtimeStatusEvent.getOutstandingContainerRequests().orElse(0);
this.containerAllocationCount =
runtimeStatusEvent.getContainerAllocationList().size();
- this.setState(runtimeStatusEvent.getState());
+
+ this.setState(newState);
switch (newState) {
case FAILED:
@@ -98,23 +109,32 @@ public final class ResourceManagerStatus implements
EventHandler<RuntimeStatusEv
}
/**
- * @return idle, if there are no outstanding requests or allocations. Not
idle else.
+ * Driver is idle if, regardless of status, it has no evaluators allocated
and no pending container requests.
+ * @return true if the driver can be considered idle, false otherwise.
+ */
+ private synchronized boolean isIdle() {
+ return this.outstandingContainerRequests == 0 &&
this.containerAllocationCount == 0;
+ }
+
+ /**
+ * Driver is idle if, regardless of status, it has no evaluators allocated
+ * and no pending container requests. This method is used in the
DriverIdleManager.
+ * If all DriverIdlenessSource components are idle, DriverIdleManager will
initiate Driver shutdown.
+ * @return idle, if there are no outstanding requests or allocations. Not
idle otherwise.
*/
@Override
public synchronized IdleMessage getIdleStatus() {
+
if (this.isIdle()) {
return IDLE_MESSAGE;
- } else {
- final String message = new StringBuilder("There are ")
- .append(this.outstandingContainerRequests)
- .append(" outstanding container requests and ")
- .append(this.containerAllocationCount)
- .append(" allocated containers")
- .toString();
- return new IdleMessage(COMPONENT_NAME, message, false);
}
- }
+ final String message = String.format(
+ "There are %d outstanding container requests and %d allocated
containers",
+ this.outstandingContainerRequests, this.containerAllocationCount);
+
+ return new IdleMessage(COMPONENT_NAME, message, false);
+ }
private synchronized void onRMFailure(final RuntimeStatusEvent
runtimeStatusEvent) {
assert runtimeStatusEvent.getState() == State.FAILED;
@@ -134,33 +154,18 @@ public final class ResourceManagerStatus implements
EventHandler<RuntimeStatusEv
}
}
-
- private synchronized boolean isIdle() {
- return this.hasNoOutstandingRequests()
- && this.hasNoContainersAllocated();
- }
-
- private synchronized boolean isRunning() {
- return State.RUNNING.equals(this.state);
- }
-
/**
- *
* Checks if the ResourceManager can switch from the current state to the
target state.
* See REEF-826 for the state transition matrix.
- *
- * @param from current state
- * @param to state to switch to
- *
- * @return true if the transition is legal; false otherwise
- *
+ * @param from current state.
+ * @param to state to switch to.
+ * @return true if the transition is legal; false otherwise.
*/
- private synchronized boolean isLegalStateTransition(final State from,
- final State to) {
+ private static boolean isLegalStateTransition(final State from, final State
to) {
// handle diagonal elements of the transition matrix
- if (from.equals(to)){
- LOG.finest("Transition from " + from + " state to the same state.");
+ if (from.equals(to)) {
+ LOG.log(Level.FINEST, "Transition from {0} state to the same state.",
from);
return true;
}
@@ -207,30 +212,15 @@ public final class ResourceManagerStatus implements
EventHandler<RuntimeStatusEv
default:
return false;
-
}
-
}
private synchronized void setState(final State newState) {
-
if (isLegalStateTransition(this.state, newState)) {
this.state = newState;
} else {
- throw new IllegalStateException("Resource manager attempts illegal state
transition from "
- + this.state + " to "
- + newState);
+ throw new IllegalStateException(
+ "Resource manager attempts illegal state transition from " +
this.state + " to " + newState);
}
-
- }
-
-
- private synchronized boolean hasNoOutstandingRequests() {
- return this.outstandingContainerRequests == 0;
- }
-
- private synchronized boolean hasNoContainersAllocated() {
- return this.containerAllocationCount == 0;
}
-
}
http://git-wip-us.apache.org/repos/asf/reef/blob/d25412e9/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ContainerManager.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ContainerManager.java
b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ContainerManager.java
index daafece..3184fc5 100644
---
a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ContainerManager.java
+++
b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ContainerManager.java
@@ -45,14 +45,7 @@ import org.apache.reef.wake.remote.RemoteMessage;
import org.apache.reef.wake.remote.address.LocalAddressProvider;
import java.io.File;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -67,8 +60,7 @@ final class ContainerManager implements AutoCloseable {
private static final Logger LOG =
Logger.getLogger(ContainerManager.class.getName());
- private static final Collection<String> DEFAULT_RACKS =
Arrays.asList(RackNames.DEFAULT_RACK_NAME);
-
+ private static final Collection<String> DEFAULT_RACKS =
Collections.singletonList(RackNames.DEFAULT_RACK_NAME);
/**
* Map from containerID -> Container.
@@ -104,7 +96,7 @@ final class ContainerManager implements AutoCloseable {
private final Collection<String> availableRacks;
@Inject
- ContainerManager(
+ private ContainerManager(
final RemoteManager remoteManager,
final REEFFileNames fileNames,
@Parameter(MaxNumberOfEvaluators.class) final int capacity,
@@ -116,6 +108,7 @@ final class ContainerManager implements AutoCloseable {
final LocalAddressProvider localAddressProvider,
@Parameter(DefaultMemorySize.class) final int defaultMemorySize,
@Parameter(DefaultNumberOfCores.class) final int defaultNumberOfCores) {
+
this.capacity = capacity;
this.defaultMemorySize = defaultMemorySize;
this.defaultNumberOfCores = defaultNumberOfCores;
@@ -144,55 +137,75 @@ final class ContainerManager implements AutoCloseable {
LOG.log(Level.FINE, "Initialized Container Manager with {0} containers",
capacity);
}
- private Collection<String> normalize(final Collection<String> rackNames) {
+ /**
+ * Normalize rack names. Make sure that each rack name starts with a path
separator /
+ * and does not have a path separator at the end. Also check that no rack
names
+ * end with a wildcard *, and raise an exception if such rack name occurs in
the input.
+ * @param rackNames Collection of rack names to normalize.
+ * @return Collection of normalized rack names.
+ * @throws IllegalArgumentException if validation of some rack names fails.
+ */
+ private static Collection<String> normalize(
+ final Collection<String> rackNames) throws IllegalArgumentException {
+
return normalize(rackNames, true);
}
/**
- * Normalizes the rack names.
- *
- * @param rackNames
- * the rack names to normalize
- * @param validateEnd
- * if true, throws an exception if the name ends with ANY (*)
- * @return a normalized collection
+ * Normalize rack names. Make sure that each rack name starts with a path
separator /
+ * and does not have a path separator at the end. Also, if end validation is
on, check
+ * that rack name does not end with a wildcard *.
+ * @param rackNames Collection of rack names to normalize.
+ * @param validateEnd If true, throw an exception if the name ends with ANY
(*)
+ * @return Collection of normalized rack names.
+ * @throws IllegalArgumentException if validation of some rack names fails.
*/
- private Collection<String> normalize(final Collection<String> rackNames,
- final boolean validateEnd) {
+ private static Collection<String> normalize(
+ final Collection<String> rackNames, final boolean validateEnd) throws
IllegalArgumentException {
+
final List<String> normalizedRackNames = new ArrayList<>(rackNames.size());
- final Iterator<String> it = rackNames.iterator();
- while (it.hasNext()) {
- String rackName = it.next().trim();
+
+ for (String rackName : rackNames) {
+
+ rackName = rackName.trim();
Validate.notEmpty(rackName, "Rack names cannot be empty");
+
// should start with a separator
if (!rackName.startsWith(Constants.RACK_PATH_SEPARATOR)) {
rackName = Constants.RACK_PATH_SEPARATOR + rackName;
}
+
// remove the ending separator
if (rackName.endsWith(Constants.RACK_PATH_SEPARATOR)) {
rackName = rackName.substring(0, rackName.length() - 1);
}
+
if (validateEnd) {
Validate.isTrue(!rackName.endsWith(Constants.ANY_RACK));
}
+
normalizedRackNames.add(rackName);
}
+
return normalizedRackNames;
}
private void init() {
+
// evenly distribute the containers among the racks
// if rack names are not specified, the default rack will be used, so the
denominator will always be > 0
- final int capacityPerRack = capacity / availableRacks.size();
- int missing = capacity % availableRacks.size();
+ final int capacityPerRack = this.capacity / this.availableRacks.size();
+ int missing = this.capacity % this.availableRacks.size();
+
// initialize the freeNodesPerRackList and the capacityPerRack
- for (final String rackName : availableRacks) {
- this.freeNodesPerRack.put(rackName, new HashMap<String, Boolean>());
- this.capacitiesPerRack.put(rackName, capacityPerRack);
+ for (final String rackName : this.availableRacks) {
+ int currentCapacity = capacityPerRack;
if (missing > 0) {
- this.capacitiesPerRack.put(rackName,
this.capacitiesPerRack.get(rackName) + 1);
- missing--;
+ ++currentCapacity;
+ --missing;
}
+ this.capacitiesPerRack.put(rackName, currentCapacity);
+ this.freeNodesPerRack.put(rackName, new HashMap<String, Boolean>());
}
}
@@ -226,29 +239,25 @@ final class ContainerManager implements AutoCloseable {
}
private Collection<String> getRackNamesOrDefault(final List<String>
rackNames) {
- return CollectionUtils.isNotEmpty(rackNames) ? normalize(rackNames, false)
- : DEFAULT_RACKS;
+ return CollectionUtils.isNotEmpty(rackNames) ? normalize(rackNames, false)
: DEFAULT_RACKS;
}
-
/**
- * Returns the node name of the container to be allocated if it's available,
selected from the list of preferred
- * node names. If the list is empty, then an empty optional is returned
- *
- * @param nodeNames
- * the list of preferred nodes
- * @return the node name where to allocate the container
+ * Returns the node name of the container to be allocated if it's available,
+ * selected from the list of preferred node names.
+ * If the list is empty, then an empty optional is returned.
+ * @param nodeNames the list of preferred nodes.
+ * @return the node name where to allocate the container.
*/
private Optional<String> getPreferredNode(final List<String> nodeNames) {
- if (CollectionUtils.isNotEmpty(nodeNames)) {
- for (final String nodeName : nodeNames) {
- final String possibleRack = racksPerNode.get(nodeName);
- if (possibleRack != null
- && freeNodesPerRack.get(possibleRack).containsKey(nodeName)) {
- return Optional.of(nodeName);
- }
+
+ for (final String nodeName : nodeNames) {
+ final String possibleRack = this.racksPerNode.get(nodeName);
+ if (possibleRack != null &&
this.freeNodesPerRack.get(possibleRack).containsKey(nodeName)) {
+ return Optional.of(nodeName);
}
}
+
return Optional.empty();
}
@@ -257,60 +266,57 @@ final class ContainerManager implements AutoCloseable {
* preferred rack names. If the list is empty, and there's space in the
default
* rack, then the default rack is returned. The relax locality semantic is
* enabled if the list of rack names contains '/*', otherwise relax locality
- * is considered disabled
+ * is considered disabled.
*
- * @param rackNames
- * the list of preferred racks
- * @return the rack name where to allocate the container
+ * @param rackNames the list of preferred racks.
+ * @return the rack name where to allocate the container.
*/
private Optional<String> getPreferredRack(final List<String> rackNames) {
- final Collection<String> normalized = getRackNamesOrDefault(rackNames);
- for (final String rackName : normalized) {
- // if it does not end with the any modifier,
- // then we should do an exact match
+
+ for (final String rackName : getRackNamesOrDefault(rackNames)) {
+
+ // if it does not end with the any modifier, then we should do an exact
match
if (!rackName.endsWith(Constants.ANY_RACK)) {
- if (freeNodesPerRack.containsKey(rackName)
- && freeNodesPerRack.get(rackName).size() > 0) {
+ if (freeNodesPerRack.containsKey(rackName) &&
freeNodesPerRack.get(rackName).size() > 0) {
return Optional.of(rackName);
}
} else {
+
// if ends with the any modifier, we do a prefix match
- final Iterator<String> it = availableRacks.iterator();
- while (it.hasNext()) {
- final String possibleRackName = it.next();
+ for (final String possibleRackName : this.availableRacks) {
+
// remove the any modifier
- final String newRackName = rackName.substring(0,
- rackName.length() - 1);
+ final String newRackName = rackName.substring(0, rackName.length() -
1);
+
if (possibleRackName.startsWith(newRackName) &&
- freeNodesPerRack.get(possibleRackName).size() > 0) {
+ this.freeNodesPerRack.get(possibleRackName).size() > 0) {
return Optional.of(possibleRackName);
}
}
}
}
+
return Optional.empty();
}
/**
* Allocates a container based on a request event. First it tries to match a
- * given node, if it cannot, it tries to get a spot in a rack
- *
- * @param requestEvent
- * the request event
- * @return an optional with the container if allocated
+ * given node, if it cannot, it tries to get a spot in a rack.
+ * @param requestEvent resource request event.
+ * @return an optional with the container if allocated.
*/
Optional<Container> allocateContainer(final ResourceRequestEvent
requestEvent) {
+
Container container = null;
- final Optional<String> nodeName = getPreferredNode(requestEvent
- .getNodeNameList());
+ final Optional<String> nodeName =
getPreferredNode(requestEvent.getNodeNameList());
+
if (nodeName.isPresent()) {
container = allocateBasedOnNode(
requestEvent.getMemorySize().orElse(this.defaultMemorySize),
requestEvent.getVirtualCores().orElse(this.defaultNumberOfCores),
nodeName.get());
} else {
- final Optional<String> rackName = getPreferredRack(requestEvent
- .getRackNameList());
+ final Optional<String> rackName =
getPreferredRack(requestEvent.getRackNameList());
if (rackName.isPresent()) {
container = allocateBasedOnRack(
requestEvent.getMemorySize().orElse(this.defaultMemorySize),
@@ -318,11 +324,11 @@ final class ContainerManager implements AutoCloseable {
rackName.get());
}
}
+
return Optional.ofNullable(container);
}
- private Container allocateBasedOnNode(final int megaBytes,
- final int numberOfCores, final String nodeId) {
+ private Container allocateBasedOnNode(final int megaBytes, final int
numberOfCores, final String nodeId) {
synchronized (this.containers) {
// get the rack name
final String rackName = this.racksPerNode.get(nodeId);
@@ -333,38 +339,41 @@ final class ContainerManager implements AutoCloseable {
}
}
- private Container allocateBasedOnRack(final int megaBytes,
- final int numberOfCores, final String rackName) {
+ private Container allocateBasedOnRack(final int megaBytes, final int
numberOfCores, final String rackName) {
synchronized (this.containers) {
+
// get the first free nodeId in the rack
- final Set<String> freeNodes = this.freeNodesPerRack.get(rackName)
- .keySet();
- final Iterator<String> it = freeNodes.iterator();
+ final Iterator<String> it =
this.freeNodesPerRack.get(rackName).keySet().iterator();
+
if (!it.hasNext()) {
- throw new IllegalArgumentException(
- "There should be a free node in the specified rack " + rackName);
+ throw new IllegalArgumentException("There should be a free node in the
specified rack " + rackName);
}
+
final String nodeId = it.next();
- // remove it from the free map
- this.freeNodesPerRack.get(rackName).remove(nodeId);
+ it.remove();
+
// allocate
return allocate(megaBytes, numberOfCores, nodeId, rackName);
}
}
- private Container allocate(final int megaBytes, final int numberOfCores,
- final String nodeId, final String rackName) {
- final String processID = nodeId + "-"
- + String.valueOf(System.currentTimeMillis());
+ private Container allocate(
+ final int megaBytes, final int numberOfCores, final String nodeId, final
String rackName) {
+
+ final String processID = nodeId + "-" +
String.valueOf(System.currentTimeMillis());
+
final File processFolder = new File(this.rootFolder, processID);
if (!processFolder.exists() && !processFolder.mkdirs()) {
LOG.log(Level.WARNING, "Failed to create [{0}]",
processFolder.getAbsolutePath());
}
+
final ProcessContainer container = new ProcessContainer(
this.errorHandlerRID, nodeId, processID, processFolder, megaBytes,
numberOfCores, rackName, this.fileNames, this.processObserver);
+
this.containers.put(container.getContainerID(), container);
LOG.log(Level.FINE, "Allocated {0}", container.getContainerID());
+
return container;
}
@@ -403,13 +412,12 @@ final class ContainerManager implements AutoCloseable {
if (this.containers.isEmpty()) {
LOG.log(Level.FINEST, "Clean shutdown with no outstanding
containers.");
} else {
- LOG.log(Level.WARNING, "Dirty shutdown with outstanding containers.");
+ LOG.log(Level.WARNING, "Dirty shutdown with {0} outstanding
containers.", this.containers.size());
for (final Container c : this.containers.values()) {
- LOG.log(Level.WARNING, "Force shutdown of: {0}", c);
+ LOG.log(Level.WARNING, "Force shutdown of container: {0}", c);
c.close();
}
}
}
}
-
}
http://git-wip-us.apache.org/repos/asf/reef/blob/d25412e9/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/driver/RuntimesHost.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/driver/RuntimesHost.java
b/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/driver/RuntimesHost.java
index 62646d2..71dad64 100644
---
a/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/driver/RuntimesHost.java
+++
b/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/driver/RuntimesHost.java
@@ -21,10 +21,6 @@ package org.apache.reef.runtime.multi.driver;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.Validate;
import org.apache.reef.runtime.common.driver.api.*;
-import
org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEvent;
-import
org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEvent;
-import
org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEvent;
-import
org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEvent;
import
org.apache.reef.runtime.multi.client.parameters.SerializedRuntimeDefinition;
import org.apache.reef.runtime.multi.driver.parameters.RuntimeName;
import org.apache.reef.runtime.multi.utils.MultiRuntimeDefinitionSerializer;
@@ -32,8 +28,8 @@ import
org.apache.reef.runtime.multi.utils.avro.AvroMultiRuntimeDefinition;
import org.apache.reef.runtime.multi.utils.avro.AvroRuntimeDefinition;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.Injector;
-import org.apache.reef.tang.JavaConfigurationBuilder;
import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Name;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.exceptions.InjectionException;
import org.apache.reef.tang.formats.AvroConfigurationSerializer;
@@ -53,30 +49,36 @@ import java.util.logging.Logger;
* Hosts the actual runtime implementations and delegates invocations to them.
*/
final class RuntimesHost {
+
private static final Logger LOG =
Logger.getLogger(RuntimesHost.class.getName());
+
private final AvroMultiRuntimeDefinition runtimeDefinition;
private final Injector originalInjector;
private final String defaultRuntimeName;
- private final MultiRuntimeDefinitionSerializer runtimeDefinitionSerializer
= new MultiRuntimeDefinitionSerializer();
+
private Map<String, Runtime> runtimes;
@Inject
- private RuntimesHost(final Injector injector,
- @Parameter(SerializedRuntimeDefinition.class) final
String serializedRuntimeDefinition) {
+ private RuntimesHost(
+ final Injector injector,
+ @Parameter(SerializedRuntimeDefinition.class) final String
serializedRuntimeDefinition) {
+
this.originalInjector = injector;
+
try {
- this.runtimeDefinition =
this.runtimeDefinitionSerializer.fromString(serializedRuntimeDefinition);
- } catch (IOException e) {
+ this.runtimeDefinition = new
MultiRuntimeDefinitionSerializer().fromString(serializedRuntimeDefinition);
+ } catch (final IOException e) {
throw new RuntimeException("Unable to read runtime configuration.", e);
}
- this.defaultRuntimeName =
runtimeDefinition.getDefaultRuntimeName().toString();
+ this.defaultRuntimeName =
this.runtimeDefinition.getDefaultRuntimeName().toString();
}
/**
* Initializes the configured runtimes.
*/
private synchronized void initialize() {
+
if (this.runtimes != null) {
return;
}
@@ -90,58 +92,60 @@ final class RuntimesHost {
// fork the original injector because of the same reason.
// We create new injectors and copy from the original injector what we
need.
// rootInjector is an emptyInjector that we copy bindings from the
original injector into. Then we fork
- //it to instantiate the actual runtime.
- Injector rootInjector = Tang.Factory.getTang().newInjector();
+ // it to instantiate the actual runtime.
+ final Injector rootInjector = Tang.Factory.getTang().newInjector();
initializeInjector(rootInjector);
- final JavaConfigurationBuilder cb =
Tang.Factory.getTang().newConfigurationBuilder();
- cb.bindNamedParameter(RuntimeName.class,
rd.getRuntimeName().toString());
- cb.bindImplementation(Runtime.class, RuntimeImpl.class);
- AvroConfigurationSerializer serializer = new
AvroConfigurationSerializer();
- Configuration config =
serializer.fromString(rd.getSerializedConfiguration().toString());
- final Injector runtimeInjector = rootInjector.forkInjector(config,
cb.build());
+ final Configuration runtimeConfig =
+ Tang.Factory.getTang().newConfigurationBuilder()
+ .bindNamedParameter(RuntimeName.class,
rd.getRuntimeName().toString())
+ .bindImplementation(Runtime.class, RuntimeImpl.class)
+ .build();
+
+ final Configuration config =
+ new
AvroConfigurationSerializer().fromString(rd.getSerializedConfiguration().toString());
+
+ final Injector runtimeInjector = rootInjector.forkInjector(config,
runtimeConfig);
+
this.runtimes.put(rd.getRuntimeName().toString(),
runtimeInjector.getInstance(Runtime.class));
- } catch (InjectionException e) {
- throw new RuntimeException("Unable to initialize runtimes.", e);
- } catch (IOException e) {
+
+ } catch (final IOException | InjectionException e) {
throw new RuntimeException("Unable to initialize runtimes.", e);
}
}
}
/**
+ * Copy event handler from current class configuration into runtime injector.
+ * This is a helper method called from initializeInjector() only.
+ * @param runtimeInjector Runtime injector to copy event handler to.
+ * @param param Class that identifies the event handler parameter.
+ * @param <T> Type of the event handler.
+ * @throws InjectionException If configuration error occurs.
+ */
+ private <T extends EventHandler<?>> void copyEventHandler(
+ final Injector runtimeInjector, final Class<? extends Name<T>> param)
throws InjectionException {
+ runtimeInjector.bindVolatileParameter(param,
this.originalInjector.getNamedInstance(param));
+ }
+
+ /**
* Initializes injector by copying needed handlers.
* @param runtimeInjector The injector to initialize
- * @throws InjectionException
+ * @throws InjectionException on configuration error.
*/
private void initializeInjector(final Injector runtimeInjector) throws
InjectionException {
- final EventHandler<ResourceStatusEvent> statusEventHandler =
-
this.originalInjector.getNamedInstance(RuntimeParameters.ResourceStatusHandler.class);
-
runtimeInjector.bindVolatileParameter(RuntimeParameters.ResourceStatusHandler.class,
statusEventHandler);
- final EventHandler<NodeDescriptorEvent> nodeDescriptorEventHandler =
-
this.originalInjector.getNamedInstance(RuntimeParameters.NodeDescriptorHandler.class);
-
runtimeInjector.bindVolatileParameter(RuntimeParameters.NodeDescriptorHandler.class,
nodeDescriptorEventHandler);
- final EventHandler<ResourceAllocationEvent> resourceAllocationEventHandler
=
-
this.originalInjector.getNamedInstance(RuntimeParameters.ResourceAllocationHandler.class);
- runtimeInjector.bindVolatileParameter(
- RuntimeParameters.ResourceAllocationHandler.class,
- resourceAllocationEventHandler);
- final EventHandler<RuntimeStatusEvent> runtimeStatusEventHandler =
-
this.originalInjector.getNamedInstance(RuntimeParameters.RuntimeStatusHandler.class);
- runtimeInjector.bindVolatileParameter(
- RuntimeParameters.RuntimeStatusHandler.class,
- runtimeStatusEventHandler);
- HttpServer httpServer = null;
+
+ copyEventHandler(runtimeInjector,
RuntimeParameters.ResourceStatusHandler.class);
+ copyEventHandler(runtimeInjector,
RuntimeParameters.NodeDescriptorHandler.class);
+ copyEventHandler(runtimeInjector,
RuntimeParameters.ResourceAllocationHandler.class);
+ copyEventHandler(runtimeInjector,
RuntimeParameters.RuntimeStatusHandler.class);
+
try {
- httpServer = this.originalInjector.getInstance(HttpServer.class);
+ runtimeInjector.bindVolatileInstance(HttpServer.class,
this.originalInjector.getInstance(HttpServer.class));
+ LOG.log(Level.INFO, "Binding http server for the runtime
implementation");
} catch (final InjectionException e) {
LOG.log(Level.INFO, "Http Server is not configured for the runtime", e);
}
-
- if (httpServer != null) {
- runtimeInjector.bindVolatileInstance(HttpServer.class, httpServer);
- LOG.log(Level.INFO, "Binding http server for the runtime
implementation");
- }
}
/**
@@ -150,39 +154,38 @@ final class RuntimesHost {
* @return The runtime
*/
private Runtime getRuntime(final String requestedRuntimeName) {
- String runtimeName = requestedRuntimeName;
- if (StringUtils.isBlank(runtimeName)) {
- runtimeName = this.defaultRuntimeName;
- }
- Runtime runtime = this.runtimes.get(runtimeName);
+ final String runtimeName =
+ StringUtils.isBlank(requestedRuntimeName) ? this.defaultRuntimeName :
requestedRuntimeName;
+ final Runtime runtime = this.runtimes.get(runtimeName);
Validate.notNull(runtime, "Couldn't find runtime for name " + runtimeName);
+
return runtime;
}
- void onResourceLaunch(final ResourceLaunchEvent value) {
- getRuntime(value.getRuntimeName()).onResourceLaunch(value);
+ void onResourceLaunch(final ResourceLaunchEvent event) {
+ getRuntime(event.getRuntimeName()).onResourceLaunch(event);
}
- void onRuntimeStart(final RuntimeStart value) {
+ void onRuntimeStart(final RuntimeStart event) {
initialize();
- for (Runtime runtime : this.runtimes.values()) {
- runtime.onRuntimeStart(value);
+ for (final Runtime runtime : this.runtimes.values()) {
+ runtime.onRuntimeStart(event);
}
}
- void onRuntimeStop(final RuntimeStop value) {
- for (Runtime runtime : this.runtimes.values()) {
- runtime.onRuntimeStop(value);
+ void onRuntimeStop(final RuntimeStop event) {
+ for (final Runtime runtime : this.runtimes.values()) {
+ runtime.onRuntimeStop(event);
}
}
- void onResourceRelease(final ResourceReleaseEvent value) {
- getRuntime(value.getRuntimeName()).onResourceRelease(value);
+ void onResourceRelease(final ResourceReleaseEvent event) {
+ getRuntime(event.getRuntimeName()).onResourceRelease(event);
}
- void onResourceRequest(final ResourceRequestEvent value) {
- getRuntime(value.getRuntimeName()).onResourceRequest(value);
+ void onResourceRequest(final ResourceRequestEvent event) {
+ getRuntime(event.getRuntimeName()).onResourceRequest(event);
}
}
http://git-wip-us.apache.org/repos/asf/reef/blob/d25412e9/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
index f38b53a..58fcca1 100644
---
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
+++
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
@@ -45,7 +45,6 @@ import
org.apache.reef.runtime.yarn.driver.parameters.YarnHeartbeatPeriod;
import org.apache.reef.tang.InjectionFuture;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.util.Optional;
-import org.apache.reef.wake.remote.Encoder;
import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
import javax.inject.Inject;
@@ -65,15 +64,12 @@ final class YarnContainerManager
private static final String RUNTIME_NAME = "YARN";
private final YarnClient yarnClient = YarnClient.createYarnClient();
-
private final Queue<AMRMClient.ContainerRequest> requestsBeforeSentToRM =
new ConcurrentLinkedQueue<>();
-
private final Queue<AMRMClient.ContainerRequest> requestsAfterSentToRM = new
ConcurrentLinkedQueue<>();
-
private final Map<String, String> nodeIdToRackName = new
ConcurrentHashMap<>();
private final YarnConfiguration yarnConf;
- private final AMRMClientAsync resourceManager;
+ private final AMRMClientAsync<AMRMClient.ContainerRequest> resourceManager;
private final NMClientAsync nodeManager;
private final REEFEventHandlers reefEventHandlers;
private final Containers containers;
@@ -87,19 +83,20 @@ final class YarnContainerManager
private final InjectionFuture<ProgressProvider> progressProvider;
@Inject
- YarnContainerManager(
- final YarnConfiguration yarnConf,
+ private YarnContainerManager(
@Parameter(YarnHeartbeatPeriod.class) final int yarnRMHeartbeatPeriod,
+ @Parameter(JobSubmissionDirectory.class) final String
jobSubmissionDirectory,
+ final YarnConfiguration yarnConf,
final REEFEventHandlers reefEventHandlers,
final Containers containers,
final ApplicationMasterRegistration registration,
final ContainerRequestCounter containerRequestCounter,
final DriverStatusManager driverStatusManager,
final REEFFileNames reefFileNames,
- @Parameter(JobSubmissionDirectory.class) final String
jobSubmissionDirectory,
final TrackingURLProvider trackingURLProvider,
final RackNameFormatter rackNameFormatter,
final InjectionFuture<ProgressProvider> progressProvider) throws
IOException {
+
this.reefEventHandlers = reefEventHandlers;
this.driverStatusManager = driverStatusManager;
@@ -110,43 +107,56 @@ final class YarnContainerManager
this.trackingURLProvider = trackingURLProvider;
this.rackNameFormatter = rackNameFormatter;
-
this.yarnClient.init(this.yarnConf);
this.resourceManager =
AMRMClientAsync.createAMRMClientAsync(yarnRMHeartbeatPeriod, this);
this.nodeManager = new NMClientAsyncImpl(this);
+
this.jobSubmissionDirectory = jobSubmissionDirectory;
this.reefFileNames = reefFileNames;
this.progressProvider = progressProvider;
+
LOG.log(Level.FINEST, "Instantiated YarnContainerManager");
}
-
+ /**
+ * RM Callback: RM reports some completed containers. Update status of each
container in the list.
+ * @param completedContainers list of completed containers.
+ */
@Override
- public void onContainersCompleted(final List<ContainerStatus>
containerStatuses) {
- for (final ContainerStatus containerStatus : containerStatuses) {
- onContainerStatus(containerStatus);
+ public void onContainersCompleted(final List<ContainerStatus>
completedContainers) {
+ for (final ContainerStatus containerStatus : completedContainers) {
+ this.onContainerStatus(containerStatus);
}
}
+ /**
+ * RM Callback: RM reports that some containers have been allocated.
+ * @param allocatedContainers list of containers newly allocated by RM.
+ */
@Override
- @SuppressWarnings("checkstyle:hiddenfield")
- public void onContainersAllocated(final List<Container> containers) {
+ public void onContainersAllocated(final List<Container> allocatedContainers)
{
+
+ String id = null; // ID is used for logging only
- // ID is used for logging only
- final String id = String.format("%s:%d",
- Thread.currentThread().getName().replace(' ', '_'),
System.currentTimeMillis());
+ if (LOG.isLoggable(Level.FINE)) {
- LOG.log(Level.FINE, "TIME: Allocated Containers {0} {1} of {2}",
- new Object[]{id, containers.size(),
this.containerRequestCounter.get()});
+ id = String.format("%s:%d", Thread.currentThread().getName().replace('
', '_'), System.currentTimeMillis());
- for (final Container container : containers) {
- handleNewContainer(container);
+ LOG.log(Level.FINE, "TIME: Allocated Containers {0} {1} of {2}",
+ new Object[] {id, allocatedContainers.size(),
this.containerRequestCounter.get()});
+ }
+
+ for (final Container container : allocatedContainers) {
+ this.handleNewContainer(container);
}
LOG.log(Level.FINE, "TIME: Processed Containers {0}", id);
}
+ /**
+ * RM Callback: RM requests application shutdown.
+ */
@Override
public void onShutdownRequest() {
this.reefEventHandlers.onRuntimeStatus(RuntimeStatusEventImpl.newBuilder()
@@ -154,14 +164,23 @@ final class YarnContainerManager
this.driverStatusManager.onError(new Exception("Shutdown requested by
YARN."));
}
+ /**
+ * RM Callback: RM reports status change of some nodes.
+ * @param nodeReports list of nodes with changed status.
+ */
@Override
public void onNodesUpdated(final List<NodeReport> nodeReports) {
for (final NodeReport nodeReport : nodeReports) {
this.nodeIdToRackName.put(nodeReport.getNodeId().toString(),
nodeReport.getRackName());
- onNodeReport(nodeReport);
+ this.onNodeReport(nodeReport);
}
}
+ /**
+ * RM Callback: Report application progress to RM.
+ * Progress is a floating point number between 0 and 1.
+ * @return a floating point number between 0 and 1.
+ */
@Override
public float getProgress() {
try {
@@ -174,52 +193,81 @@ final class YarnContainerManager
}
}
+ /**
+ * RM Callback: RM reports an error.
+ * @param throwable An exception thrown from RM.
+ */
@Override
public void onError(final Throwable throwable) {
- onRuntimeError(throwable);
+ this.onRuntimeError(throwable);
}
+ /**
+ * NM Callback: NM accepts the starting container request.
+ * @param containerId ID of a new container being started.
+ * @param stringByteBufferMap a Map between the auxiliary service names and
their outputs. Not used.
+ */
@Override
- public void onContainerStarted(
- final ContainerId containerId, final Map<String, ByteBuffer>
stringByteBufferMap) {
+ public void onContainerStarted(final ContainerId containerId, final
Map<String, ByteBuffer> stringByteBufferMap) {
final Optional<Container> container =
this.containers.getOptional(containerId.toString());
if (container.isPresent()) {
this.nodeManager.getContainerStatusAsync(containerId,
container.get().getNodeId());
}
}
+ /**
+ * NM Callback: NM reports container status.
+ * @param containerId ID of a container with the status being reported.
+ * @param containerStatus YARN container status.
+ */
@Override
- public void onContainerStatusReceived(
- final ContainerId containerId, final ContainerStatus containerStatus) {
+ public void onContainerStatusReceived(final ContainerId containerId, final
ContainerStatus containerStatus) {
onContainerStatus(containerStatus);
}
+ /**
+ * NM Callback: NM reports stop of a container.
+ * @param containerId ID of a container stopped.
+ */
@Override
public void onContainerStopped(final ContainerId containerId) {
final boolean hasContainer =
this.containers.hasContainer(containerId.toString());
if (hasContainer) {
- final ResourceStatusEventImpl.Builder resourceStatusBuilder =
-
ResourceStatusEventImpl.newBuilder().setIdentifier(containerId.toString());
- resourceStatusBuilder.setState(State.DONE);
- this.reefEventHandlers.onResourceStatus(resourceStatusBuilder.build());
+ this.reefEventHandlers.onResourceStatus(
+ ResourceStatusEventImpl.newBuilder()
+ .setIdentifier(containerId.toString())
+ .setState(State.DONE)
+ .build());
}
}
+ /**
+ * NM Callback: NM reports failure on container start.
+ * @param containerId ID of a container that has failed to start.
+ * @param throwable An error that caused container to fail.
+ */
@Override
- public void onStartContainerError(
- final ContainerId containerId, final Throwable throwable) {
- handleContainerError(containerId, throwable);
+ public void onStartContainerError(final ContainerId containerId, final
Throwable throwable) {
+ this.handleContainerError(containerId, throwable);
}
+ /**
+ * NM Callback: NM can not obtain status of the container.
+ * @param containerId ID of a container that failed to report its status.
+ * @param throwable An error that occurred when querying status of a
container.
+ */
@Override
- public void onGetContainerStatusError(
- final ContainerId containerId, final Throwable throwable) {
- handleContainerError(containerId, throwable);
+ public void onGetContainerStatusError(final ContainerId containerId, final
Throwable throwable) {
+ this.handleContainerError(containerId, throwable);
}
+ /**
+ * NM Callback: NM fails to stop the container.
+ * @param containerId ID of the container that failed to stop.
+ * @param throwable An error that occurred when trying to stop the container.
+ */
@Override
- public void onStopContainerError(
- final ContainerId containerId, final Throwable throwable) {
+ public void onStopContainerError(final ContainerId containerId, final
Throwable throwable) {
handleContainerError(containerId, throwable);
}
@@ -250,11 +298,17 @@ final class YarnContainerManager
updateRuntimeStatus();
}
+ /**
+ * Start the YARN container manager.
+ * This method is called from DriverRuntimeStartHandler via
YARNRuntimeStartHandler.
+ */
void onStart() {
this.yarnClient.start();
+
this.resourceManager.init(this.yarnConf);
this.resourceManager.start();
+
this.nodeManager.init(this.yarnConf);
this.nodeManager.start();
@@ -268,33 +322,45 @@ final class YarnContainerManager
}
try {
-
this.registration.setRegistration(this.resourceManager.registerApplicationMaster(
- "", 0, this.trackingURLProvider.getTrackingUrl()));
- LOG.log(Level.FINE, "YARN registration: {0}", registration);
+
+ this.registration.setRegistration(
+ this.resourceManager.registerApplicationMaster("", 0,
this.trackingURLProvider.getTrackingUrl()));
+
+ LOG.log(Level.FINE, "YARN registration: {0}", this.registration);
+
final FileSystem fs = FileSystem.get(this.yarnConf);
final Path outputFileName = new Path(this.jobSubmissionDirectory,
this.reefFileNames.getDriverHttpEndpoint());
- final FSDataOutputStream out = fs.create(outputFileName);
- out.writeBytes(this.trackingURLProvider.getTrackingUrl() + "\n");
- out.flush();
- out.close();
+
+ try (final FSDataOutputStream out = fs.create(outputFileName)) {
+ out.writeBytes(this.trackingURLProvider.getTrackingUrl() + '\n');
+ }
+
} catch (final YarnException | IOException e) {
LOG.log(Level.WARNING, "Unable to register application master.", e);
onRuntimeError(e);
}
}
+ /**
+ * Shut down YARN container manager.
+ * This method is called from DriverRuntimeStopHandler via
YARNRuntimeStopHandler.
+ * @param exception Exception that caused driver to stop. Can be null if
there was no error.
+ */
void onStop(final Throwable exception) {
LOG.log(Level.FINE, "Stop Runtime: RM status {0}",
this.resourceManager.getServiceState());
if (this.resourceManager.getServiceState() == Service.STATE.STARTED) {
+
// invariant: if RM is still running then we declare success.
try {
+
this.reefEventHandlers.close();
+
if (exception == null) {
- this.resourceManager.unregisterApplicationMaster(
- FinalApplicationStatus.SUCCEEDED, null, null);
+
this.resourceManager.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
null, null);
} else {
+
// Note: We don't allow RM to restart our applications if it's an
application level failure.
// If applications are to be long-running, they should catch
Exceptions before the REEF level
// instead of relying on the RM restart mechanism.
@@ -302,8 +368,8 @@ final class YarnContainerManager
// to leak to this stage.
final String failureMsg = String.format("Application failed due
to:%n%s%n" +
"With stack trace:%n%s", exception.getMessage(),
ExceptionUtils.getStackTrace(exception));
- this.resourceManager.unregisterApplicationMaster(
- FinalApplicationStatus.FAILED, failureMsg, null);
+
+
this.resourceManager.unregisterApplicationMaster(FinalApplicationStatus.FAILED,
failureMsg, null);
}
this.resourceManager.close();
@@ -325,7 +391,9 @@ final class YarnContainerManager
// HELPER METHODS
private void onNodeReport(final NodeReport nodeReport) {
+
LOG.log(Level.FINE, "Send node descriptor: {0}", nodeReport);
+
this.reefEventHandlers.onNodeDescriptor(NodeDescriptorEventImpl.newBuilder()
.setIdentifier(nodeReport.getNodeId().toString())
.setHostName(nodeReport.getNodeId().getHost())
@@ -337,19 +405,17 @@ final class YarnContainerManager
private void handleContainerError(final ContainerId containerId, final
Throwable throwable) {
- final ResourceStatusEventImpl.Builder resourceStatusBuilder =
-
ResourceStatusEventImpl.newBuilder().setIdentifier(containerId.toString());
-
- resourceStatusBuilder.setState(State.FAILED);
- resourceStatusBuilder.setExitCode(1);
- resourceStatusBuilder.setDiagnostics(throwable.getMessage());
- this.reefEventHandlers.onResourceStatus(resourceStatusBuilder.build());
+
this.reefEventHandlers.onResourceStatus(ResourceStatusEventImpl.newBuilder()
+ .setIdentifier(containerId.toString())
+ .setState(State.FAILED)
+ .setExitCode(1)
+ .setDiagnostics(throwable.getMessage())
+ .build());
}
/**
* Handles container status reports. Calls come from YARN.
- *
- * @param value containing the container status
+ * @param value containing the container status.
*/
private void onContainerStatus(final ContainerStatus value) {
@@ -387,7 +453,7 @@ final class YarnContainerManager
status.setDiagnostics(value.getDiagnostics());
}
- // The ResourceStatusHandler should close and release the Evaluator for
us if the state is a terminal state.
+ // ResourceStatusHandler should close and release the Evaluator for us
if the state is a terminal state.
this.reefEventHandlers.onResourceStatus(status.build());
}
}
@@ -397,7 +463,7 @@ final class YarnContainerManager
synchronized (this) {
this.containerRequestCounter.incrementBy(containerRequests.length);
this.requestsBeforeSentToRM.addAll(Arrays.asList(containerRequests));
- doHomogeneousRequests();
+ this.doHomogeneousRequests();
}
this.updateRuntimeStatus();
@@ -405,55 +471,59 @@ final class YarnContainerManager
/**
* Handles new container allocations. Calls come from YARN.
- *
- * @param container newly allocated
+ * @param container newly allocated YARN container.
*/
private void handleNewContainer(final Container container) {
LOG.log(Level.FINE, "allocated container: id[ {0} ]", container.getId());
+
synchronized (this) {
- if (matchContainerWithPendingRequest(container)) {
- final AMRMClient.ContainerRequest matchedRequest =
this.requestsAfterSentToRM.peek();
- this.containerRequestCounter.decrement();
- this.containers.add(container);
-
- LOG.log(Level.FINEST, "{0} matched with {1}", new
Object[]{container.toString(), matchedRequest.toString()});
-
- // Due to the bug YARN-314 and the workings of AMRMCClientAsync, when
x-priority m-capacity zero-container
- // request and x-priority n-capacity nonzero-container request are
sent together, where m > n, RM ignores
- // the latter.
- // Therefore it is necessary avoid sending zero-container request,
even it means getting extra containers.
- // It is okay to send nonzero m-capacity and n-capacity request
together since bigger containers
- // can be matched.
- // TODO[JIRA REEF-42, REEF-942]: revisit this when implementing
locality-strictness
- // (i.e. a specific rack request can be ignored)
- if (this.requestsAfterSentToRM.size() > 1) {
- try {
- this.resourceManager.removeContainerRequest(matchedRequest);
- } catch (final Exception e) {
- LOG.log(Level.WARNING, "Nothing to remove from Async AMRM client's
queue, " +
- "removal attempt failed with exception", e);
- }
- }
- this.requestsAfterSentToRM.remove();
- doHomogeneousRequests();
-
- LOG.log(Level.FINEST, "Allocated Container: memory = {0}, core number
= {1}",
- new Object[]{container.getResource().getMemory(),
container.getResource().getVirtualCores()});
-
this.reefEventHandlers.onResourceAllocation(ResourceEventImpl.newAllocationBuilder()
- .setIdentifier(container.getId().toString())
- .setNodeId(container.getNodeId().toString())
- .setResourceMemory(container.getResource().getMemory())
- .setVirtualCores(container.getResource().getVirtualCores())
- .setRackName(rackNameFormatter.getRackName(container))
- .setRuntimeName(RuntimeIdentifier.RUNTIME_NAME)
- .build());
- this.updateRuntimeStatus();
- } else {
+ if (!matchContainerWithPendingRequest(container)) {
LOG.log(Level.WARNING, "Got an extra container {0} that doesn't match,
releasing...", container.getId());
this.resourceManager.releaseAssignedContainer(container.getId());
+ return;
}
+
+ final AMRMClient.ContainerRequest matchedRequest =
this.requestsAfterSentToRM.peek();
+
+ this.containerRequestCounter.decrement();
+ this.containers.add(container);
+
+ LOG.log(Level.FINEST, "{0} matched with {1}", new Object[] {container,
matchedRequest});
+
+ // Due to the bug YARN-314 and the workings of AMRMClientAsync, when
x-priority m-capacity zero-container
+ // request and x-priority n-capacity nonzero-container request are sent
together, where m > n, RM ignores
+ // the latter.
+ // Therefore it is necessary to avoid sending zero-container request, even
if it means getting extra containers.
+ // It is okay to send nonzero m-capacity and n-capacity request together
since bigger containers
+ // can be matched.
+ // TODO[JIRA REEF-42, REEF-942]: revisit this when implementing
locality-strictness.
+ // (i.e. a specific rack request can be ignored)
+ if (this.requestsAfterSentToRM.size() > 1) {
+ try {
+ this.resourceManager.removeContainerRequest(matchedRequest);
+ } catch (final Exception e) {
+ LOG.log(Level.WARNING, "Error removing request from Async AMRM
client queue: " + matchedRequest, e);
+ }
+ }
+
+ this.requestsAfterSentToRM.remove();
+ this.doHomogeneousRequests();
+
+ LOG.log(Level.FINEST, "Allocated Container: memory = {0}, core number =
{1}",
+ new Object[] {container.getResource().getMemory(),
container.getResource().getVirtualCores()});
+
+
this.reefEventHandlers.onResourceAllocation(ResourceEventImpl.newAllocationBuilder()
+ .setIdentifier(container.getId().toString())
+ .setNodeId(container.getNodeId().toString())
+ .setResourceMemory(container.getResource().getMemory())
+ .setVirtualCores(container.getResource().getVirtualCores())
+ .setRackName(rackNameFormatter.getRackName(container))
+ .setRuntimeName(RuntimeIdentifier.RUNTIME_NAME)
+ .build());
+
+ this.updateRuntimeStatus();
}
}
@@ -484,15 +554,19 @@ final class YarnContainerManager
* up the allocation and in placing containers on other machines.
*/
private boolean matchContainerWithPendingRequest(final Container container) {
+
if (this.requestsAfterSentToRM.isEmpty()) {
return false;
}
final AMRMClient.ContainerRequest request =
this.requestsAfterSentToRM.peek();
+
final boolean resourceCondition = container.getResource().getMemory() >=
request.getCapability().getMemory();
+
// TODO[JIRA REEF-35]: check vcores once YARN-2380 is resolved
final boolean nodeCondition = request.getNodes() == null
|| request.getNodes().contains(container.getNodeId().getHost());
+
final boolean rackCondition = request.getRacks() == null
||
request.getRacks().contains(this.nodeIdToRackName.get(container.getNodeId().toString()));
@@ -504,11 +578,10 @@ final class YarnContainerManager
*/
private void updateRuntimeStatus() {
- final RuntimeStatusEventImpl.Builder builder =
- RuntimeStatusEventImpl.newBuilder()
- .setName(RUNTIME_NAME)
- .setState(State.RUNNING)
-
.setOutstandingContainerRequests(this.containerRequestCounter.get());
+ final RuntimeStatusEventImpl.Builder builder =
RuntimeStatusEventImpl.newBuilder()
+ .setName(RUNTIME_NAME)
+ .setState(State.RUNNING)
+ .setOutstandingContainerRequests(this.containerRequestCounter.get());
for (final String allocatedContainerId :
this.containers.getContainerIds()) {
builder.addContainerAllocation(allocatedContainerId);
@@ -522,26 +595,25 @@ final class YarnContainerManager
// SHUTDOWN YARN
try {
this.reefEventHandlers.close();
- this.resourceManager.unregisterApplicationMaster(
- FinalApplicationStatus.FAILED, throwable.getMessage(), null);
+
this.resourceManager.unregisterApplicationMaster(FinalApplicationStatus.FAILED,
throwable.getMessage(), null);
} catch (final Exception e) {
LOG.log(Level.WARNING, "Error shutting down YARN application", e);
} finally {
this.resourceManager.stop();
}
- final RuntimeStatusEventImpl.Builder runtimeStatusBuilder =
RuntimeStatusEventImpl.newBuilder()
- .setState(State.FAILED)
- .setName(RUNTIME_NAME);
-
- final Encoder<Throwable> codec = new ObjectSerializableCodec<>();
-
runtimeStatusBuilder.setError(ReefServiceProtos.RuntimeErrorProto.newBuilder()
- .setName(RUNTIME_NAME)
- .setMessage(throwable.getMessage())
- .setException(ByteString.copyFrom(codec.encode(throwable)))
- .build())
- .build();
+ final ReefServiceProtos.RuntimeErrorProto runtimeError =
+ ReefServiceProtos.RuntimeErrorProto.newBuilder()
+ .setName(RUNTIME_NAME)
+ .setMessage(throwable.getMessage())
+ .setException(ByteString.copyFrom(new
ObjectSerializableCodec<>().encode(throwable)))
+ .build();
- this.reefEventHandlers.onRuntimeStatus(runtimeStatusBuilder.build());
+ this.reefEventHandlers.onRuntimeStatus(
+ RuntimeStatusEventImpl.newBuilder()
+ .setState(State.FAILED)
+ .setName(RUNTIME_NAME)
+ .setError(runtimeError)
+ .build());
}
}