Repository: reef Updated Branches: refs/heads/master da8ce0258 -> 77546f8f3
[REEF-943] Decouple the DriverSide heartbeat handling from protobuf This patch: * Adds the 'pojos' package to `org.apache.reef.runtime.common.driver.evaluator` * Implements EvaluatorStatusPOJO, ContextStatusPOJO, TaskStatusPOJO, ContextMessagePOJO, TaskMessagePOJO. These classes turn ReefServiceProtos into corresponding POJOs. The POJO classes implement only the methods that DriverSide event handlers currently use; POJOs discard all other proto fields. * Implements ContextState and State enums that mimic corresponding enums from ReefServiceProtos. Note that the State enum represents both Tasks and Evaluator states in the original protobuf code. This patch adopts this policy. * Replaces ReefServiceProtos with corresponding POJOs and enums in all event handlers that EvaluatorManager#onEvaluatorHeartbeatMessage calls JIRA: [REEF-943](https://issues.apache.org/jira/browse/REEF-943) Pull Request: This closes #694 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/77546f8f Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/77546f8f Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/77546f8f Branch: refs/heads/master Commit: 77546f8f3c4fcd06efa7b8ad9597601a94532a99 Parents: da8ce02 Author: [email protected] <[email protected]> Authored: Mon Nov 23 11:05:14 2015 +0100 Committer: Markus Weimer <[email protected]> Committed: Tue Dec 15 10:02:06 2015 -0800 ---------------------------------------------------------------------- .../driver/context/ContextRepresenters.java | 82 +++++++------ .../common/driver/context/EvaluatorContext.java | 11 +- .../driver/evaluator/EvaluatorManager.java | 71 ++++++----- .../evaluator/pojos/ContextMessagePOJO.java | 55 +++++++++ .../driver/evaluator/pojos/ContextState.java | 34 ++++++ .../evaluator/pojos/ContextStatusPOJO.java | 122 +++++++++++++++++++ .../evaluator/pojos/EvaluatorStatusPOJO.java | 89 ++++++++++++++ .../common/driver/evaluator/pojos/State.java | 40 ++++++ .../driver/evaluator/pojos/TaskMessagePOJO.java | 54 ++++++++ .../driver/evaluator/pojos/TaskStatusPOJO.java | 118 ++++++++++++++++++ .../driver/evaluator/pojos/package-info.java | 36 ++++++ .../resourcemanager/ResourceManagerStatus.java | 22 ++-- .../resourcemanager/ResourceStatusEvent.java | 4 +- .../ResourceStatusEventImpl.java | 11 +- .../resourcemanager/RuntimeStatusEvent.java | 3 +- .../resourcemanager/RuntimeStatusEventImpl.java | 9 +- .../common/driver/task/TaskRepresenter.java | 88 ++++++------- .../runtime/local/driver/ResourceManager.java | 4 +- .../process/ReefRunnableProcessObserver.java | 8 +- .../runtime/mesos/driver/REEFScheduler.java | 2 +- .../yarn/driver/YarnContainerManager.java | 19 +-- .../driver/YarnDriverRuntimeRestartManager.java | 4 +- 22 files changed, 728 insertions(+), 158 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/77546f8f/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java index a01c073..5a117f9 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java @@ -25,8 +25,10 @@ import org.apache.reef.annotations.audience.Private; import org.apache.reef.driver.context.FailedContext; import org.apache.reef.driver.restart.DriverRestartManager; import org.apache.reef.driver.restart.EvaluatorRestartState; -import org.apache.reef.proto.ReefServiceProtos; import org.apache.reef.runtime.common.driver.evaluator.EvaluatorMessageDispatcher; +import org.apache.reef.runtime.common.driver.evaluator.pojos.ContextMessagePOJO; +import org.apache.reef.runtime.common.driver.evaluator.pojos.ContextState; +import org.apache.reef.runtime.common.driver.evaluator.pojos.ContextStatusPOJO; import org.apache.reef.util.Optional; import javax.inject.Inject; @@ -97,14 +99,14 @@ public final class ContextRepresenters { /** * Process heartbeats from the contexts on an Evaluator. * - * @param contextStatusProtos + * @param contextStatusPOJOs * @param notifyClientOnNewActiveContext */ - public synchronized void onContextStatusMessages(final Iterable<ReefServiceProtos.ContextStatusProto> - contextStatusProtos, + public synchronized void onContextStatusMessages(final Iterable<ContextStatusPOJO> + contextStatusPOJOs, final boolean notifyClientOnNewActiveContext) { - for (final ReefServiceProtos.ContextStatusProto contextStatusProto : contextStatusProtos) { - this.onContextStatusMessage(contextStatusProto, notifyClientOnNewActiveContext); + for (final ContextStatusPOJO contextStatus : contextStatusPOJOs) { + this.onContextStatusMessage(contextStatus, notifyClientOnNewActiveContext); } } @@ -112,53 +114,53 @@ public final class ContextRepresenters { /** * Process a heartbeat from a context. * - * @param contextStatusProto + * @param contextStatus * @param notifyClientOnNewActiveContext */ - private synchronized void onContextStatusMessage(final ReefServiceProtos.ContextStatusProto contextStatusProto, + private synchronized void onContextStatusMessage(final ContextStatusPOJO contextStatus, final boolean notifyClientOnNewActiveContext) { - LOG.log(Level.FINER, "Processing context status message for context {0}", contextStatusProto.getContextId()); - switch (contextStatusProto.getContextState()) { + LOG.log(Level.FINER, "Processing context status message for context {0}", contextStatus.getContextId()); + switch (contextStatus.getContextState()) { case READY: - this.onContextReady(contextStatusProto, notifyClientOnNewActiveContext); + this.onContextReady(contextStatus, notifyClientOnNewActiveContext); break; case FAIL: - this.onContextFailed(contextStatusProto); + this.onContextFailed(contextStatus); break; case DONE: - this.onContextDone(contextStatusProto); + this.onContextDone(contextStatus); break; default: - this.onUnknownContextStatus(contextStatusProto); + this.onUnknownContextStatus(contextStatus); break; } - LOG.log(Level.FINER, "Done processing context status message for context {0}", contextStatusProto.getContextId()); + LOG.log(Level.FINER, "Done processing context status message for context {0}", contextStatus.getContextId()); } - private synchronized void onUnknownContextStatus(final ReefServiceProtos.ContextStatusProto contextStatusProto) { - LOG.log(Level.WARNING, "Received unexpected context status: {0}", contextStatusProto); - throw new RuntimeException("Received unexpected context status: " + contextStatusProto.getContextState()); + private synchronized void onUnknownContextStatus(final ContextStatusPOJO contextStatus) { + LOG.log(Level.WARNING, "Received unexpected context status: {0}", contextStatus); + throw new RuntimeException("Received unexpected context status: " + contextStatus.getContextState()); } - private synchronized void onContextFailed(final ReefServiceProtos.ContextStatusProto contextStatusProto) { - assert ReefServiceProtos.ContextStatusProto.State.FAIL == contextStatusProto.getContextState(); - final String contextID = contextStatusProto.getContextId(); + private synchronized void onContextFailed(final ContextStatusPOJO contextStatus) { + assert ContextState.FAIL == contextStatus.getContextState(); + final String contextID = contextStatus.getContextId(); LOG.log(Level.FINE, "Context {0} failed", contextID); // It could have failed right away. if (this.isUnknownContextId(contextID)) { - this.onNewContext(contextStatusProto, false); + this.onNewContext(contextStatus, false); } final EvaluatorContext context = getContext(contextID); this.removeContext(context); - this.messageDispatcher.onContextFailed(context.getFailedContext(contextStatusProto)); + this.messageDispatcher.onContextFailed(context.getFailedContext(contextStatus)); } - private synchronized void onContextDone(final ReefServiceProtos.ContextStatusProto contextStatusProto) { - assert ReefServiceProtos.ContextStatusProto.State.DONE == contextStatusProto.getContextState(); - final String contextID = contextStatusProto.getContextId(); + private synchronized void onContextDone(final ContextStatusPOJO contextStatus) { + assert ContextState.DONE == contextStatus.getContextState(); + final String contextID = contextStatus.getContextId(); if (isUnknownContextId(contextID)) { throw new RuntimeException("Received DONE for context " + contextID + " which is unknown."); } else { @@ -178,24 +180,24 @@ public final class ContextRepresenters { /** * Process a message with status READY from a context. * - * @param contextStatusProto + * @param contextStatus * @param notifyClientOnNewActiveContext whether or not to inform the application when this in fact refers to a new * context. */ - private synchronized void onContextReady(final ReefServiceProtos.ContextStatusProto contextStatusProto, + private synchronized void onContextReady(final ContextStatusPOJO contextStatus, final boolean notifyClientOnNewActiveContext) { - assert ReefServiceProtos.ContextStatusProto.State.READY == contextStatusProto.getContextState(); - final String contextID = contextStatusProto.getContextId(); + assert ContextState.READY == contextStatus.getContextState(); + final String contextID = contextStatus.getContextId(); // This could be the first message we get from that context if (this.isUnknownContextId(contextID)) { - this.onNewContext(contextStatusProto, notifyClientOnNewActiveContext); + this.onNewContext(contextStatus, notifyClientOnNewActiveContext); } // Dispatch the messages to the application, if there are any. - for (final ReefServiceProtos.ContextStatusProto.ContextMessageProto - contextMessageProto : contextStatusProto.getContextMessageList()) { - final byte[] theMessage = contextMessageProto.getMessage().toByteArray(); - final String sourceID = contextMessageProto.getSourceId(); + for (final ContextMessagePOJO + contextMessage : contextStatus.getContextMessageList()) { + final byte[] theMessage = contextMessage.getMessage(); + final String sourceID = contextMessage.getSourceId(); this.messageDispatcher.onContextMessage(new ContextMessageImpl(theMessage, contextID, sourceID)); } @@ -204,16 +206,16 @@ public final class ContextRepresenters { /** * Create and add a new context representer. * - * @param contextStatusProto the message to create the context from + * @param contextStatus the message to create the context from * @param notifyClientOnNewActiveContext whether or not to fire an event to the user. */ - private synchronized void onNewContext(final ReefServiceProtos.ContextStatusProto contextStatusProto, + private synchronized void onNewContext(final ContextStatusPOJO contextStatus, final boolean notifyClientOnNewActiveContext) { - final String contextID = contextStatusProto.getContextId(); + final String contextID = contextStatus.getContextId(); LOG.log(Level.FINE, "Adding new context {0}.", contextID); - final Optional<String> parentID = contextStatusProto.hasParentId() ? - Optional.of(contextStatusProto.getParentId()) : Optional.<String>empty(); + final Optional<String> parentID = contextStatus.hasParentId() ? + Optional.of(contextStatus.getParentId()) : Optional.<String>empty(); final EvaluatorContext context = contextFactory.newContext(contextID, parentID); this.addContext(context); if (notifyClientOnNewActiveContext) { http://git-wip-us.apache.org/repos/asf/reef/blob/77546f8f/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/EvaluatorContext.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/EvaluatorContext.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/EvaluatorContext.java index a25e116..8106713 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/EvaluatorContext.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/EvaluatorContext.java @@ -26,8 +26,9 @@ import org.apache.reef.driver.context.ClosedContext; import org.apache.reef.driver.context.FailedContext; import org.apache.reef.driver.evaluator.EvaluatorDescriptor; import org.apache.reef.proto.EvaluatorRuntimeProtocol; -import org.apache.reef.proto.ReefServiceProtos; import org.apache.reef.runtime.common.driver.evaluator.EvaluatorMessageDispatcher; +import org.apache.reef.runtime.common.driver.evaluator.pojos.ContextState; +import org.apache.reef.runtime.common.driver.evaluator.pojos.ContextStatusPOJO; import org.apache.reef.runtime.common.utils.ExceptionCodec; import org.apache.reef.tang.Configuration; import org.apache.reef.tang.formats.ConfigurationSerializer; @@ -248,15 +249,15 @@ public final class EvaluatorContext implements ActiveContext { } public synchronized FailedContext getFailedContext( - final ReefServiceProtos.ContextStatusProto contextStatusProto) { + final ContextStatusPOJO contextStatus) { - assert ReefServiceProtos.ContextStatusProto.State.FAIL == contextStatusProto.getContextState(); + assert ContextState.FAIL == contextStatus.getContextState(); final String id = this.getId(); final Optional<String> description = Optional.empty(); - final Optional<byte[]> data = contextStatusProto.hasError() ? - Optional.of(contextStatusProto.getError().toByteArray()) : + final Optional<byte[]> data = contextStatus.hasError() ? + Optional.of(contextStatus.getError()) : Optional.<byte[]>empty(); final Optional<Throwable> cause = data.isPresent() ? http://git-wip-us.apache.org/repos/asf/reef/blob/77546f8f/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java index 7a92dbc..cc107c3 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java @@ -24,6 +24,10 @@ import org.apache.reef.driver.evaluator.FailedEvaluator; import org.apache.reef.driver.parameters.EvaluatorConfigurationProviders; import org.apache.reef.driver.restart.DriverRestartManager; import org.apache.reef.driver.restart.EvaluatorRestartState; +import org.apache.reef.runtime.common.driver.evaluator.pojos.ContextStatusPOJO; +import org.apache.reef.runtime.common.driver.evaluator.pojos.EvaluatorStatusPOJO; +import org.apache.reef.runtime.common.driver.evaluator.pojos.State; +import org.apache.reef.runtime.common.driver.evaluator.pojos.TaskStatusPOJO; import org.apache.reef.tang.ConfigurationProvider; import org.apache.reef.driver.context.ActiveContext; import org.apache.reef.driver.context.FailedContext; @@ -60,6 +64,7 @@ import org.apache.reef.wake.time.event.Alarm; import javax.inject.Inject; import java.io.File; +import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.logging.Level; @@ -190,9 +195,9 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable { } private static boolean isDoneOrFailedOrKilled(final ResourceStatusEvent resourceStatusEvent) { - return resourceStatusEvent.getState() == ReefServiceProtos.State.DONE || - resourceStatusEvent.getState() == ReefServiceProtos.State.FAILED || - resourceStatusEvent.getState() == ReefServiceProtos.State.KILLED; + return resourceStatusEvent.getState() == State.DONE || + resourceStatusEvent.getState() == State.FAILED || + resourceStatusEvent.getState() == State.KILLED; } @Override @@ -313,7 +318,6 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable { } else { this.messageDispatcher.onEvaluatorFailed(failedEvaluator); } - } catch (final Exception e) { LOG.log(Level.SEVERE, "Exception while handling FailedEvaluator", e); } finally { @@ -372,17 +376,25 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable { // Process the Evaluator status message if (evaluatorHeartbeatProto.hasEvaluatorStatus()) { - this.onEvaluatorStatusMessage(evaluatorHeartbeatProto.getEvaluatorStatus()); + EvaluatorStatusPOJO evaluatorStatus = new EvaluatorStatusPOJO(evaluatorHeartbeatProto.getEvaluatorStatus()); + this.onEvaluatorStatusMessage(evaluatorStatus); } // Process the Context status message(s) final boolean informClientOfNewContexts = !evaluatorHeartbeatProto.hasTaskStatus(); - this.contextRepresenters.onContextStatusMessages(evaluatorHeartbeatProto.getContextStatusList(), + final List<ContextStatusPOJO> contextStatusList = new ArrayList<>(); + for (ReefServiceProtos.ContextStatusProto proto : evaluatorHeartbeatProto.getContextStatusList()) { + contextStatusList.add(new ContextStatusPOJO(proto)); + } + + this.contextRepresenters.onContextStatusMessages(contextStatusList, informClientOfNewContexts); // Process the Task status message + if (evaluatorHeartbeatProto.hasTaskStatus()) { - this.onTaskStatusMessage(evaluatorHeartbeatProto.getTaskStatus()); + TaskStatusPOJO taskStatus = new TaskStatusPOJO(evaluatorHeartbeatProto.getTaskStatus()); + this.onTaskStatusMessage(taskStatus); } LOG.log(Level.FINE, "DONE with evaluator heartbeat from Evaluator {0}", this.getId()); } @@ -393,7 +405,7 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable { * * @param message */ - private synchronized void onEvaluatorStatusMessage(final ReefServiceProtos.EvaluatorStatusProto message) { + private synchronized void onEvaluatorStatusMessage(final EvaluatorStatusPOJO message) { switch (message.getState()) { case DONE: @@ -417,8 +429,8 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable { * * @param message */ - private synchronized void onEvaluatorDone(final ReefServiceProtos.EvaluatorStatusProto message) { - assert message.getState() == ReefServiceProtos.State.DONE; + private synchronized void onEvaluatorDone(final EvaluatorStatusPOJO message) { + assert message.getState() == State.DONE; LOG.log(Level.FINEST, "Evaluator {0} done.", getId()); this.stateManager.setDone(); this.messageDispatcher.onEvaluatorCompleted(new CompletedEvaluatorImpl(this.evaluatorId)); @@ -428,14 +440,15 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable { /** * Process an evaluator message that indicates a crash. * - * @param evaluatorStatusProto + * @param evaluatorStatus */ - private synchronized void onEvaluatorFailed(final ReefServiceProtos.EvaluatorStatusProto evaluatorStatusProto) { - assert evaluatorStatusProto.getState() == ReefServiceProtos.State.FAILED; + private synchronized void onEvaluatorFailed(final EvaluatorStatusPOJO evaluatorStatus) { + assert evaluatorStatus.getState() + == State.FAILED; final EvaluatorException evaluatorException; - if (evaluatorStatusProto.hasError()) { + if (evaluatorStatus.hasError()) { final Optional<Throwable> exception = - this.exceptionCodec.fromBytes(evaluatorStatusProto.getError().toByteArray()); + this.exceptionCodec.fromBytes(evaluatorStatus.getError()); if (exception.isPresent()) { evaluatorException = new EvaluatorException(getId(), exception.get()); } else { @@ -486,40 +499,40 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable { /** * Handle task status messages. * - * @param taskStatusProto message contains the current task status. + * @param taskStatus message contains the current task status. */ - private void onTaskStatusMessage(final ReefServiceProtos.TaskStatusProto taskStatusProto) { + private void onTaskStatusMessage(final TaskStatusPOJO taskStatus) { - if (!(this.task.isPresent() && this.task.get().getId().equals(taskStatusProto.getTaskId()))) { - if (taskStatusProto.getState() == ReefServiceProtos.State.INIT || - taskStatusProto.getState() == ReefServiceProtos.State.FAILED || - taskStatusProto.getState() == ReefServiceProtos.State.RUNNING || + if (!(this.task.isPresent() && this.task.get().getId().equals(taskStatus.getTaskId()))) { + if (taskStatus.getState() == State.INIT || + taskStatus.getState() == State.FAILED || + taskStatus.getState() == State.RUNNING || driverRestartManager.getEvaluatorRestartState(evaluatorId) == EvaluatorRestartState.REREGISTERED) { // [REEF-308] exposes a bug where the .NET evaluator does not send its states in the right order // [REEF-289] is a related item which may fix the issue - if (taskStatusProto.getState() == ReefServiceProtos.State.RUNNING) { + if (taskStatus.getState() == State.RUNNING) { LOG.log(Level.WARNING, "Received a message of state " + ReefServiceProtos.State.RUNNING + - " for Task " + taskStatusProto.getTaskId() + + " for Task " + taskStatus.getTaskId() + " before receiving its " + ReefServiceProtos.State.INIT + " state"); } // FAILED is a legal first state of a Task as it could have failed during construction. this.task = Optional.of( - new TaskRepresenter(taskStatusProto.getTaskId(), - this.contextRepresenters.getContext(taskStatusProto.getContextId()), + new TaskRepresenter(taskStatus.getTaskId(), + this.contextRepresenters.getContext(taskStatus.getContextId()), this.messageDispatcher, this, this.exceptionCodec, this.driverRestartManager)); } else { - throw new RuntimeException("Received a message of state " + taskStatusProto.getState() + - ", not INIT, RUNNING, or FAILED for Task " + taskStatusProto.getTaskId() + + throw new RuntimeException("Received a message of state " + taskStatus.getState() + + ", not INIT, RUNNING, or FAILED for Task " + taskStatus.getTaskId() + " which we haven't heard from before."); } } - this.task.get().onTaskStatusMessage(taskStatusProto); + this.task.get().onTaskStatusMessage(taskStatus); if (this.task.get().isNotRunning()) { LOG.log(Level.FINEST, "Task no longer running. De-registering it."); @@ -563,7 +576,7 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable { .append("] was running when the Evaluator crashed."); } - if (resourceStatusEvent.getState() == ReefServiceProtos.State.KILLED) { + if (resourceStatusEvent.getState() == State.KILLED) { this.onEvaluatorException(new EvaluatorKilledByResourceManagerException(this.evaluatorId, messageBuilder.toString())); } else { http://git-wip-us.apache.org/repos/asf/reef/blob/77546f8f/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/ContextMessagePOJO.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/ContextMessagePOJO.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/ContextMessagePOJO.java new file mode 100644 index 0000000..3760719 --- /dev/null +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/ContextMessagePOJO.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.runtime.common.driver.evaluator.pojos; + +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.proto.ReefServiceProtos; + +/** + * DriverSide representation of ContextMessageProto. + */ +@DriverSide +@Private +public final class ContextMessagePOJO { + + private final byte[] message; + private final String sourceId; + + + ContextMessagePOJO(final ReefServiceProtos.ContextStatusProto.ContextMessageProto proto){ + message = proto.getMessage().toByteArray(); + sourceId = proto.getSourceId(); + } + + /** + * @return the serialized message from a context. + */ + public byte[] getMessage(){ + return message; + } + + /** + * @return the ID of the ContextMessageSource that sent the message. + */ + public String getSourceId(){ + return sourceId; + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/77546f8f/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/ContextState.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/ContextState.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/ContextState.java new file mode 100644 index 0000000..31efc62 --- /dev/null +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/ContextState.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.runtime.common.driver.evaluator.pojos; + +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.Private; + +/** + * DriverSide representation of ContextStatusProto.State. + */ +@DriverSide +@Private +public enum ContextState { + READY, + DONE, + FAIL; +} http://git-wip-us.apache.org/repos/asf/reef/blob/77546f8f/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/ContextStatusPOJO.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/ContextStatusPOJO.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/ContextStatusPOJO.java new file mode 100644 index 0000000..a636ea5 --- /dev/null +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/ContextStatusPOJO.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.runtime.common.driver.evaluator.pojos; + +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.proto.ReefServiceProtos; +import org.apache.reef.proto.ReefServiceProtos.ContextStatusProto.ContextMessageProto; + +import java.util.ArrayList; +import java.util.List; + +/** + * DriverSide representation of ContextStatusProto. + */ +@DriverSide +@Private +public final class ContextStatusPOJO { + + private final String contextId; + private final String parentId; + private final byte[] errorBytes; + private final ContextState contextState; + private final List<ContextMessagePOJO> contextMessages = new ArrayList<>(); + + + public ContextStatusPOJO(final ReefServiceProtos.ContextStatusProto proto){ + + contextId = proto.getContextId(); + parentId = proto.hasParentId() ? proto.getParentId() : null; + errorBytes = proto.hasError() ? proto.getError().toByteArray() : null; + contextState = proto.hasContextState()? getContextStateFromProto(proto.getContextState()) : null; + + for (final ContextMessageProto contextMessageProto : proto.getContextMessageList()) { + contextMessages.add(new ContextMessagePOJO(contextMessageProto)); + } + + } + + /** + * @return a list of messages sent by a context + */ + public List<ContextMessagePOJO> getContextMessageList() { + return contextMessages; + } + + /** + * @return the ID of a context + */ + public String getContextId() { + return contextId; + } + + /** + * @return the {@link org.apache.reef.runtime.common.driver.evaluator.pojos.ContextState} of a context + */ + public ContextState getContextState() { + return contextState; + } + + /** + * @return true, if a context has thrown an exception and sent it to a driver + */ + public boolean hasError() { + return null != errorBytes; + } + + /** + * @return serialized exception thrown by a context + */ + public byte[] getError() { + return errorBytes; + } + + /** + * @return true, if a context has the parent context + */ + public boolean hasParentId() { + return null != parentId; + } + + /** + * @return the id of the parent context + */ + public String getParentId(){ + return parentId; + } + + private ContextState getContextStateFromProto( + final org.apache.reef.proto.ReefServiceProtos.ContextStatusProto.State protoState) { + + switch (protoState) { + case READY: + return ContextState.READY; + case DONE: + return ContextState.DONE; + case FAIL: + return ContextState.FAIL; + default: + throw new IllegalStateException("Unknown state " + protoState + " in ContextStatusProto"); + } + + } + +} http://git-wip-us.apache.org/repos/asf/reef/blob/77546f8f/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/EvaluatorStatusPOJO.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/EvaluatorStatusPOJO.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/EvaluatorStatusPOJO.java new file mode 100644 index 0000000..661369c --- /dev/null +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/EvaluatorStatusPOJO.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.runtime.common.driver.evaluator.pojos; + +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.proto.ReefServiceProtos; + +/** + * DriverSide representation of EvaluatorStatusProto. + */ +@DriverSide +@Private +public final class EvaluatorStatusPOJO { + + private final String evaluatorID; + private final byte[] evaluatorIdBytes; + private final State evaluatorState; + private final byte[] errorBytes; + + + public EvaluatorStatusPOJO(final ReefServiceProtos.EvaluatorStatusProto proto) { + + evaluatorID = proto.getEvaluatorId(); + evaluatorIdBytes = proto.getEvaluatorIdBytes().toByteArray(); + evaluatorState = proto.hasState()? getStateFromProto(proto.getState()) : null; + errorBytes = proto.hasError() ? proto.getError().toByteArray() : null; + + } + + /** + * @return true, if an evaluator has thrown an exception and sent it to a driver + */ + public boolean hasError() { + return null != errorBytes; + } + + /** + * @return serialized exception thrown by an evaluator + */ + public byte[] getError(){ + return errorBytes; + } + + /** + * @return current {@link org.apache.reef.runtime.common.driver.evaluator.pojos.State} of a task + */ + public State getState(){ + return evaluatorState; + } + + private State getStateFromProto(final org.apache.reef.proto.ReefServiceProtos.State protoState) { + + switch (protoState) { + case INIT: + return State.INIT; + case RUNNING: + return State.RUNNING; + case DONE: + return State.DONE; + case SUSPEND: + return State.SUSPEND; + case FAILED: + return State.FAILED; + case KILLED: + return State.KILLED; + default: + throw new IllegalStateException("Unknown state " + protoState + " in EvaluatorStatusProto"); + } + } + +} http://git-wip-us.apache.org/repos/asf/reef/blob/77546f8f/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/State.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/State.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/State.java new file mode 100644 index 0000000..477564c --- /dev/null +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/State.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.runtime.common.driver.evaluator.pojos; + +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.Private; + +/** + * DriverSide representation of ReefServiceProtos.State. + */ + +@DriverSide +@Private +public enum State { + + INIT, + RUNNING, + DONE, + SUSPEND, + FAILED, + KILLED; + +} http://git-wip-us.apache.org/repos/asf/reef/blob/77546f8f/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/TaskMessagePOJO.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/TaskMessagePOJO.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/TaskMessagePOJO.java new file mode 100644 index 0000000..84bb4d6 --- /dev/null +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/TaskMessagePOJO.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.runtime.common.driver.evaluator.pojos; + +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.proto.ReefServiceProtos; + +/** + * DriverSide representation of TaskMessageProto. + */ +@DriverSide +@Private +public final class TaskMessagePOJO { + + private final byte[] message; + private final String sourceId; + + TaskMessagePOJO(final ReefServiceProtos.TaskStatusProto.TaskMessageProto proto){ + message = proto.getMessage().toByteArray(); + sourceId = proto.getSourceId(); + } + + /** + * @return the serialized message from a task + */ + public byte[] getMessage() { + return message; + } + + /** + * @return the ID of the TaskMessageSource that sent the message. + */ + public String getSourceId(){ + return sourceId; + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/77546f8f/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/TaskStatusPOJO.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/TaskStatusPOJO.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/TaskStatusPOJO.java new file mode 100644 index 0000000..66ecdda --- /dev/null +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/TaskStatusPOJO.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.runtime.common.driver.evaluator.pojos; + +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.proto.ReefServiceProtos; +import org.apache.reef.proto.ReefServiceProtos.TaskStatusProto.TaskMessageProto; + +import java.util.ArrayList; +import java.util.List; + +/** + * DriverSide representation of TaskStatusProto. + */ +@DriverSide +@Private +public final class TaskStatusPOJO { + + private final String taskId; + private final String contextId; + private final State state; + private final byte[] result; + private final List<TaskMessagePOJO> taskMessages = new ArrayList<>(); + + public TaskStatusPOJO(final ReefServiceProtos.TaskStatusProto proto){ + + taskId = proto.getTaskId(); + contextId = proto.getContextId(); + state = proto.hasState()? getStateFromProto(proto.getState()) : null; + result = proto.hasResult() ? proto.getResult().toByteArray() : null; + + for (final TaskMessageProto taskMessageProto : proto.getTaskMessageList()) { + taskMessages.add(new TaskMessagePOJO(taskMessageProto)); + } + + } + + /** + * @return a list of messages sent by a task + */ + public List<TaskMessagePOJO> getTaskMessageList(){ + return taskMessages; + } + + /** + * @return true, if a completed task returned a non-null value in the 'return' statement + */ + public boolean hasResult(){ + return null != result; + } + + /** + * @return serialized result that a completed task returned to the Driver + */ + public byte[] getResult(){ + return result; + } + + /** + * @return the id of a task + */ + public String getTaskId(){ + return taskId; + } + + /** + * @return the id of a context that this task runs within + */ + public String getContextId(){ + return contextId; + } + + /** + * @return current {@link org.apache.reef.runtime.common.driver.evaluator.pojos.State} of a task + */ + public State getState(){ + return state; + } + + private State getStateFromProto(final org.apache.reef.proto.ReefServiceProtos.State protoState) { + + switch (protoState) { + case INIT: + return State.INIT; + case RUNNING: + return State.RUNNING; + case DONE: + return State.DONE; + case SUSPEND: + return State.SUSPEND; + case FAILED: + return State.FAILED; + case KILLED: + return State.KILLED; + default: + throw new IllegalStateException("Unknown state " + protoState + " in EvaluatorStatusProto"); + } + + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/77546f8f/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/package-info.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/package-info.java new file mode 100644 index 0000000..d3d1369 --- /dev/null +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/package-info.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * POJOs in this package encapsulate the content of + * an EvaluatorHeartbeatProto on the DriverSide. + * + * POJOs keep only fields and interfaces that DriverSide actually uses; + * other EvaluatorHeartbeatProto members are discarded. + * + * See REEF-943. + */ + +@DriverSide +@Private +package org.apache.reef.runtime.common.driver.evaluator.pojos; + +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.Private; + http://git-wip-us.apache.org/repos/asf/reef/blob/77546f8f/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java index 68d7594..273649f 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java @@ -20,8 +20,8 @@ package org.apache.reef.runtime.common.driver.resourcemanager; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.annotations.audience.Private; -import org.apache.reef.proto.ReefServiceProtos; import org.apache.reef.runtime.common.driver.DriverStatusManager; +import org.apache.reef.runtime.common.driver.evaluator.pojos.State; import org.apache.reef.runtime.common.driver.idle.DriverIdleManager; import org.apache.reef.runtime.common.driver.idle.DriverIdlenessSource; import org.apache.reef.runtime.common.driver.idle.IdleMessage; @@ -50,7 +50,7 @@ public final class ResourceManagerStatus implements EventHandler<RuntimeStatusEv private final InjectionFuture<DriverIdleManager> driverIdleManager; // Mutable state. - private ReefServiceProtos.State state = ReefServiceProtos.State.INIT; + private State state = State.INIT; private int outstandingContainerRequests = 0; private int containerAllocationCount = 0; @@ -65,7 +65,7 @@ public final class ResourceManagerStatus implements EventHandler<RuntimeStatusEv @Override public synchronized void onNext(final RuntimeStatusEvent runtimeStatusEvent) { - final ReefServiceProtos.State newState = runtimeStatusEvent.getState(); + final State newState = runtimeStatusEvent.getState(); LOG.log(Level.FINEST, "Runtime status " + runtimeStatusEvent); this.outstandingContainerRequests = runtimeStatusEvent.getOutstandingContainerRequests().orElse(0); this.containerAllocationCount = runtimeStatusEvent.getContainerAllocationList().size(); @@ -94,7 +94,7 @@ public final class ResourceManagerStatus implements EventHandler<RuntimeStatusEv * Change the state of the Resource Manager to be RUNNING. */ public synchronized void setRunning() { - this.setState(ReefServiceProtos.State.RUNNING); + this.setState(State.RUNNING); } /** @@ -117,18 +117,18 @@ public final class ResourceManagerStatus implements EventHandler<RuntimeStatusEv private synchronized void onRMFailure(final RuntimeStatusEvent runtimeStatusEvent) { - assert runtimeStatusEvent.getState() == ReefServiceProtos.State.FAILED; + assert runtimeStatusEvent.getState() == State.FAILED; this.resourceManagerErrorHandler.onNext(runtimeStatusEvent.getError().get()); } private synchronized void onRMDone(final RuntimeStatusEvent runtimeStatusEvent) { - assert runtimeStatusEvent.getState() == ReefServiceProtos.State.DONE; + assert runtimeStatusEvent.getState() == State.DONE; LOG.log(Level.INFO, "Resource Manager shutdown happened. Triggering Driver shutdown."); this.driverStatusManager.onComplete(); } private synchronized void onRMRunning(final RuntimeStatusEvent runtimeStatusEvent) { - assert runtimeStatusEvent.getState() == ReefServiceProtos.State.RUNNING; + assert runtimeStatusEvent.getState() == State.RUNNING; if (this.isIdle()) { this.driverIdleManager.get().onPotentiallyIdle(IDLE_MESSAGE); } @@ -141,7 +141,7 @@ public final class ResourceManagerStatus implements EventHandler<RuntimeStatusEv } private synchronized boolean isRunning() { - return ReefServiceProtos.State.RUNNING.equals(this.state); + return State.RUNNING.equals(this.state); } /** @@ -155,8 +155,8 @@ public final class ResourceManagerStatus implements EventHandler<RuntimeStatusEv * @return true if the transition is legal; false otherwise * */ - private synchronized boolean isLegalStateTransition(final ReefServiceProtos.State from, - final ReefServiceProtos.State to) { + private synchronized boolean isLegalStateTransition(final State from, + final State to) { // handle diagonal elements of the transition matrix if (from.equals(to)){ @@ -212,7 +212,7 @@ public final class ResourceManagerStatus implements EventHandler<RuntimeStatusEv } - private synchronized void setState(final ReefServiceProtos.State newState) { + private synchronized void setState(final State newState) { if (isLegalStateTransition(this.state, newState)) { this.state = newState; http://git-wip-us.apache.org/repos/asf/reef/blob/77546f8f/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusEvent.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusEvent.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusEvent.java index c669d95..82945e6 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusEvent.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusEvent.java @@ -20,7 +20,7 @@ package org.apache.reef.runtime.common.driver.resourcemanager; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.annotations.audience.RuntimeAuthor; -import org.apache.reef.proto.ReefServiceProtos; +import org.apache.reef.runtime.common.driver.evaluator.pojos.State; import org.apache.reef.tang.annotations.DefaultImplementation; import org.apache.reef.util.Optional; @@ -45,7 +45,7 @@ public interface ResourceStatusEvent { /** * @return State of the resource */ - ReefServiceProtos.State getState(); + State getState(); /** * @return Diagnostics from the resource http://git-wip-us.apache.org/repos/asf/reef/blob/77546f8f/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusEventImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusEventImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusEventImpl.java index ee01879..4a9da3d 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusEventImpl.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusEventImpl.java @@ -18,7 +18,7 @@ */ package org.apache.reef.runtime.common.driver.resourcemanager; -import org.apache.reef.proto.ReefServiceProtos; +import org.apache.reef.runtime.common.driver.evaluator.pojos.State; import org.apache.reef.util.BuilderUtils; import org.apache.reef.util.Optional; @@ -28,7 +28,7 @@ import org.apache.reef.util.Optional; */ public final class ResourceStatusEventImpl implements ResourceStatusEvent { private final String identifier; - private final ReefServiceProtos.State state; + private final State state; private final Optional<String> diagnostics; private final Optional<Integer> exitCode; private final String runtimeName; @@ -52,7 +52,7 @@ public final class ResourceStatusEventImpl implements ResourceStatusEvent { } @Override - public ReefServiceProtos.State getState() { + public State getState() { return state; } @@ -74,9 +74,10 @@ public final class ResourceStatusEventImpl implements ResourceStatusEvent { * Builder used to create ResourceStatusEvent instances. */ public static final class Builder implements org.apache.reef.util.Builder<ResourceStatusEvent> { + private String identifier; private String runtimeName; - private ReefServiceProtos.State state; + private State state; private String diagnostics; private Integer exitCode; @@ -98,7 +99,7 @@ public final class ResourceStatusEventImpl implements ResourceStatusEvent { /** * @see ResourceStatusEvent#getState() */ - public Builder setState(final ReefServiceProtos.State state) { + public Builder setState(final State state) { this.state = state; return this; } http://git-wip-us.apache.org/repos/asf/reef/blob/77546f8f/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/RuntimeStatusEvent.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/RuntimeStatusEvent.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/RuntimeStatusEvent.java index 18bcce0..ac02dba 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/RuntimeStatusEvent.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/RuntimeStatusEvent.java @@ -21,6 +21,7 @@ package org.apache.reef.runtime.common.driver.resourcemanager; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.annotations.audience.RuntimeAuthor; import org.apache.reef.proto.ReefServiceProtos; +import org.apache.reef.runtime.common.driver.evaluator.pojos.State; import org.apache.reef.tang.annotations.DefaultImplementation; import org.apache.reef.util.Optional; @@ -42,7 +43,7 @@ public interface RuntimeStatusEvent { /** * @return State of the Runtime */ - ReefServiceProtos.State getState(); + State getState(); /** * @return List of allocated containers http://git-wip-us.apache.org/repos/asf/reef/blob/77546f8f/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/RuntimeStatusEventImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/RuntimeStatusEventImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/RuntimeStatusEventImpl.java index 50af802..48c9ed3 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/RuntimeStatusEventImpl.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/RuntimeStatusEventImpl.java @@ -19,6 +19,7 @@ package org.apache.reef.runtime.common.driver.resourcemanager; import org.apache.reef.proto.ReefServiceProtos; +import org.apache.reef.runtime.common.driver.evaluator.pojos.State; import org.apache.reef.util.BuilderUtils; import org.apache.reef.util.Optional; @@ -31,7 +32,7 @@ import java.util.List; */ public final class RuntimeStatusEventImpl implements RuntimeStatusEvent { private final String name; - private final ReefServiceProtos.State state; + private final State state; private final List<String> containerAllocationList; private final Optional<ReefServiceProtos.RuntimeErrorProto> error; private final Optional<Integer> outstandingContainerRequests; @@ -50,7 +51,7 @@ public final class RuntimeStatusEventImpl implements RuntimeStatusEvent { } @Override - public ReefServiceProtos.State getState() { + public State getState() { return state; } @@ -78,7 +79,7 @@ public final class RuntimeStatusEventImpl implements RuntimeStatusEvent { */ public static final class Builder implements org.apache.reef.util.Builder<RuntimeStatusEvent> { private String name; - private ReefServiceProtos.State state; + private State state; private List<String> containerAllocationList = new ArrayList<>(); private ReefServiceProtos.RuntimeErrorProto error; private Integer outstandingContainerRequests; @@ -94,7 +95,7 @@ public final class RuntimeStatusEventImpl implements RuntimeStatusEvent { /** * @see RuntimeStatusEvent#getState() */ - public Builder setState(final ReefServiceProtos.State state) { + public Builder setState(final State state) { this.state = state; return this; } http://git-wip-us.apache.org/repos/asf/reef/blob/77546f8f/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java index 281df11..89d60a3 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java @@ -25,10 +25,12 @@ import org.apache.reef.driver.restart.DriverRestartManager; import org.apache.reef.driver.restart.EvaluatorRestartState; import org.apache.reef.driver.task.FailedTask; import org.apache.reef.driver.task.RunningTask; -import org.apache.reef.proto.ReefServiceProtos; import org.apache.reef.runtime.common.driver.context.EvaluatorContext; import org.apache.reef.runtime.common.driver.evaluator.EvaluatorManager; import org.apache.reef.runtime.common.driver.evaluator.EvaluatorMessageDispatcher; +import org.apache.reef.runtime.common.driver.evaluator.pojos.State; +import org.apache.reef.runtime.common.driver.evaluator.pojos.TaskMessagePOJO; +import org.apache.reef.runtime.common.driver.evaluator.pojos.TaskStatusPOJO; import org.apache.reef.runtime.common.utils.ExceptionCodec; import org.apache.reef.util.Optional; @@ -52,7 +54,7 @@ public final class TaskRepresenter { private final DriverRestartManager driverRestartManager; // Mutable state - private ReefServiceProtos.State state = ReefServiceProtos.State.INIT; + private State state = State.INIT; public TaskRepresenter(final String taskId, final EvaluatorContext context, @@ -68,57 +70,57 @@ public final class TaskRepresenter { this.driverRestartManager = driverRestartManager; } - private static byte[] getResult(final ReefServiceProtos.TaskStatusProto taskStatusProto) { - return taskStatusProto.hasResult() ? taskStatusProto.getResult().toByteArray() : null; + private static byte[] getResult(final TaskStatusPOJO taskStatus) { + return taskStatus.hasResult() ? taskStatus.getResult() : null; } - public void onTaskStatusMessage(final ReefServiceProtos.TaskStatusProto taskStatusProto) { + public void onTaskStatusMessage(final TaskStatusPOJO taskStatus) { LOG.log(Level.FINE, "Received task {0} status {1}", - new Object[]{taskStatusProto.getTaskId(), taskStatusProto.getState()}); + new Object[]{taskStatus.getTaskId(), taskStatus.getState()}); // Make sure that the message is indeed for us. - if (!taskStatusProto.getContextId().equals(this.context.getId())) { + if (!taskStatus.getContextId().equals(this.context.getId())) { throw new RuntimeException( - "Received a message for a task running on Context " + taskStatusProto.getContextId() + + "Received a message for a task running on Context " + taskStatus.getContextId() + " while the Driver believes this Task to be run on Context " + this.context.getId()); } - if (!taskStatusProto.getTaskId().equals(this.taskId)) { - throw new RuntimeException("Received a message for task " + taskStatusProto.getTaskId() + + if (!taskStatus.getTaskId().equals(this.taskId)) { + throw new RuntimeException("Received a message for task " + taskStatus.getTaskId() + " in the TaskRepresenter for Task " + this.taskId); } if (driverRestartManager.getEvaluatorRestartState(evaluatorManager.getId()) == EvaluatorRestartState.REREGISTERED) { // when a recovered heartbeat is received, we will take its word for it LOG.log(Level.INFO, "Received task status {0} for RECOVERED task {1}.", - new Object[]{taskStatusProto.getState(), this.taskId}); - this.setState(taskStatusProto.getState()); + new Object[]{taskStatus.getState(), this.taskId}); + this.setState(taskStatus.getState()); } // Dispatch the message to the right method. - switch (taskStatusProto.getState()) { + switch (taskStatus.getState()) { case INIT: - this.onTaskInit(taskStatusProto); + this.onTaskInit(taskStatus); break; case RUNNING: - this.onTaskRunning(taskStatusProto); + this.onTaskRunning(taskStatus); break; case SUSPEND: - this.onTaskSuspend(taskStatusProto); + this.onTaskSuspend(taskStatus); break; case DONE: - this.onTaskDone(taskStatusProto); + this.onTaskDone(taskStatus); break; case FAILED: - this.onTaskFailed(taskStatusProto); + this.onTaskFailed(taskStatus); break; default: - throw new IllegalStateException("Unknown task state: " + taskStatusProto.getState()); + throw new IllegalStateException("Unknown task state: " + taskStatus.getState()); } } - private void onTaskInit(final ReefServiceProtos.TaskStatusProto taskStatusProto) { - assert ReefServiceProtos.State.INIT == taskStatusProto.getState(); + private void onTaskInit(final TaskStatusPOJO taskStatusPOJO) { + assert State.INIT == taskStatusPOJO.getState(); if (this.isKnown()) { LOG.log(Level.WARNING, "Received a INIT message for task with id {0}" + " which we have seen before. Ignoring the second message", this.taskId); @@ -126,12 +128,12 @@ public final class TaskRepresenter { final RunningTask runningTask = new RunningTaskImpl( this.evaluatorManager, this.taskId, this.context, this); this.messageDispatcher.onTaskRunning(runningTask); - this.setState(ReefServiceProtos.State.RUNNING); + this.setState(State.RUNNING); } } - private void onTaskRunning(final ReefServiceProtos.TaskStatusProto taskStatusProto) { - assert taskStatusProto.getState() == ReefServiceProtos.State.RUNNING; + private void onTaskRunning(final TaskStatusPOJO taskStatus) { + assert taskStatus.getState() == State.RUNNING; if (this.isNotRunning()) { throw new IllegalStateException("Received a task status message from task " + this.taskId + @@ -146,41 +148,41 @@ public final class TaskRepresenter { this.messageDispatcher.onDriverRestartTaskRunning(runningTask); } - for (final ReefServiceProtos.TaskStatusProto.TaskMessageProto - taskMessageProto : taskStatusProto.getTaskMessageList()) { + for (final TaskMessagePOJO + taskMessageProto : taskStatus.getTaskMessageList()) { this.messageDispatcher.onTaskMessage( - new TaskMessageImpl(taskMessageProto.getMessage().toByteArray(), + new TaskMessageImpl(taskMessageProto.getMessage(), this.taskId, this.context.getId(), taskMessageProto.getSourceId())); } } - private void onTaskSuspend(final ReefServiceProtos.TaskStatusProto taskStatusProto) { - assert ReefServiceProtos.State.SUSPEND == taskStatusProto.getState(); + private void onTaskSuspend(final TaskStatusPOJO taskStatus) { + assert State.SUSPEND == taskStatus.getState(); assert this.isKnown(); this.messageDispatcher.onTaskSuspended( - new SuspendedTaskImpl(this.context, getResult(taskStatusProto), this.taskId)); - this.setState(ReefServiceProtos.State.SUSPEND); + new SuspendedTaskImpl(this.context, getResult(taskStatus), this.taskId)); + this.setState(State.SUSPEND); } - private void onTaskDone(final ReefServiceProtos.TaskStatusProto taskStatusProto) { - assert ReefServiceProtos.State.DONE == taskStatusProto.getState(); + private void onTaskDone(final TaskStatusPOJO taskStatus) { + assert State.DONE == taskStatus.getState(); assert this.isKnown(); this.messageDispatcher.onTaskCompleted( - new CompletedTaskImpl(this.context, getResult(taskStatusProto), this.taskId)); - this.setState(ReefServiceProtos.State.DONE); + new CompletedTaskImpl(this.context, getResult(taskStatus), this.taskId)); + this.setState(State.DONE); } - private void onTaskFailed(final ReefServiceProtos.TaskStatusProto taskStatusProto) { - assert ReefServiceProtos.State.FAILED == taskStatusProto.getState(); + private void onTaskFailed(final TaskStatusPOJO taskStatus) { + assert State.FAILED == taskStatus.getState(); final Optional<ActiveContext> evaluatorContext = Optional.<ActiveContext>of(this.context); - final Optional<byte[]> bytes = Optional.ofNullable(getResult(taskStatusProto)); + final Optional<byte[]> bytes = Optional.ofNullable(getResult(taskStatus)); final Optional<Throwable> exception = this.exceptionCodec.fromBytes(bytes); final String message = exception.isPresent() ? exception.get().getMessage() : "No message given"; final Optional<String> description = Optional.empty(); final FailedTask failedTask = new FailedTask( this.taskId, message, description, exception, bytes, evaluatorContext); this.messageDispatcher.onTaskFailed(failedTask); - this.setState(ReefServiceProtos.State.FAILED); + this.setState(State.FAILED); } public String getId() { @@ -191,24 +193,24 @@ public final class TaskRepresenter { * @return true, if we had at least one message from the task. */ private boolean isKnown() { - return this.state != ReefServiceProtos.State.INIT; + return this.state != State.INIT; } /** * @return true, if this task is in any other state but RUNNING. */ public boolean isNotRunning() { - return this.state != ReefServiceProtos.State.RUNNING; + return this.state != State.RUNNING; } /** * @return true, if this task is in INIT or RUNNING status. */ public boolean isClosable() { - return this.state == ReefServiceProtos.State.INIT || this.state == ReefServiceProtos.State.RUNNING; + return this.state == State.INIT || this.state == State.RUNNING; } - private void setState(final ReefServiceProtos.State newState) { + private void setState(final State newState) { LOG.log(Level.FINE, "Task [{0}] state transition from [{1}] to [{2}]", new Object[]{this.taskId, this.state, newState}); this.state = newState; http://git-wip-us.apache.org/repos/asf/reef/blob/77546f8f/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceManager.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceManager.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceManager.java index 76d107a..fa3e91d 100644 --- a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceManager.java +++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceManager.java @@ -21,11 +21,11 @@ package org.apache.reef.runtime.local.driver; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.annotations.audience.Private; import org.apache.reef.driver.evaluator.EvaluatorProcess; -import org.apache.reef.proto.ReefServiceProtos; import org.apache.reef.runtime.common.driver.api.ResourceLaunchEvent; import org.apache.reef.runtime.common.driver.api.ResourceReleaseEvent; import org.apache.reef.runtime.common.driver.api.ResourceRequestEvent; import org.apache.reef.runtime.common.driver.api.RuntimeParameters; +import org.apache.reef.runtime.common.driver.evaluator.pojos.State; import org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEvent; import org.apache.reef.runtime.common.driver.resourcemanager.ResourceEventImpl; import org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEvent; @@ -244,7 +244,7 @@ public final class ResourceManager { final RuntimeStatusEventImpl.Builder builder = RuntimeStatusEventImpl.newBuilder() .setName("LOCAL") - .setState(ReefServiceProtos.State.RUNNING) + .setState(State.RUNNING) .setOutstandingContainerRequests(this.requestQueue.getNumberOfOutstandingRequests()); for (final String containerAllocation : this.theContainers.getAllocatedContainerIDs()) { builder.addContainerAllocation(containerAllocation); http://git-wip-us.apache.org/repos/asf/reef/blob/77546f8f/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/ReefRunnableProcessObserver.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/ReefRunnableProcessObserver.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/ReefRunnableProcessObserver.java index 25fda94..8e2d72b 100644 --- a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/ReefRunnableProcessObserver.java +++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/ReefRunnableProcessObserver.java @@ -19,8 +19,8 @@ package org.apache.reef.runtime.local.process; import net.jcip.annotations.ThreadSafe; -import org.apache.reef.proto.ReefServiceProtos; import org.apache.reef.runtime.common.driver.api.RuntimeParameters; +import org.apache.reef.runtime.common.driver.evaluator.pojos.State; import org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEvent; import org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEventImpl; import org.apache.reef.runtime.local.driver.ResourceManager; @@ -58,7 +58,7 @@ public final class ReefRunnableProcessObserver implements RunnableProcessObserve this.onResourceStatus( ResourceStatusEventImpl.newBuilder() .setIdentifier(processId) - .setState(ReefServiceProtos.State.RUNNING) + .setState(State.RUNNING) .build() ); } @@ -87,7 +87,7 @@ public final class ReefRunnableProcessObserver implements RunnableProcessObserve this.onResourceStatus( ResourceStatusEventImpl.newBuilder() .setIdentifier(processId) - .setState(ReefServiceProtos.State.DONE) + .setState(State.DONE) .setExitCode(0) .build() ); @@ -103,7 +103,7 @@ public final class ReefRunnableProcessObserver implements RunnableProcessObserve this.onResourceStatus( ResourceStatusEventImpl.newBuilder() .setIdentifier(processId) - .setState(ReefServiceProtos.State.FAILED) + .setState(State.FAILED) .setExitCode(exitCode) .build() ); http://git-wip-us.apache.org/repos/asf/reef/blob/77546f8f/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFScheduler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFScheduler.java b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFScheduler.java index 9817f9b..7387d3c 100644 --- a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFScheduler.java +++ b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFScheduler.java @@ -21,10 +21,10 @@ package org.apache.reef.runtime.mesos.driver; import com.google.protobuf.ByteString; import org.apache.mesos.MesosSchedulerDriver; import org.apache.reef.proto.ReefServiceProtos; -import org.apache.reef.proto.ReefServiceProtos.State; import org.apache.reef.runtime.common.driver.api.ResourceReleaseEvent; import org.apache.reef.runtime.common.driver.api.ResourceRequestEvent; import org.apache.reef.runtime.common.driver.api.ResourceRequestEventImpl; +import org.apache.reef.runtime.common.driver.evaluator.pojos.State; import org.apache.reef.runtime.common.driver.parameters.JobIdentifier; import org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEventImpl; import org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEvent; http://git-wip-us.apache.org/repos/asf/reef/blob/77546f8f/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java index fc03393..f38b53a 100644 --- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java @@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.reef.driver.ProgressProvider; import org.apache.reef.proto.ReefServiceProtos; import org.apache.reef.runtime.common.driver.DriverStatusManager; +import org.apache.reef.runtime.common.driver.evaluator.pojos.State; import org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEventImpl; import org.apache.reef.runtime.common.driver.resourcemanager.ResourceEventImpl; import org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEventImpl; @@ -149,7 +150,7 @@ final class YarnContainerManager @Override public void onShutdownRequest() { this.reefEventHandlers.onRuntimeStatus(RuntimeStatusEventImpl.newBuilder() - .setName(RUNTIME_NAME).setState(ReefServiceProtos.State.DONE).build()); + .setName(RUNTIME_NAME).setState(State.DONE).build()); this.driverStatusManager.onError(new Exception("Shutdown requested by YARN.")); } @@ -199,7 +200,7 @@ final class YarnContainerManager if (hasContainer) { final ResourceStatusEventImpl.Builder resourceStatusBuilder = ResourceStatusEventImpl.newBuilder().setIdentifier(containerId.toString()); - resourceStatusBuilder.setState(ReefServiceProtos.State.DONE); + resourceStatusBuilder.setState(State.DONE); this.reefEventHandlers.onResourceStatus(resourceStatusBuilder.build()); } } @@ -339,7 +340,7 @@ final class YarnContainerManager final ResourceStatusEventImpl.Builder resourceStatusBuilder = ResourceStatusEventImpl.newBuilder().setIdentifier(containerId.toString()); - resourceStatusBuilder.setState(ReefServiceProtos.State.FAILED); + resourceStatusBuilder.setState(State.FAILED); resourceStatusBuilder.setExitCode(1); resourceStatusBuilder.setDiagnostics(throwable.getMessage()); this.reefEventHandlers.onResourceStatus(resourceStatusBuilder.build()); @@ -366,19 +367,19 @@ final class YarnContainerManager LOG.log(Level.FINE, "Container completed: status {0}", value.getExitStatus()); switch (value.getExitStatus()) { case 0: - status.setState(ReefServiceProtos.State.DONE); + status.setState(State.DONE); break; case 143: - status.setState(ReefServiceProtos.State.KILLED); + status.setState(State.KILLED); break; default: - status.setState(ReefServiceProtos.State.FAILED); + status.setState(State.FAILED); } status.setExitCode(value.getExitStatus()); break; default: LOG.info("Container running"); - status.setState(ReefServiceProtos.State.RUNNING); + status.setState(State.RUNNING); } if (value.getDiagnostics() != null) { @@ -506,7 +507,7 @@ final class YarnContainerManager final RuntimeStatusEventImpl.Builder builder = RuntimeStatusEventImpl.newBuilder() .setName(RUNTIME_NAME) - .setState(ReefServiceProtos.State.RUNNING) + .setState(State.RUNNING) .setOutstandingContainerRequests(this.containerRequestCounter.get()); for (final String allocatedContainerId : this.containers.getContainerIds()) { @@ -530,7 +531,7 @@ final class YarnContainerManager } final RuntimeStatusEventImpl.Builder runtimeStatusBuilder = RuntimeStatusEventImpl.newBuilder() - .setState(ReefServiceProtos.State.FAILED) + .setState(State.FAILED) .setName(RUNTIME_NAME); final Encoder<Throwable> codec = new ObjectSerializableCodec<>(); http://git-wip-us.apache.org/repos/asf/reef/blob/77546f8f/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java index d2a00f4..429e74a 100644 --- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java @@ -30,8 +30,8 @@ import org.apache.reef.annotations.audience.RuntimeAuthor; import org.apache.reef.driver.restart.DriverRuntimeRestartManager; import org.apache.reef.driver.restart.EvaluatorRestartInfo; import org.apache.reef.driver.restart.RestartEvaluators; -import org.apache.reef.proto.ReefServiceProtos; import org.apache.reef.runtime.common.driver.EvaluatorPreserver; +import org.apache.reef.runtime.common.driver.evaluator.pojos.State; import org.apache.reef.runtime.common.driver.resourcemanager.ResourceEventImpl; import org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEventImpl; import org.apache.reef.runtime.yarn.driver.parameters.YarnEvaluatorPreserver; @@ -256,7 +256,7 @@ public final class YarnDriverRuntimeRestartManager implements DriverRuntimeResta // trigger a failed evaluator event this.reefEventHandlers.onResourceStatus(ResourceStatusEventImpl.newBuilder() .setIdentifier(evaluatorId) - .setState(ReefServiceProtos.State.FAILED) + .setState(State.FAILED) .setExitCode(1) .setDiagnostics("Container [" + evaluatorId + "] failed during driver restart process.") .build());
