Repository: reef
Updated Branches:
  refs/heads/master a4e7294d7 -> 234eeba7d
[REEF-1639] Refactor RunnableProcess and move State logic into a separate enum JIRA: [REEF-1639](https://issues.apache.org/jira/browse/REEF-1639) Pull request: This closes #1154 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/234eeba7 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/234eeba7 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/234eeba7 Branch: refs/heads/master Commit: 234eeba7de648d5f1dbed4246733f60df143bca7 Parents: a4e7294 Author: Sergiy Matusevych <[email protected]> Authored: Wed Oct 12 14:35:56 2016 -0700 Committer: Mariia Mykhailova <[email protected]> Committed: Fri Oct 14 12:11:04 2016 -0700 ---------------------------------------------------------------------- .../runtime/local/process/RunnableProcess.java | 168 ++++++++----------- .../local/process/RunnableProcessState.java | 68 ++++++++ 2 files changed, 137 insertions(+), 99 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/234eeba7/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/RunnableProcess.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/RunnableProcess.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/RunnableProcess.java index 12b4488..d545b3f 100644 --- a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/RunnableProcess.java +++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/RunnableProcess.java @@ -23,7 +23,7 @@ import org.apache.reef.util.OSUtils; import java.io.*; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; import 
java.util.concurrent.locks.Condition; @@ -32,7 +32,6 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import java.util.logging.Logger; - /** * A runnable class that encapsulates a process. */ @@ -46,6 +45,7 @@ public final class RunnableProcess implements Runnable { * Name of the file used for STDERR redirection. */ private final String standardErrorFileName; + /** * Name of the file used for STDOUT redirection. */ @@ -55,102 +55,83 @@ public final class RunnableProcess implements Runnable { * Command to execute. */ private final List<String> command; + /** * User supplied ID of this process. */ private final String id; + /** * The working folder in which the process runs. It is also where STDERR and STDOUT files will be deposited. */ private final File folder; + /** * The coarse-grained lock for state transition. */ private final Lock stateLock = new ReentrantLock(); + private final Condition doneCond = stateLock.newCondition(); + /** * This will be informed of process start and stop. */ private final RunnableProcessObserver processObserver; + /** * The process. */ private Process process; + /** * The state of the process. */ - private State state = State.INIT; // synchronized on stateLock + private RunnableProcessState state = RunnableProcessState.INIT; // synchronized on stateLock /** - * @param command the command to execute. - * @param id The ID of the process. This is used to name files and in the logs created - * by this process. - * @param folder The folder in which this will store its stdout and stderr output - * @param processObserver will be informed of process state changes. - * @param standardOutFileName The name of the file used for redirecting STDOUT + * @param command the command to execute. + * @param id The ID of the process. This is used to name files and in the logs created by this process. 
+ * @param folder The folder in which this will store its stdout and stderr output + * @param processObserver will be informed of process state changes. + * @param standardOutFileName The name of the file used for redirecting STDOUT * @param standardErrorFileName The name of the file used for redirecting STDERR */ - public RunnableProcess(final List<String> command, - final String id, - final File folder, - final RunnableProcessObserver processObserver, - final String standardOutFileName, - final String standardErrorFileName) { + public RunnableProcess( + final List<String> command, + final String id, + final File folder, + final RunnableProcessObserver processObserver, + final String standardOutFileName, + final String standardErrorFileName) { + this.processObserver = processObserver; - this.command = new ArrayList<>(command); + this.command = Collections.unmodifiableList(command); this.id = id; this.folder = folder; + assert this.folder.isDirectory(); if (!this.folder.exists() && !this.folder.mkdirs()) { LOG.log(Level.WARNING, "Failed to create [{0}]", this.folder.getAbsolutePath()); } + this.standardOutFileName = standardOutFileName; this.standardErrorFileName = standardErrorFileName; - LOG.log(Level.FINEST, "RunnableProcess ready."); - } - /** - * Checks whether a transition from State 'from' to state 'to' is legal. - * - * @param from - * @param to - * @return true, if the state transition is legal. False otherwise. - */ - private static boolean isLegal(final State from, final State to) { - switch (from) { - case INIT: - switch (to) { - case INIT: - case RUNNING: - case ENDED: - return true; - default: - return false; - } - case RUNNING: - switch (to) { - case ENDED: - return true; - default: - return false; - } - case ENDED: - return false; - default: - return false; - } + LOG.log(Level.FINEST, "RunnableProcess ready."); } /** * Runs the configured process. 
- * - * @throws java.lang.IllegalStateException if the process is already running or has been running before. + * @throws IllegalStateException if the process is already running or has been running before. */ @Override public void run() { + this.stateLock.lock(); + try { - if (this.getState() != State.INIT) { + + if (this.state != RunnableProcessState.INIT) { throw new IllegalStateException("The RunnableProcess can't be reused"); } @@ -160,57 +141,72 @@ public final class RunnableProcess implements Runnable { // Launch the process try { - LOG.log(Level.FINEST, "Launching process \"{0}\"\nSTDERR can be found in {1}\nSTDOUT can be found in {2}", - new Object[]{this.id, errFile.getAbsolutePath(), outFile.getAbsolutePath()}); + + LOG.log(Level.FINEST, + "Launching process \"{0}\"\nSTDERR can be found in {1}\nSTDOUT can be found in {2}", + new Object[] {this.id, errFile.getAbsolutePath(), outFile.getAbsolutePath()}); + this.process = new ProcessBuilder() .command(this.command) .directory(this.folder) .redirectError(errFile) .redirectOutput(outFile) .start(); - this.setState(State.RUNNING); + + this.setState(RunnableProcessState.RUNNING); this.processObserver.onProcessStarted(this.id); + } catch (final IOException ex) { - LOG.log(Level.SEVERE, "Unable to spawn process \"{0}\" wth command {1}\n Exception:{2}", - new Object[]{this.id, this.command, ex}); + LOG.log(Level.SEVERE, + "Unable to spawn process \"{0}\" wth command {1}\n Exception:{2}", + new Object[] {this.id, this.command, ex}); } + } finally { this.stateLock.unlock(); } try { + // Wait for its completion - final int returnValue = process.waitFor(); + LOG.log(Level.FINER, "Wait for process completion: {0}", this.id); + final int returnValue = this.process.waitFor(); this.processObserver.onProcessExit(this.id, returnValue); + this.stateLock.lock(); try { - this.setState(State.ENDED); + this.setState(RunnableProcessState.ENDED); this.doneCond.signalAll(); } finally { this.stateLock.unlock(); } - 
LOG.log(Level.FINEST, "Process \"{0}\" returned {1}", new Object[]{this.id, returnValue}); + + LOG.log(Level.FINER, "Process \"{0}\" returned {1}", new Object[] {this.id, returnValue}); + } catch (final InterruptedException ex) { - LOG.log(Level.SEVERE, "Interrupted while waiting for the process \"{0}\" to complete. Exception: {2}", - new Object[]{this.id, ex}); + LOG.log(Level.SEVERE, + "Interrupted while waiting for the process \"{0}\" to complete. Exception: {1}", + new Object[] {this.id, ex}); } } - /** * Cancels the running process if it is running. */ public void cancel() { + this.stateLock.lock(); + try { - if (this.processIsRunning()) { + + if (this.state == RunnableProcessState.RUNNING) { this.process.destroy(); if (!this.doneCond.await(DESTROY_WAIT_TIME, TimeUnit.MILLISECONDS)) { LOG.log(Level.FINE, "{0} milliseconds elapsed", DESTROY_WAIT_TIME); } } - if (this.processIsRunning()) { + if (this.state == RunnableProcessState.RUNNING) { LOG.log(Level.WARNING, "The child process survived Process.destroy()"); if (OSUtils.isUnix() || OSUtils.isWindows()) { LOG.log(Level.WARNING, "Attempting to kill the process via the kill command line"); @@ -224,8 +220,9 @@ public final class RunnableProcess implements Runnable { } } catch (final InterruptedException ex) { - LOG.log(Level.SEVERE, "Interrupted while waiting for the process \"{0}\" to complete. Exception: {2}", - new Object[]{this.id, ex}); + LOG.log(Level.SEVERE, + "Interrupted while waiting for the process \"{0}\" to complete. 
Exception: {1}", + new Object[] {this.id, ex}); } finally { this.stateLock.unlock(); } @@ -237,27 +234,13 @@ public final class RunnableProcess implements Runnable { */ private long readPID() throws IOException { final String pidFileName = this.folder.getAbsolutePath() + "/" + PIDStoreStartHandler.PID_FILE_NAME; - try (final BufferedReader r = - new BufferedReader(new InputStreamReader(new FileInputStream(pidFileName), StandardCharsets.UTF_8))) { + try (final BufferedReader r = new BufferedReader( + new InputStreamReader(new FileInputStream(pidFileName), StandardCharsets.UTF_8))) { return Long.parseLong(r.readLine()); } } /** - * @return a boolean that indicates if the process is running. - */ - private boolean processIsRunning() { - return this.getState() == State.RUNNING; - } - - /** - * @return the current State of the process. - */ - private State getState() { - return this.state; - } - - /** * @return the ID of the process. */ public String getId() { @@ -273,26 +256,13 @@ public final class RunnableProcess implements Runnable { /** * Sets a new state for the process. - * - * @param newState - * @throws java.lang.IllegalStateException if the new state is illegal. + * @param newState a new process state to transition to. + * @throws IllegalStateException if the new state is illegal. */ - private void setState(final State newState) { - if (!isLegal(this.state, newState)) { + private void setState(final RunnableProcessState newState) { + if (!this.state.isLegal(newState)) { throw new IllegalStateException("Transition from " + this.state + " to " + newState + " is illegal"); } this.state = newState; } - - /** - * The possible states of a process: INIT, RUNNING, ENDED. 
- */ - private enum State { - // After initialization - INIT, - // The process is running - RUNNING, - // The process ended - ENDED - } } http://git-wip-us.apache.org/repos/asf/reef/blob/234eeba7/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/RunnableProcessState.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/RunnableProcessState.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/RunnableProcessState.java new file mode 100644 index 0000000..ed239fc --- /dev/null +++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/RunnableProcessState.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.local.process; + +/** + * The possible states of a process: INIT, RUNNING, ENDED. + */ +enum RunnableProcessState { + + /** After initialization. */ + INIT, + + /** The process is running. */ + RUNNING, + + /** The process ended. */ + ENDED; + + /** + * Check whether a transition from current state to the given one is legal. 
+ * @param toState destination state. + * @return True if the state transition is legal, false otherwise. + */ + public boolean isLegal(final RunnableProcessState toState) { + + switch (this) { + + case INIT: + switch (toState) { + case RUNNING: + case ENDED: + return true; + default: + return false; + } + + case RUNNING: + switch (toState) { + case ENDED: + return true; + default: + return false; + } + + case ENDED: + return false; + + default: + return false; + } + } +}
