Repository: ignite
Updated Branches:
  refs/heads/master 02bca3f63 -> c9368da76
IGNITE-9193: Stop child python processes on parent stop. this closes #4491 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c9368da7 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c9368da7 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c9368da7 Branch: refs/heads/master Commit: c9368da76f6a2c7555d355219a000ca371552b7d Parents: 02bca3f Author: Anton Dmitriev <[email protected]> Authored: Wed Aug 8 13:13:46 2018 +0300 Committer: Yury Babak <[email protected]> Committed: Wed Aug 8 13:13:46 2018 +0300 ---------------------------------------------------------------------- .../tfrunning/TensorFlowServerManager.java | 4 +- .../TensorFlowServerScriptFormatter.java | 30 +++++++++++++++ .../cluster/util/TensorFlowChiefRunner.java | 6 ++- .../util/TensorFlowUserScriptRunner.java | 3 +- .../core/pythonrunning/PythonProcess.java | 12 +++++- .../PythonProcessBuilderSupplier.java | 40 +++++++++++++++++++- .../pythonrunning/PythonProcessManager.java | 2 +- 7 files changed, 90 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/c9368da7/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/TensorFlowServerManager.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/TensorFlowServerManager.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/TensorFlowServerManager.java index ee87089..fa6194a 100644 --- a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/TensorFlowServerManager.java +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/TensorFlowServerManager.java @@ -58,7 +58,9 @@ public class TensorFlowServerManager extends ProcessManagerWrapper<PythonProcess 
@Override protected PythonProcess transformSpecification(TensorFlowServer spec) { return new PythonProcess( scriptFormatter.format(spec, true, Ignition.ignite()), - getNode(spec) + getNode(spec), + "job:" + spec.getJobName(), + "task:" + spec.getTaskIdx() ); } http://git-wip-us.apache.org/repos/asf/ignite/blob/c9368da7/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/TensorFlowServerScriptFormatter.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/TensorFlowServerScriptFormatter.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/TensorFlowServerScriptFormatter.java index 0645964..a93d910 100644 --- a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/TensorFlowServerScriptFormatter.java +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/TensorFlowServerScriptFormatter.java @@ -34,12 +34,42 @@ public class TensorFlowServerScriptFormatter { public String format(TensorFlowServer srv, boolean join, Ignite ignite) { StringBuilder builder = new StringBuilder(); + builder.append("from threading import Thread").append("\n"); + builder.append("from time import sleep").append("\n"); + builder.append("import os, signal").append("\n"); + builder.append("\n"); + builder.append("def check_pid(pid):").append("\n"); + builder.append(" try:").append("\n"); + builder.append(" os.kill(pid, 0)").append("\n"); + builder.append(" except OSError:").append("\n"); + builder.append(" return False").append("\n"); + builder.append(" else:").append("\n"); + builder.append(" return True").append("\n"); + builder.append("\n"); + builder.append("def threaded_function(pid):").append("\n"); + builder.append(" while check_pid(pid):").append("\n"); + builder.append(" sleep(1)").append("\n"); + builder.append(" os.kill(os.getpid(), 
signal.SIGUSR1)").append("\n"); + builder.append("\n"); + builder.append("Thread(target = threaded_function, args = (int(os.environ['PPID']), )).start()") + .append("\n"); + builder.append("\n"); + builder.append("import tensorflow as tf").append('\n'); builder.append("from tensorflow.contrib.ignite import IgniteDataset").append("\n"); + builder.append("\n"); builder.append("cluster = tf.train.ClusterSpec(") .append(srv.getClusterSpec().format(ignite)) .append(')') .append('\n'); + builder.append(""); + + builder.append("print('job:%s task:%d' % ('") + .append(srv.getJobName()) + .append("', ") + .append(srv.getTaskIdx()) + .append("))") + .append("\n"); builder.append("server = tf.train.Server(cluster"); http://git-wip-us.apache.org/repos/asf/ignite/blob/c9368da7/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowChiefRunner.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowChiefRunner.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowChiefRunner.java index 6998681..96535eb 100644 --- a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowChiefRunner.java +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowChiefRunner.java @@ -66,7 +66,11 @@ public class TensorFlowChiefRunner extends AsyncNativeProcessRunner { TensorFlowServer srv = new TensorFlowServer(spec, TensorFlowClusterResolver.CHIEF_JOB_NAME, 0); return new NativeProcessRunner( - new PythonProcessBuilderSupplier(true).get(), + new PythonProcessBuilderSupplier( + true, + "job:" + srv.getJobName(), + "task:" + srv.getTaskIdx() + ).get(), new TensorFlowServerScriptFormatter().format(srv, true, ignite), out, err 
http://git-wip-us.apache.org/repos/asf/ignite/blob/c9368da7/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowUserScriptRunner.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowUserScriptRunner.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowUserScriptRunner.java index 3dcd5f8..6bb3b0a 100644 --- a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowUserScriptRunner.java +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowUserScriptRunner.java @@ -37,6 +37,7 @@ import org.apache.ignite.Ignition; import org.apache.ignite.tensorflow.cluster.TensorFlowJobArchive; import org.apache.ignite.tensorflow.cluster.spec.TensorFlowClusterSpec; import org.apache.ignite.tensorflow.cluster.spec.TensorFlowServerAddressSpec; +import org.apache.ignite.tensorflow.core.pythonrunning.PythonProcessBuilderSupplier; import org.apache.ignite.tensorflow.core.util.AsyncNativeProcessRunner; import org.apache.ignite.tensorflow.core.util.NativeProcessRunner; @@ -117,7 +118,7 @@ public class TensorFlowUserScriptRunner extends AsyncNativeProcessRunner { if (workingDir == null) throw new IllegalStateException("Working directory is not created"); - ProcessBuilder procBuilder = new ProcessBuilder(); + ProcessBuilder procBuilder = new PythonProcessBuilderSupplier(false).get(); procBuilder.directory(workingDir); procBuilder.command(jobArchive.getCommands()); http://git-wip-us.apache.org/repos/asf/ignite/blob/c9368da7/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcess.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcess.java 
b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcess.java index 9ed8b20..34c5a12 100644 --- a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcess.java +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcess.java @@ -33,17 +33,22 @@ public class PythonProcess implements Serializable { /** Node identifier. */ private final UUID nodeId; + /** Meta information that adds to script as arguments. */ + private final String[] meta; + /** * Constructs a new instance of python process. * * @param stdin Stdin of the process. * @param nodeId Node identifier. + * @param meta Meta information that adds to script as arguments. */ - public PythonProcess(String stdin, UUID nodeId) { + public PythonProcess(String stdin, UUID nodeId, String... meta) { assert nodeId != null : "Node identifier should not be null"; this.stdin = stdin; this.nodeId = nodeId; + this.meta = meta; } /** */ @@ -55,4 +60,9 @@ public class PythonProcess implements Serializable { public UUID getNodeId() { return nodeId; } + + /** */ + public String[] getMeta() { + return meta; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/c9368da7/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcessBuilderSupplier.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcessBuilderSupplier.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcessBuilderSupplier.java index e59ab00..c7f7fde 100644 --- a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcessBuilderSupplier.java +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcessBuilderSupplier.java @@ -17,6 +17,8 @@ package 
org.apache.ignite.tensorflow.core.pythonrunning; +import java.lang.management.ManagementFactory; +import java.util.Map; import org.apache.ignite.tensorflow.util.SerializableSupplier; /** @@ -32,13 +34,18 @@ public class PythonProcessBuilderSupplier implements SerializableSupplier<Proces /** Interactive flag (allows to used standard input to pass Python script). */ private final boolean interactive; + /** Meta information that adds to script as arguments. */ + private final String[] meta; + /** * Constructs a new instance of Python process builder supplier. * * @param interactive Interactive flag (allows to used standard input to pass Python script). + * @param meta Meta information that adds to script as arguments. */ - public PythonProcessBuilderSupplier(boolean interactive) { + public PythonProcessBuilderSupplier(boolean interactive, String... meta) { this.interactive = interactive; + this.meta = meta; } /** @@ -52,6 +59,35 @@ public class PythonProcessBuilderSupplier implements SerializableSupplier<Proces if (python == null) python = "python3"; - return interactive ? new ProcessBuilder(python, "-i") : new ProcessBuilder(python); + ProcessBuilder procBldr; + if (interactive) { + String[] cmd = new String[meta.length + 3]; + + cmd[0] = python; + cmd[1] = "-i"; + cmd[2] = "-"; + + System.arraycopy(meta, 0, cmd, 3, meta.length); + + procBldr = new ProcessBuilder(cmd); + } + else + procBldr = new ProcessBuilder(python); + + Map<String, String> env = procBldr.environment(); + env.put("PPID", String.valueOf(getProcessId())); + + return procBldr; + } + + /** + * Returns current process identifier. + * + * @return Process identifier. 
+ */ + private long getProcessId() { + String pid = ManagementFactory.getRuntimeMXBean().getName().split("@")[0]; + + return Long.parseLong(pid); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/c9368da7/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcessManager.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcessManager.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcessManager.java index 1f6c11e..d050c0e 100644 --- a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcessManager.java +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcessManager.java @@ -48,7 +48,7 @@ public class PythonProcessManager extends ProcessManagerWrapper<NativeProcess, P /** {@inheritDoc} */ @Override protected NativeProcess transformSpecification(PythonProcess spec) { return new NativeProcess( - new PythonProcessBuilderSupplier(true), + new PythonProcessBuilderSupplier(true, spec.getMeta()), spec.getStdin(), spec.getNodeId() );
