http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java new file mode 100644 index 0000000..9e9e7ac --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java @@ -0,0 +1,598 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.slider.server.services.workflow; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.io.IOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Execute a long-lived process. + * + * <p> + * Hadoop's {@link org.apache.hadoop.util.Shell} class assumes it is executing + * a short lived application; this class allows for the process to run for the + * life of the Java process that forked it. + * It is designed to be embedded inside a YARN service, though this is not + * the sole way that it can be used + * <p> + * Key Features: + * <ol> + * <li>Output is streamed to the output logger provided</li>. + * <li>the input stream is closed as soon as the process starts.</li> + * <li>The most recent lines of output are saved to a linked list</li>. + * <li>A synchronous callback, {@link LongLivedProcessLifecycleEvent}, + * is raised on the start and finish of a process.</li> + * </ol> + * + */ +public class LongLivedProcess implements Runnable { + /** + * Limit on number of lines to retain in the "recent" line list:{@value} + */ + public static final int RECENT_LINE_LOG_LIMIT = 64; + + /** + * Const defining the time in millis between polling for new text. + */ + private static final int STREAM_READER_SLEEP_TIME = 200; + + /** + * limit on the length of a stream before it triggers an automatic newline. 
+ */ + private static final int LINE_LENGTH = 256; + private final ProcessBuilder processBuilder; + private Process process; + private Integer exitCode = null; + private final String name; + private final ExecutorService processExecutor; + private final ExecutorService logExecutor; + + private ProcessStreamReader processStreamReader; + //list of recent lines, recorded for extraction into reports + private final List<String> recentLines = new LinkedList<>(); + private int recentLineLimit = RECENT_LINE_LOG_LIMIT; + private LongLivedProcessLifecycleEvent lifecycleCallback; + private final AtomicBoolean finalOutputProcessed = new AtomicBoolean(false); + + /** + * Log supplied in the constructor for the spawned process -accessible + * to inner classes + */ + private Logger processLog; + + /** + * Class log -accessible to inner classes + */ + private static final Logger LOG = LoggerFactory.getLogger(LongLivedProcess.class); + + /** + * flag to indicate that the process is done + */ + private final AtomicBoolean finished = new AtomicBoolean(false); + + /** + * Create an instance + * @param name process name + * @param processLog log for output (or null) + * @param commands command list + */ + public LongLivedProcess(String name, + Logger processLog, + List<String> commands) { + Preconditions.checkArgument(commands != null, "commands"); + + this.name = name; + this.processLog = processLog; + ServiceThreadFactory factory = new ServiceThreadFactory(name, true); + processExecutor = Executors.newSingleThreadExecutor(factory); + logExecutor = Executors.newSingleThreadExecutor(factory); + processBuilder = new ProcessBuilder(commands); + processBuilder.redirectErrorStream(false); + } + + /** + * Set the limit on recent lines to retain + * @param recentLineLimit size of rolling list of recent lines. 
+ */ + public void setRecentLineLimit(int recentLineLimit) { + this.recentLineLimit = recentLineLimit; + } + + /** + * Set an optional application exit callback + * @param lifecycleCallback callback to notify on application exit + */ + public void setLifecycleCallback(LongLivedProcessLifecycleEvent lifecycleCallback) { + this.lifecycleCallback = lifecycleCallback; + } + + /** + * Add an entry to the environment + * @param envVar envVar -must not be null + * @param val value + */ + public void setEnv(String envVar, String val) { + Preconditions.checkArgument(envVar != null, "envVar"); + Preconditions.checkArgument(val != null, "val"); + processBuilder.environment().put(envVar, val); + } + + /** + * Bulk set the environment from a map. This does + * not replace the existing environment, just extend it/overwrite single + * entries. + * @param map map to add + */ + public void putEnvMap(Map<String, String> map) { + for (Map.Entry<String, String> entry : map.entrySet()) { + String val = entry.getValue(); + String key = entry.getKey(); + setEnv(key, val); + } + } + + /** + * Get the process environment + * @param variable environment variable + * @return the value or null if there is no match + */ + public String getEnv(String variable) { + return processBuilder.environment().get(variable); + } + + /** + * Set the process log. Ignored once the process starts + * @param processLog new log ... may be null + */ + public void setProcessLog(Logger processLog) { + this.processLog = processLog; + } + + /** + * Get the process reference + * @return the process -null if the process is not started + */ + public Process getProcess() { + return process; + } + + /** + * Get the process builder -this can be manipulated + * up to the start() operation. As there is no synchronization + * around it, it must only be used in the same thread setting up the commmand. 
+ * @return the process builder + */ + public ProcessBuilder getProcessBuilder() { + return processBuilder; + } + + /** + * Get the command list + * @return the comands + */ + public List<String> getCommands() { + return processBuilder.command(); + } + + public String getCommand() { + return getCommands().get(0); + } + + /** + * probe to see if the process is running + * @return true iff the process has been started and is not yet finished + */ + public boolean isRunning() { + return process != null && !finished.get(); + } + + /** + * Get the exit code: null until the process has finished + * @return the exit code or null + */ + public Integer getExitCode() { + return exitCode; + } + + /** + * Get the exit code sign corrected: null until the process has finished + * @return the exit code or null + */ + public Integer getExitCodeSignCorrected() { + Integer result; + if (exitCode != null) { + result = (exitCode << 24) >> 24; + } else { + result = null; + } + return result; + } + + /** + * Stop the process if it is running. 
+ * This will trigger an application completion event with the given exit code + */ + public void stop() { + if (!isRunning()) { + return; + } + process.destroy(); + } + + /** + * Get a text description of the builder suitable for log output + * @return a multiline string + */ + protected String describeBuilder() { + StringBuilder buffer = new StringBuilder(); + for (String arg : processBuilder.command()) { + buffer.append('"').append(arg).append("\" "); + } + return buffer.toString(); + } + + /** + * Dump the environment to a string builder + * @param buffer the buffer to append to + */ + public void dumpEnv(StringBuilder buffer) { + buffer.append("\nEnvironment\n-----------"); + Map<String, String> env = processBuilder.environment(); + Set<String> keys = env.keySet(); + List<String> sortedKeys = new ArrayList<String>(keys); + Collections.sort(sortedKeys); + for (String key : sortedKeys) { + buffer.append(key).append("=").append(env.get(key)).append('\n'); + } + } + + /** + * Exec the process + * @return the process + * @throws IOException on aany failure to start the process + * @throws FileNotFoundException if the process could not be found + */ + private Process spawnChildProcess() throws IOException { + if (process != null) { + throw new IOException("Process already started"); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Spawning process:\n " + describeBuilder()); + } + try { + process = processBuilder.start(); + } catch (IOException e) { + // on windows, upconvert DOS error 2 from ::CreateProcess() + // to its real meaning: FileNotFound + if (e.toString().contains("CreateProcess error=2")) { + FileNotFoundException fnfe = + new FileNotFoundException(e.toString()); + fnfe.initCause(e); + throw fnfe; + } else { + throw e; + } + } + return process; + } + + /** + * Entry point for waiting for the program to finish + */ + @Override // Runnable + public void run() { + Preconditions.checkNotNull(process, "null process"); + LOG.debug("Lifecycle callback thread 
running"); + //notify the callback that the process has started + if (lifecycleCallback != null) { + lifecycleCallback.onProcessStarted(this); + } + try { + //close stdin for the process + IOUtils.closeStream(process.getOutputStream()); + exitCode = process.waitFor(); + } catch (InterruptedException e) { + LOG.debug("Process wait interrupted -exiting thread", e); + } finally { + //here the process has finished + LOG.debug("process {} has finished", name); + //tell the logger it has to finish too + finished.set(true); + + // shut down the threads + logExecutor.shutdown(); + try { + logExecutor.awaitTermination(60, TimeUnit.SECONDS); + } catch (InterruptedException ignored) { + //ignored + } + + //now call the callback if it is set + if (lifecycleCallback != null) { + lifecycleCallback.onProcessExited(this, exitCode, + getExitCodeSignCorrected()); + } + } + } + + /** + * Spawn the application + * @throws IOException IO problems + */ + public void start() throws IOException { + + spawnChildProcess(); + processStreamReader = + new ProcessStreamReader(processLog, STREAM_READER_SLEEP_TIME); + logExecutor.submit(processStreamReader); + processExecutor.submit(this); + } + + /** + * Get the lines of recent output + * @return the last few lines of output; an empty list if there are none + * or the process is not actually running + */ + public synchronized List<String> getRecentOutput() { + return new ArrayList<String>(recentLines); + } + + /** + * @return whether lines of recent output are empty + */ + public synchronized boolean isRecentOutputEmpty() { + return recentLines.isEmpty(); + } + + /** + * Query to see if the final output has been processed + * @return + */ + public boolean isFinalOutputProcessed() { + return finalOutputProcessed.get(); + } + + /** + * Get the recent output from the process, or [] if not defined + * + * @param finalOutput flag to indicate "wait for the final output of the process" + * @param duration the duration, in ms, + * ro wait for recent 
output to become non-empty + * @return a possibly empty list + */ + public List<String> getRecentOutput(boolean finalOutput, int duration) { + long start = System.currentTimeMillis(); + while (System.currentTimeMillis() - start <= duration) { + boolean finishedOutput; + if (finalOutput) { + // final flag means block until all data is done + finishedOutput = isFinalOutputProcessed(); + } else { + // there is some output + finishedOutput = !isRecentOutputEmpty(); + } + if (finishedOutput) { + break; + } + try { + Thread.sleep(100); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + break; + } + } + return getRecentOutput(); + } + + /** + * add the recent line to the list of recent lines; deleting + * an earlier on if the limit is reached. + * + * Implementation note: yes, a circular array would be more + * efficient, especially with some power of two as the modulo, + * but is it worth the complexity and risk of errors for + * something that is only called once per line of IO? + * @param line line to record + * @param isErrorStream is the line from the error stream + * @param logger logger to log to - null for no logging + */ + private synchronized void recordRecentLine(String line, + boolean isErrorStream, + Logger logger) { + if (line == null) { + return; + } + String entry = (isErrorStream ? "[ERR] " : "[OUT] ") + line; + recentLines.add(entry); + if (recentLines.size() > recentLineLimit) { + recentLines.remove(0); + } + if (logger != null) { + if (isErrorStream) { + logger.warn(line); + } else { + logger.info(line); + } + } + } + + /** + * Class to read data from the two process streams, and, when run in a thread + * to keep running until the <code>done</code> flag is set. + * Lines are fetched from stdout and stderr and logged at info and error + * respectively. 
+ */ + + private class ProcessStreamReader implements Runnable { + private final Logger streamLog; + private final int sleepTime; + + /** + * Create an instance + * @param streamLog log -or null to disable logging (recent entries + * will still be retained) + * @param sleepTime time to sleep when stopping + */ + private ProcessStreamReader(Logger streamLog, int sleepTime) { + this.streamLog = streamLog; + this.sleepTime = sleepTime; + } + + /** + * Return a character if there is one, -1 if nothing is ready yet + * @param reader reader + * @return the value from the reader, or -1 if it is not ready + * @throws IOException IO problems + */ + private int readCharNonBlocking(BufferedReader reader) throws IOException { + if (reader.ready()) { + return reader.read(); + } else { + return -1; + } + } + + /** + * Read in a line, or, if the limit has been reached, the buffer + * so far + * @param reader source of data + * @param line line to build + * @param limit limit of line length + * @return true if the line can be printed + * @throws IOException IO trouble + */ + @SuppressWarnings("NestedAssignment") + private boolean readAnyLine(BufferedReader reader, + StringBuilder line, + int limit) + throws IOException { + int next; + while ((-1 != (next = readCharNonBlocking(reader)))) { + if (next != '\n') { + line.append((char) next); + limit--; + if (line.length() > limit) { + //enough has been read in to print it any + return true; + } + } else { + //line end return flag to say so + return true; + } + } + //here the end of the stream is hit, or the limit + return false; + } + + + @Override //Runnable + @SuppressWarnings("IOResourceOpenedButNotSafelyClosed") + public void run() { + BufferedReader errReader = null; + BufferedReader outReader = null; + StringBuilder outLine = new StringBuilder(LINE_LENGTH); + StringBuilder errorLine = new StringBuilder(LINE_LENGTH); + try { + errReader = new BufferedReader( + new InputStreamReader(process.getErrorStream())); + outReader = new 
BufferedReader( + new InputStreamReader(process.getInputStream())); + while (!finished.get()) { + boolean processed = false; + if (readAnyLine(errReader, errorLine, LINE_LENGTH)) { + recordRecentLine(errorLine.toString(), true, streamLog); + errorLine.setLength(0); + processed = true; + } + if (readAnyLine(outReader, outLine, LINE_LENGTH)) { + recordRecentLine(outLine.toString(), false, streamLog); + outLine.setLength(0); + processed |= true; + } + if (!processed && !finished.get()) { + //nothing processed: wait a bit for data. + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + //ignore this, rely on the done flag + LOG.debug("Ignoring ", e); + } + } + } + // finished: cleanup + + //print the current error line then stream through the rest + recordFinalOutput(errReader, errorLine, true, streamLog); + //now do the info line + recordFinalOutput(outReader, outLine, false, streamLog); + + } catch (Exception ignored) { + LOG.warn("encountered {}", ignored, ignored); + //process connection has been torn down + } finally { + // close streams + IOUtils.closeStream(errReader); + IOUtils.closeStream(outReader); + //mark output as done + finalOutputProcessed.set(true); + } + } + + /** + * Record the final output of a process stream + * @param reader reader of output + * @param lineBuilder string builder into which line is built + * @param isErrorStream flag to indicate whether or not this is the + * is the line from the error stream + * @param logger logger to log to + * @throws IOException + */ + protected void recordFinalOutput(BufferedReader reader, + StringBuilder lineBuilder, boolean isErrorStream, Logger logger) throws + IOException { + String line = lineBuilder.toString(); + recordRecentLine(line, isErrorStream, logger); + line = reader.readLine(); + while (line != null) { + recordRecentLine(line, isErrorStream, logger); + line = reader.readLine(); + if (Thread.interrupted()) { + break; + } + } + } + } +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcessLifecycleEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcessLifecycleEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcessLifecycleEvent.java new file mode 100644 index 0000000..a13b508 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcessLifecycleEvent.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.slider.server.services.workflow; + +/** + * Callback when a long-lived application exits + */ +public interface LongLivedProcessLifecycleEvent { + + /** + * Callback when a process is started + * @param process the process invoking the callback + */ + void onProcessStarted(LongLivedProcess process); + + /** + * Callback when a process has finished + * @param process the process invoking the callback + * @param exitCode exit code from the process + * @param signCorrectedCode the code- as sign corrected + */ + void onProcessExited(LongLivedProcess process, + int exitCode, + int signCorrectedCode); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceParent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceParent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceParent.java new file mode 100644 index 0000000..a123584 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceParent.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.services.workflow; + +import org.apache.hadoop.service.Service; + +import java.util.List; + +/** + * Interface for accessing services that contain one or more child + * services. + */ +public interface ServiceParent extends Service { + + /** + * Add a child service. It must be in a consistent state with the + * service to which it is being added. + * @param service the service to add. + */ + void addService(Service service); + + /** + * Get an unmodifiable list of services + * @return a list of child services at the time of invocation - + * added services will not be picked up. 
+ */ + List<Service> getServices(); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingCallable.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingCallable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingCallable.java new file mode 100644 index 0000000..5ebf77c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingCallable.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.slider.server.services.workflow; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.service.Service; + +import java.util.concurrent.Callable; + +/** + * A runnable which terminates its owner; it also catches any + * exception raised and can serve it back. + * + */ +public class ServiceTerminatingCallable<V> implements Callable<V> { + + private final Service owner; + private Exception exception; + /** + * This is the callback + */ + private final Callable<V> callable; + + + /** + * Create an instance. If the owner is null, the owning service + * is not terminated. + * @param owner owning service -can be null + * @param callable callback. + */ + public ServiceTerminatingCallable(Service owner, + Callable<V> callable) { + Preconditions.checkArgument(callable != null, "null callable"); + this.owner = owner; + this.callable = callable; + } + + + /** + * Get the owning service + * @return the service to receive notification when + * the runnable completes. + */ + public Service getOwner() { + return owner; + } + + /** + * Any exception raised by inner <code>action's</code> run. + * @return an exception or null. + */ + public Exception getException() { + return exception; + } + + /** + * Delegates the call to the callable supplied in the constructor, + * then calls the stop() operation on its owner. Any exception + * is caught, noted and rethrown + * @return the outcome of the delegated call operation + * @throws Exception if one was raised. 
+ */ + @Override + public V call() throws Exception { + try { + return callable.call(); + } catch (Exception e) { + exception = e; + throw e; + } finally { + if (owner != null) { + owner.stop(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java new file mode 100644 index 0000000..dc591df --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.services.workflow; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.service.Service; + +/** + * A runnable which terminates its after running; it also catches any + * exception raised and can serve it back. + */ +public class ServiceTerminatingRunnable implements Runnable { + + private final Service owner; + private final Runnable action; + private Exception exception; + + /** + * Create an instance. + * @param owner owning service + * @param action action to execute before terminating the service + */ + public ServiceTerminatingRunnable(Service owner, Runnable action) { + Preconditions.checkArgument(owner != null, "null owner"); + Preconditions.checkArgument(action != null, "null action"); + this.owner = owner; + this.action = action; + } + + /** + * Get the owning service. + * @return the service to receive notification when + * the runnable completes. + */ + public Service getOwner() { + return owner; + } + + /** + * Any exception raised by inner <code>action's</code> run. + * @return an exception or null. 
+ */ + public Exception getException() { + return exception; + } + + @Override + public void run() { + try { + action.run(); + } catch (Exception e) { + exception = e; + } + owner.stop(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceThreadFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceThreadFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceThreadFactory.java new file mode 100644 index 0000000..737197b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceThreadFactory.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * A thread factory that creates threads (possibly daemon threads)
 * using the name and naming policy supplied.
 * The thread counter starts at 1, increments atomically, is shared by
 * every factory instance in the JVM, and is supplied as the second
 * argument in the format string.
 *
 * A static method, {@link #singleThreadExecutor(String, boolean)},
 * exists to simplify the construction of an executor with a single
 * well-named thread.
 *
 * Example
 * <pre>
 * ExecutorService exec = ServiceThreadFactory.singleThreadExecutor("live", true)
 * </pre>
 */
public class ServiceThreadFactory implements ThreadFactory {

  /** Counter shared by all factories; supplies the thread number. */
  private static final AtomicInteger counter = new AtomicInteger(1);

  /**
   * Default format for thread names: {@value}.
   */
  public static final String DEFAULT_NAMING_FORMAT = "%s-%03d";
  private final String name;
  private final boolean daemons;
  private final String namingFormat;

  /**
   * Create an instance.
   * @param name base thread name
   * @param daemons flag to indicate the threads should be marked as daemons
   * @param namingFormat format string to generate thread names from; the
   * base name is the first argument, the thread number the second
   * @throws IllegalArgumentException if the name or the format is null
   */
  public ServiceThreadFactory(String name,
      boolean daemons,
      String namingFormat) {
    if (name == null) {
      throw new IllegalArgumentException("null name");
    }
    if (namingFormat == null) {
      throw new IllegalArgumentException("null naming format");
    }
    this.name = name;
    this.daemons = daemons;
    this.namingFormat = namingFormat;
  }

  /**
   * Create an instance with the default naming format.
   * @param name base thread name
   * @param daemons flag to indicate the threads should be marked as daemons
   * @throws IllegalArgumentException if the name is null
   */
  public ServiceThreadFactory(String name,
      boolean daemons) {
    this(name, daemons, DEFAULT_NAMING_FORMAT);
  }

  /**
   * Create a thread named from the naming format, the base name and the
   * next value of the shared counter.
   * @param r the runnable the thread is to execute
   * @return a new, unstarted thread with the daemon flag of this factory
   * @throws IllegalArgumentException if the runnable is null
   */
  @Override
  public Thread newThread(Runnable r) {
    if (r == null) {
      throw new IllegalArgumentException("null runnable");
    }
    // format with the root locale: thread names must not change with the
    // default locale of the JVM (some locales localize the digits of %d)
    String threadName = String.format(java.util.Locale.ROOT,
        namingFormat, name, counter.getAndIncrement());
    Thread thread = new Thread(r, threadName);
    thread.setDaemon(daemons);
    return thread;
  }

  /**
   * Create a single thread executor using this naming policy.
   * @param name base thread name
   * @param daemons flag to indicate the threads should be marked as daemons
   * @return an executor
   */
  public static ExecutorService singleThreadExecutor(String name,
      boolean daemons) {
    return Executors.newSingleThreadExecutor(
        new ServiceThreadFactory(name, daemons));
  }
}
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.services.workflow; + +import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * A service that calls the supplied callback when it is started -after the + * given delay. + * + * It can be configured to stop itself after the callback has + * completed, marking any exception raised as the exception of this service. + * The notifications come in on a callback thread -a thread that is only + * started in this service's <code>start()</code> operation. + */ +public class WorkflowCallbackService<V> extends + WorkflowScheduledExecutorService<ScheduledExecutorService> { + protected static final Logger LOG = + LoggerFactory.getLogger(WorkflowCallbackService.class); + + /** + * This is the callback. 
+ */ + private final Callable<V> callback; + private final int delay; + private final ServiceTerminatingCallable<V> command; + + private ScheduledFuture<V> scheduledFuture; + + /** + * Create an instance of the service + * @param name service name + * @param callback callback to invoke + * @param delay delay in milliseconds -or 0 for no delay + * @param terminate terminate this service after the callback? + */ + public WorkflowCallbackService(String name, + Callable<V> callback, + int delay, + boolean terminate) { + super(name); + Preconditions.checkNotNull(callback, "Null callback argument"); + this.callback = callback; + this.delay = delay; + command = new ServiceTerminatingCallable<V>( + terminate ? this : null, + callback); + } + + public ScheduledFuture<V> getScheduledFuture() { + return scheduledFuture; + } + + @Override + protected void serviceStart() throws Exception { + LOG.debug("Notifying {} after a delay of {} millis", callback, delay); + ScheduledExecutorService executorService = + Executors.newSingleThreadScheduledExecutor( + new ServiceThreadFactory(getName(), true)); + setExecutor(executorService); + scheduledFuture = + executorService.schedule(command, delay, TimeUnit.MILLISECONDS); + } + + /** + * Stop the service. + * If an exception was raised by the executed callback, + * rethrow it from this method. + * @throws Exception the exception raised by the callback, if any. + */ + @Override + protected void serviceStop() throws Exception { + super.serviceStop(); + // propagate any failure + if (getCallbackException() != null) { + throw getCallbackException(); + } + } + + /** + * Get the exception raised by a callback. Will always be null if the + * callback has not been executed; will only be non-null after a failure. 
+ * @return the exception raised by the callback, or null if none was raised + */ + public Exception getCallbackException() { + return command.getException(); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCompositeService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCompositeService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCompositeService.java new file mode 100644 index 0000000..9c653f3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCompositeService.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.slider.server.services.workflow; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.service.Service; +import org.apache.hadoop.service.ServiceStateChangeListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +/** + * An extended composite service which stops itself if any child service + * fails, or when all its children have successfully stopped without failure. + * + * Lifecycle + * <ol> + * <li>If any child exits with a failure: this service stops, propagating + * the exception.</li> + * <li>When all child services have stopped, this service stops itself</li> + * </ol> + * + */ +public class WorkflowCompositeService extends CompositeService + implements ServiceParent, ServiceStateChangeListener { + + private static final Logger LOG = + LoggerFactory.getLogger(WorkflowCompositeService.class); + + /** + * Deadlock-avoiding overridden config for slider services; see SLIDER-1052 + */ + private volatile Configuration configuration; + + /** + * Construct an instance + * @param name name of this service instance + */ + public WorkflowCompositeService(String name) { + super(name); + } + + @Override + public Configuration getConfig() { + return configuration; + } + + @Override + protected void setConfig(Configuration conf) { + super.setConfig(conf); + configuration = conf; + } + + /** + * Construct an instance with the default name. + */ + public WorkflowCompositeService() { + this("WorkflowCompositeService"); + } + + /** + * Varargs constructor + * @param name name of this service instance + * @param children children + */ + public WorkflowCompositeService(String name, Service... 
children) { + this(name); + for (Service child : children) { + addService(child); + } + } + + /** + * Construct with a list of children + * @param name name of this service instance + * @param children children to add + */ + public WorkflowCompositeService(String name, List<Service> children) { + this(name); + for (Service child : children) { + addService(child); + } + } + + /** + * Add a service, and register it + * @param service the {@link Service} to be added. + * Important: do not add a service to a parent during your own serviceInit/start, + * in Hadoop 2.2; you will trigger a ConcurrentModificationException. + */ + @Override + public synchronized void addService(Service service) { + Preconditions.checkArgument(service != null, "null service argument"); + service.registerServiceListener(this); + super.addService(service); + } + + /** + * When this service is started, any service stopping with a failure + * exception is converted immediately into a failure of this service, + * storing the failure and stopping ourselves. + * @param child the service that has changed. + */ + @Override + public void stateChanged(Service child) { + //if that child stopped while we are running: + if (isInState(STATE.STARTED) && child.isInState(STATE.STOPPED)) { + // a child service has stopped + //did the child fail? if so: propagate + Throwable failureCause = child.getFailureCause(); + if (failureCause != null) { + LOG.info("Child service " + child + " failed", failureCause); + //failure. Convert to an exception + Exception e = (failureCause instanceof Exception) ? 
+ (Exception) failureCause : new Exception(failureCause); + //flip ourselves into the failed state + noteFailure(e); + stop(); + } else { + LOG.info("Child service completed {}", child); + if (areAllChildrenStopped()) { + LOG.info("All children are halted: stopping"); + stop(); + } + } + } + } + + /** + * Probe to query if all children are stopped -simply + * by taking a snapshot of the child service list and enumerating + * their state. + * The state of the children may change during this operation -that will + * not get picked up. + * @return true if all the children are stopped. + */ + private boolean areAllChildrenStopped() { + List<Service> children = getServices(); + boolean stopped = true; + for (Service child : children) { + if (!child.isInState(STATE.STOPPED)) { + stopped = false; + break; + } + } + return stopped; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowExecutorService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowExecutorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowExecutorService.java new file mode 100644 index 0000000..7409d32 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowExecutorService.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.services.workflow; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.service.AbstractService; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +/** + * A service that hosts an executor -when the service is stopped, + * {@link ExecutorService#shutdownNow()} is invoked. + */ +public class WorkflowExecutorService<E extends ExecutorService> extends AbstractService { + + private E executor; + + /** + * Construct an instance with the given name -but + * no executor + * @param name service name + */ + public WorkflowExecutorService(String name) { + this(name, null); + } + + /** + * Construct an instance with the given name and executor + * @param name service name + * @param executor executor to host (may be null) + */ + public WorkflowExecutorService(String name, + E executor) { + super(name); + this.executor = executor; + } + + /** + * Get the executor + * @return the executor + */ + public synchronized E getExecutor() { + return executor; + } + + /** + * Set the executor. 
Only valid if the current one is null + * @param executor executor + */ + public synchronized void setExecutor(E executor) { + Preconditions.checkState(this.executor == null, + "Executor already set"); + this.executor = executor; + } + + /** + * Execute the runnable with the executor (which + * must have been created already) + * @param runnable runnable to execute + */ + public void execute(Runnable runnable) { + getExecutor().execute(runnable); + } + + /** + * Submit a callable + * @param callable callable + * @param <V> type of the final get + * @return a future to wait on + */ + public <V> Future<V> submit(Callable<V> callable) { + return getExecutor().submit(callable); + } + + /** + * Stop the service: halt the executor. + * @throws Exception exception. + */ + @Override + protected void serviceStop() throws Exception { + stopExecutor(); + super.serviceStop(); + } + + /** + * Stop the executor if it is not null. + * This uses {@link ExecutorService#shutdownNow()} + * and so does not block until they have completed. 
+ */ + protected synchronized void stopExecutor() { + if (executor != null) { + executor.shutdownNow(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowRpcService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowRpcService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowRpcService.java new file mode 100644 index 0000000..b71530f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowRpcService.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.slider.server.services.workflow; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.service.AbstractService; + +import java.net.InetSocketAddress; + +/** + * A YARN service that maps the start/stop lifecycle of an RPC server + * to the YARN service lifecycle. + */ +public class WorkflowRpcService extends AbstractService { + + /** RPC server*/ + private final Server server; + + /** + * Construct an instance + * @param name service name + * @param server service to stop + */ + public WorkflowRpcService(String name, Server server) { + super(name); + Preconditions.checkArgument(server != null, "Null server"); + this.server = server; + } + + /** + * Get the server + * @return the server + */ + public Server getServer() { + return server; + } + + /** + * Get the socket address of this server + * @return the address this server is listening on + */ + public InetSocketAddress getConnectAddress() { + return NetUtils.getConnectAddress(server); + } + + @Override + protected void serviceStart() throws Exception { + super.serviceStart(); + server.start(); + } + + @Override + protected void serviceStop() throws Exception { + if (server != null) { + server.stop(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowScheduledExecutorService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowScheduledExecutorService.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowScheduledExecutorService.java new file mode 100644 index 0000000..e9f53ed --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowScheduledExecutorService.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.slider.server.services.workflow; + +import java.util.concurrent.ScheduledExecutorService; + +/** + * Scheduled executor or subclass thereof + * @param <E> scheduled executor service type + */ +public class WorkflowScheduledExecutorService<E extends ScheduledExecutorService> + extends WorkflowExecutorService<E> { + + public WorkflowScheduledExecutorService(String name) { + super(name); + } + + public WorkflowScheduledExecutorService(String name, + E executor) { + super(name, executor); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowSequenceService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowSequenceService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowSequenceService.java new file mode 100644 index 0000000..97f97e8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowSequenceService.java @@ -0,0 +1,306 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.services.workflow; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.service.Service; +import org.apache.hadoop.service.ServiceStateChangeListener; +import org.apache.hadoop.service.ServiceStateException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * This resembles the YARN CompositeService, except that it + * starts one service after another + * + * Workflow + * <ol> + * <li>When the <code>WorkflowSequenceService</code> instance is + * initialized, it only initializes itself.</li> + * + * <li>When the <code>WorkflowSequenceService</code> instance is + * started, it initializes then starts the first of its children. + * If there are no children, it immediately stops.</li> + * + * <li>When the active child stops, it did not fail, and the parent has not + * stopped -then the next service is initialized and started. If there is no + * remaining child the parent service stops.</li> + * + * <li>If the active child did fail, the parent service notes the exception + * and stops -effectively propagating up the failure. + * </li> + * </ol> + * + * New service instances MAY be added to a running instance -but no guarantees + * can be made as to whether or not they will be run. 
+ */ + +public class WorkflowSequenceService extends AbstractService implements + ServiceParent, ServiceStateChangeListener { + + private static final Logger LOG = + LoggerFactory.getLogger(WorkflowSequenceService.class); + + /** + * list of services + */ + private final List<Service> serviceList = new ArrayList<>(); + + /** + * The currently active service. + * Volatile -may change & so should be read into a + * local variable before working with + */ + private volatile Service activeService; + + /** + the previous service -the last one that finished. + null if one did not finish yet + */ + private volatile Service previousService; + + private boolean stopIfNoChildServicesAtStartup = true; + + /** + * Construct an instance + * @param name service name + */ + public WorkflowSequenceService(String name) { + super(name); + } + + /** + * Construct an instance with the default name + */ + public WorkflowSequenceService() { + this("WorkflowSequenceService"); + } + + /** + * Create a service sequence with the given list of services + * @param name service name + * @param children initial sequence + */ + public WorkflowSequenceService(String name, Service... children) { + super(name); + for (Service service : children) { + addService(service); + } + } /** + * Create a service sequence with the given list of services + * @param name service name + * @param children initial sequence + */ + public WorkflowSequenceService(String name, List<Service> children) { + super(name); + for (Service service : children) { + addService(service); + } + } + + /** + * Get the current service -which may be null + * @return service running + */ + public Service getActiveService() { + return activeService; + } + + /** + * Get the previously active service + * @return the service last run, or null if there is none. 
+ */ + public Service getPreviousService() { + return previousService; + } + + protected void setStopIfNoChildServicesAtStartup(boolean stopIfNoChildServicesAtStartup) { + this.stopIfNoChildServicesAtStartup = stopIfNoChildServicesAtStartup; + } + + /** + * When started + * @throws Exception + */ + @Override + protected void serviceStart() throws Exception { + if (!startNextService() && stopIfNoChildServicesAtStartup) { + //nothing to start -so stop + stop(); + } + } + + @Override + protected void serviceStop() throws Exception { + //stop current service. + //this triggers a callback that is caught and ignored + Service current = activeService; + previousService = current; + activeService = null; + if (current != null) { + current.stop(); + } + } + + /** + * Start the next service in the list. + * Return false if there are no more services to run, or this + * service has stopped + * @return true if a service was started + * @throws RuntimeException from any init or start failure + * @throws ServiceStateException if this call is made before + * the service is started + */ + public synchronized boolean startNextService() { + if (isInState(STATE.STOPPED)) { + //downgrade to a failed + LOG.debug("Not starting next service -{} is stopped", this); + return false; + } + if (!isInState(STATE.STARTED)) { + //reject attempts to start a service too early + throw new ServiceStateException( + "Cannot start a child service when not started"); + } + if (serviceList.isEmpty()) { + //nothing left to run + return false; + } + if (activeService != null && activeService.getFailureCause() != null) { + //did the last service fail? Is this caused by some premature callback? + LOG.debug("Not starting next service due to a failure of {}", + activeService); + return false; + } + //bear in mind that init & start can fail, which + //can trigger re-entrant calls into the state change listener. + //by setting the current service to null + //the start-next-service logic is skipped. 
+ //now, what does that mean w.r.t exit states? + + activeService = null; + Service head = serviceList.remove(0); + + try { + head.init(getConfig()); + head.registerServiceListener(this); + head.start(); + } catch (RuntimeException e) { + noteFailure(e); + throw e; + } + //at this point the service must have explicitly started & not failed, + //else an exception would have been raised + activeService = head; + return true; + } + + /** + * State change event relays service stop events to + * {@link #onServiceCompleted(Service)}. Subclasses can + * extend that with extra logic + * @param service the service that has changed. + */ + @Override + public void stateChanged(Service service) { + // only react to the state change when it is the current service + // and it has entered the STOPPED state + if (service == activeService && service.isInState(STATE.STOPPED)) { + onServiceCompleted(service); + } + } + + /** + * handler for service completion: base class starts the next service + * @param service service that has completed + */ + protected synchronized void onServiceCompleted(Service service) { + LOG.info("Running service stopped: {}", service); + previousService = activeService; + + + //start the next service if we are not stopped ourselves + if (isInState(STATE.STARTED)) { + + //did the service fail? if so: propagate + Throwable failureCause = service.getFailureCause(); + if (failureCause != null) { + Exception e = (failureCause instanceof Exception) ? 
+ (Exception) failureCause : new Exception(failureCause); + noteFailure(e); + stop(); + } + + //start the next service + boolean started; + try { + started = startNextService(); + } catch (Exception e) { + //something went wrong here + noteFailure(e); + started = false; + } + if (!started) { + //no start because list is empty + //stop and expect the notification to go upstream + stop(); + } + } else { + //not started, so just note that the current service + //has gone away + activeService = null; + } + } + + /** + * Add the passed {@link Service} to the list of services managed by this + * {@link WorkflowSequenceService} + * @param service the {@link Service} to be added + */ + @Override + public synchronized void addService(Service service) { + Preconditions.checkArgument(service != null, "null service argument"); + LOG.debug("Adding service {} ", service.getName()); + synchronized (serviceList) { + serviceList.add(service); + } + } + + /** + * Get an unmodifiable list of services + * @return a list of child services at the time of invocation - + * added services will not be picked up. 
+ */ + @Override //Parent + public synchronized List<Service> getServices() { + return Collections.unmodifiableList(serviceList); + } + + @Override // Object + public synchronized String toString() { + return super.toString() + "; current service " + activeService + + "; queued service count=" + serviceList.size(); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/package-info.java new file mode 100644 index 0000000..36d059a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/package-info.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.services.workflow; + +/** + +<p> + This package contains classes which can be aggregated to build up + complex workflows of services: sequences of operations, callbacks + and composite services with a shared lifespan. + </p> + +<h2> + Core concepts: +</h2> + + +<p> +The Workflow Services are a set of Hadoop YARN services, all implementing +the {@link org.apache.hadoop.service.Service} API. +They are designed to be aggregated, to be composed to produce larger +composite services which then perform ordered operations, notify other services +when work has completed, and propagate failure up the service hierarchy. +</p> +<p> +Service instances may have a limited lifespan, and may self-terminate when +they consider it appropriate.</p> +<p> +Workflow Services that have children implement the +{@link org.apache.slider.server.services.workflow.ServiceParent} +class, which provides (thread-safe) access to the children -allowing new children +to be added, and existing children to be enumerated. They implement policies +on how to react to the termination of children -so they can sequence operations +which terminate themselves when complete. +</p> + +<p> +Workflow Services may be subclassed to extend their behavior, or to use them +in specific applications. Just as the standard +{@link org.apache.hadoop.service.CompositeService} +is often subclassed to aggregate child services, the +{@link org.apache.slider.server.services.workflow.WorkflowCompositeService} +can be used instead -adding the feature that failing services trigger automatic +parent shutdown. If that is the desired operational mode of a class, +swapping the composite service implementation may be sufficient to adopt it. +</p> + + +<h2> How do the workflow services differ from the standard YARN services?
</h2> + + <p> + + There is exactly one standard YARN service for managing children, the + {@link org.apache.hadoop.service.CompositeService}. + </p><p> + The {@link org.apache.slider.server.services.workflow.WorkflowCompositeService} + shares the same model of "child services, all inited and started together". + Where it differs is that if any child service stops -either due to a failure + or to an action which invokes that service's + {@link org.apache.hadoop.service.Service#stop()} method- + the parent service is stopped as well. + </p> + <p> + +In contrast, the original <code>CompositeService</code> class starts its children +in its {@link org.apache.hadoop.service.Service#start()} method, but does not +listen or react to any child service halting. As a result, changes in child +state are not automatically detected or propagated, other than failures in +the actual init() and start() methods. +</p> + +<p> +If a child service runs until completed -that is, it will not be stopped until +instructed to do so, and if it is only the parent service that attempts to +stop the child, then this difference is unimportant. +</p> +<p> + +However, if a service depends upon all of its child services running, +and if those child services are written so as to stop when they fail, using +the <code>WorkflowCompositeService</code> as a base class will enable the +parent service to be automatically notified of a child stopping. + +</p> +<p> +The {@link org.apache.slider.server.services.workflow.WorkflowSequenceService} +resembles the composite service in API, but its workflow is different. It +initializes and starts its children one-by-one, only starting the second after +the first one succeeds, the third after the second, etc. If any service in +the sequence fails, the parent <code>WorkflowSequenceService</code> stops, +reporting the same exception. +</p> + +<p> +The {@link org.apache.slider.server.services.workflow.ForkedProcessService}: +Executes a process when started, and binds to the life of that process.
When the +process terminates, so does the service -and vice versa. This service enables +external processes to be executed as part of a sequence of operations -or, +using the {@link org.apache.slider.server.services.workflow.WorkflowCompositeService} +in parallel with other services, terminating the process when the other services +stop -and vice versa. +</p> + +<p> +The {@link org.apache.slider.server.services.workflow.WorkflowCallbackService} +executes a {@link java.util.concurrent.Callable} callback a specified delay +after the service is started, then potentially terminates itself. +This is useful for callbacks when a workflow reaches a specific point +-or simply for executing arbitrary code in the workflow. + + </p> + + +<h2> +Other Workflow Services +</h2> + +There are some minor services that have proven useful within aggregate workflows, +and simply in applications which are built from composite YARN services. + + <ul> + <li>{@link org.apache.slider.server.services.workflow.WorkflowRpcService }: + Maintains a reference to an RPC {@link org.apache.hadoop.ipc.Server} instance. + When the service is started, so is the RPC server. Similarly, when the service + is stopped, so is the RPC server instance. + </li> + + <li>{@link org.apache.slider.server.services.workflow.ClosingService}: Closes + an instance of {@link java.io.Closeable} when the service is stopped. This + is purely a housekeeping class. + </li> + + </ul> + + Lower-level classes + <ul> + <li>{@link org.apache.slider.server.services.workflow.ServiceTerminatingRunnable }: + A {@link java.lang.Runnable} which runs the runnable supplied in its constructor + then signals its owning service to stop once that runnable is completed. + Any exception raised in the run is stored. + </li> + <li>{@link org.apache.slider.server.services.workflow.WorkflowExecutorService}: + A base class for services that wish to have a {@link java.util.concurrent.ExecutorService} + with a lifespan mapped to that of a service. 
When the service is stopped, the + {@link java.util.concurrent.ExecutorService#shutdownNow()} method is called to + attempt to shut down all running tasks. + </li> + <li>{@link org.apache.slider.server.services.workflow.ServiceThreadFactory}: + This is a simple {@link java.util.concurrent.ThreadFactory} which generates + meaningful thread names. It can be used as a parameter to constructors of + {@link java.util.concurrent.ExecutorService} instances, to ensure that + log information can tie back text to the related services</li> + </ul> + + + + */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/yarnregistry/YarnRegistryViewForProviders.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/yarnregistry/YarnRegistryViewForProviders.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/yarnregistry/YarnRegistryViewForProviders.java new file mode 100644 index 0000000..254bf27 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/yarnregistry/YarnRegistryViewForProviders.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.services.yarnregistry; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.fs.PathNotFoundException; +import org.apache.hadoop.registry.client.api.RegistryConstants; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.registry.client.api.BindFlags; +import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.registry.client.binding.RegistryUtils; +import org.apache.hadoop.registry.client.binding.RegistryPathUtils; + +import org.apache.hadoop.registry.client.types.ServiceRecord; +import org.apache.slider.common.tools.SliderUtils; + +import java.io.IOException; +import java.util.List; + +import static org.apache.hadoop.registry.client.binding.RegistryPathUtils.join; + +/** + * Registry view for providers. This tracks where the service + * is registered, offers access to the record and other things. + */ +public class YarnRegistryViewForProviders { + + private final RegistryOperations registryOperations; + + private final String user; + + private final String sliderServiceClass; + private final String instanceName; + private final ApplicationAttemptId applicationAttemptId; + /** + * Record used where the service registered itself. 
+ * Null until the service is registered + */ + private ServiceRecord selfRegistration; + + /** + * Path where record was registered + * Null until the service is registered + */ + private String selfRegistrationPath; + + public YarnRegistryViewForProviders(RegistryOperations registryOperations, + String user, + String sliderServiceClass, + String instanceName, + ApplicationAttemptId applicationAttemptId) { + Preconditions.checkArgument(registryOperations != null, + "null registry operations"); + Preconditions.checkArgument(user != null, "null user"); + Preconditions.checkArgument(SliderUtils.isSet(sliderServiceClass), + "unset service class"); + Preconditions.checkArgument(SliderUtils.isSet(instanceName), + "instanceName"); + Preconditions.checkArgument(applicationAttemptId != null, + "null applicationAttemptId"); + this.registryOperations = registryOperations; + this.user = user; + this.sliderServiceClass = sliderServiceClass; + this.instanceName = instanceName; + this.applicationAttemptId = applicationAttemptId; + } + + public ApplicationAttemptId getApplicationAttemptId() { + return applicationAttemptId; + } + + public String getUser() { + return user; + } + + public String getSliderServiceClass() { + return sliderServiceClass; + } + + public String getInstanceName() { + return instanceName; + } + + public RegistryOperations getRegistryOperations() { + return registryOperations; + } + + public ServiceRecord getSelfRegistration() { + return selfRegistration; + } + + private void setSelfRegistration(ServiceRecord selfRegistration) { + this.selfRegistration = selfRegistration; + } + + /** + * Get the path to where the service has registered itself. + * Null until the service is registered + * @return the service registration path. + */ + public String getSelfRegistrationPath() { + return selfRegistrationPath; + } + + /** + * Get the absolute path to where the service has registered itself. 
+ * This includes the base registry path + * Null until the service is registered + * @return the service registration path. + */ + public String getAbsoluteSelfRegistrationPath() { + if (selfRegistrationPath == null) { + return null; + } + String root = registryOperations.getConfig().getTrimmed( + RegistryConstants.KEY_REGISTRY_ZK_ROOT, + RegistryConstants.DEFAULT_ZK_REGISTRY_ROOT); + return RegistryPathUtils.join(root, selfRegistrationPath); + } + + /** + * Add a component under the slider name/entry + * @param componentName component name + * @param record record to put + * @throws IOException + */ + public void putComponent(String componentName, + ServiceRecord record) throws + IOException { + putComponent(sliderServiceClass, instanceName, + componentName, + record); + } + + /** + * Add a component + * @param serviceClass service class to use under ~user + * @param componentName component name + * @param record record to put + * @throws IOException + */ + public void putComponent(String serviceClass, + String serviceName, + String componentName, + ServiceRecord record) throws IOException { + String path = RegistryUtils.componentPath( + user, serviceClass, serviceName, componentName); + registryOperations.mknode(RegistryPathUtils.parentOf(path), true); + registryOperations.bind(path, record, BindFlags.OVERWRITE); + } + + /** + * Add a service under a path, optionally purging any history + * @param username user + * @param serviceClass service class to use under ~user + * @param serviceName name of the service + * @param record service record + * @param deleteTreeFirst perform recursive delete of the path first. 
+ * @return the path the service was created at + * @throws IOException + */ + public String putService(String username, + String serviceClass, + String serviceName, + ServiceRecord record, + boolean deleteTreeFirst) throws IOException { + String path = RegistryUtils.servicePath( + username, serviceClass, serviceName); + if (deleteTreeFirst) { + registryOperations.delete(path, true); + } + registryOperations.mknode(RegistryPathUtils.parentOf(path), true); + registryOperations.bind(path, record, BindFlags.OVERWRITE); + return path; + } + + /** + * Add a service under a path for the current user + * @param serviceClass service class to use under ~user + * @param serviceName name of the service + * @param record service record + * @param deleteTreeFirst perform recursive delete of the path first + * @return the path the service was created at + * @throws IOException + */ + public String putService( + String serviceClass, + String serviceName, + ServiceRecord record, + boolean deleteTreeFirst) throws IOException { + return putService(user, serviceClass, serviceName, record, deleteTreeFirst); + } + + + /** + * Add a service under a path for the current user + * @param serviceClass service class to use under ~user + * @param serviceName name of the service + * @param record service record + * @param deleteTreeFirst perform recursive delete of the path first + * @return the path the service was created at + * @throws IOException + */ + public String registerSelf( + ServiceRecord record, + boolean deleteTreeFirst) throws IOException { + selfRegistrationPath = + putService(user, sliderServiceClass, instanceName, record, deleteTreeFirst); + setSelfRegistration(record); + return selfRegistrationPath; + } + + /** + * Update the self record by pushing out the latest version of the service + * registration record. + * @throws IOException any failure. 
+ */ + public void updateSelf() throws IOException { + putService(user, sliderServiceClass, instanceName, selfRegistration, false); + } + + /** + * Delete a component + * @param componentName component name + * @throws IOException + */ + public void deleteComponent(String componentName) throws IOException { + String path = RegistryUtils.componentPath( + user, sliderServiceClass, instanceName, + componentName); + registryOperations.delete(path, false); + } + + /** + * Delete the children of a path -but not the path itself. + * It is not an error if the path does not exist + * @param path path to delete + * @param recursive flag to request recursive deletes + * @throws IOException IO problems + */ + public void deleteChildren(String path, boolean recursive) throws IOException { + List<String> childNames = null; + try { + childNames = registryOperations.list(path); + } catch (PathNotFoundException e) { + return; + } + for (String childName : childNames) { + String child = join(path, childName); + registryOperations.delete(child, recursive); + } + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org