Repository: ignite
Updated Branches:
  refs/heads/master 02bca3f63 -> c9368da76


IGNITE-9193: Stop child python processes on parent stop.

this closes #4491


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c9368da7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c9368da7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c9368da7

Branch: refs/heads/master
Commit: c9368da76f6a2c7555d355219a000ca371552b7d
Parents: 02bca3f
Author: Anton Dmitriev <[email protected]>
Authored: Wed Aug 8 13:13:46 2018 +0300
Committer: Yury Babak <[email protected]>
Committed: Wed Aug 8 13:13:46 2018 +0300

----------------------------------------------------------------------
 .../tfrunning/TensorFlowServerManager.java      |  4 +-
 .../TensorFlowServerScriptFormatter.java        | 30 +++++++++++++++
 .../cluster/util/TensorFlowChiefRunner.java     |  6 ++-
 .../util/TensorFlowUserScriptRunner.java        |  3 +-
 .../core/pythonrunning/PythonProcess.java       | 12 +++++-
 .../PythonProcessBuilderSupplier.java           | 40 +++++++++++++++++++-
 .../pythonrunning/PythonProcessManager.java     |  2 +-
 7 files changed, 90 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c9368da7/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/TensorFlowServerManager.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/TensorFlowServerManager.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/TensorFlowServerManager.java
index ee87089..fa6194a 100644
--- a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/TensorFlowServerManager.java
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/TensorFlowServerManager.java
@@ -58,7 +58,9 @@ public class TensorFlowServerManager extends ProcessManagerWrapper<PythonProcess
     @Override protected PythonProcess transformSpecification(TensorFlowServer spec) {
         return new PythonProcess(
             scriptFormatter.format(spec, true, Ignition.ignite()),
-            getNode(spec)
+            getNode(spec),
+            "job:" + spec.getJobName(),
+            "task:" + spec.getTaskIdx()
         );
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/c9368da7/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/TensorFlowServerScriptFormatter.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/TensorFlowServerScriptFormatter.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/TensorFlowServerScriptFormatter.java
index 0645964..a93d910 100644
--- a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/TensorFlowServerScriptFormatter.java
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/TensorFlowServerScriptFormatter.java
@@ -34,12 +34,42 @@ public class TensorFlowServerScriptFormatter {
     public String format(TensorFlowServer srv, boolean join, Ignite ignite) {
         StringBuilder builder = new StringBuilder();
 
+        builder.append("from threading import Thread").append("\n");
+        builder.append("from time import sleep").append("\n");
+        builder.append("import os, signal").append("\n");
+        builder.append("\n");
+        builder.append("def check_pid(pid):").append("\n");
+        builder.append("    try:").append("\n");
+        builder.append("        os.kill(pid, 0)").append("\n");
+        builder.append("    except OSError:").append("\n");
+        builder.append("        return False").append("\n");
+        builder.append("    else:").append("\n");
+        builder.append("        return True").append("\n");
+        builder.append("\n");
+        builder.append("def threaded_function(pid):").append("\n");
+        builder.append("    while check_pid(pid):").append("\n");
+        builder.append("        sleep(1)").append("\n");
+        builder.append("    os.kill(os.getpid(), signal.SIGUSR1)").append("\n");
+        builder.append("\n");
+        builder.append("Thread(target = threaded_function, args = (int(os.environ['PPID']), )).start()")
+            .append("\n");
+        builder.append("\n");
+
         builder.append("import tensorflow as tf").append('\n');
         builder.append("from tensorflow.contrib.ignite import IgniteDataset").append("\n");
+        builder.append("\n");
         builder.append("cluster = tf.train.ClusterSpec(")
             .append(srv.getClusterSpec().format(ignite))
             .append(')')
             .append('\n');
+        builder.append("");
+
+        builder.append("print('job:%s task:%d' % ('")
+            .append(srv.getJobName())
+            .append("', ")
+            .append(srv.getTaskIdx())
+            .append("))")
+            .append("\n");
 
         builder.append("server = tf.train.Server(cluster");
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/c9368da7/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowChiefRunner.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowChiefRunner.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowChiefRunner.java
index 6998681..96535eb 100644
--- a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowChiefRunner.java
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowChiefRunner.java
@@ -66,7 +66,11 @@ public class TensorFlowChiefRunner extends AsyncNativeProcessRunner {
         TensorFlowServer srv = new TensorFlowServer(spec, TensorFlowClusterResolver.CHIEF_JOB_NAME, 0);
 
         return new NativeProcessRunner(
-            new PythonProcessBuilderSupplier(true).get(),
+            new PythonProcessBuilderSupplier(
+                true,
+                "job:" + srv.getJobName(),
+                "task:" + srv.getTaskIdx()
+            ).get(),
             new TensorFlowServerScriptFormatter().format(srv, true, ignite),
             out,
             err

http://git-wip-us.apache.org/repos/asf/ignite/blob/c9368da7/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowUserScriptRunner.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowUserScriptRunner.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowUserScriptRunner.java
index 3dcd5f8..6bb3b0a 100644
--- a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowUserScriptRunner.java
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowUserScriptRunner.java
@@ -37,6 +37,7 @@ import org.apache.ignite.Ignition;
 import org.apache.ignite.tensorflow.cluster.TensorFlowJobArchive;
 import org.apache.ignite.tensorflow.cluster.spec.TensorFlowClusterSpec;
 import org.apache.ignite.tensorflow.cluster.spec.TensorFlowServerAddressSpec;
+import org.apache.ignite.tensorflow.core.pythonrunning.PythonProcessBuilderSupplier;
 import org.apache.ignite.tensorflow.core.util.AsyncNativeProcessRunner;
 import org.apache.ignite.tensorflow.core.util.NativeProcessRunner;
 
@@ -117,7 +118,7 @@ public class TensorFlowUserScriptRunner extends AsyncNativeProcessRunner {
         if (workingDir == null)
             throw new IllegalStateException("Working directory is not created");
 
-        ProcessBuilder procBuilder = new ProcessBuilder();
+        ProcessBuilder procBuilder = new PythonProcessBuilderSupplier(false).get();
 
         procBuilder.directory(workingDir);
         procBuilder.command(jobArchive.getCommands());

http://git-wip-us.apache.org/repos/asf/ignite/blob/c9368da7/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcess.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcess.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcess.java
index 9ed8b20..34c5a12 100644
--- a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcess.java
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcess.java
@@ -33,17 +33,22 @@ public class PythonProcess implements Serializable {
     /** Node identifier. */
     private final UUID nodeId;
 
+    /** Meta information that adds to script as arguments.  */
+    private final String[] meta;
+
     /**
      * Constructs a new instance of python process.
      *
      * @param stdin  Stdin of the process.
      * @param nodeId Node identifier.
+     * @param meta Meta information that adds to script as arguments.
      */
-    public PythonProcess(String stdin, UUID nodeId) {
+    public PythonProcess(String stdin, UUID nodeId, String... meta) {
         assert nodeId != null : "Node identifier should not be null";
 
         this.stdin = stdin;
         this.nodeId = nodeId;
+        this.meta = meta;
     }
 
     /** */
@@ -55,4 +60,9 @@ public class PythonProcess implements Serializable {
     public UUID getNodeId() {
         return nodeId;
     }
+
+    /** */
+    public String[] getMeta() {
+        return meta;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/c9368da7/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcessBuilderSupplier.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcessBuilderSupplier.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcessBuilderSupplier.java
index e59ab00..c7f7fde 100644
--- a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcessBuilderSupplier.java
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcessBuilderSupplier.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.tensorflow.core.pythonrunning;
 
+import java.lang.management.ManagementFactory;
+import java.util.Map;
 import org.apache.ignite.tensorflow.util.SerializableSupplier;
 
 /**
@@ -32,13 +34,18 @@ public class PythonProcessBuilderSupplier implements SerializableSupplier<Proces
     /** Interactive flag (allows to used standard input to pass Python script). */
     private final boolean interactive;
 
+    /** Meta information that adds to script as arguments. */
+    private final String[] meta;
+
     /**
      * Constructs a new instance of Python process builder supplier.
      *
      * @param interactive Interactive flag (allows to used standard input to pass Python script).
+     * @param meta Meta information that adds to script as arguments.
      */
-    public PythonProcessBuilderSupplier(boolean interactive) {
+    public PythonProcessBuilderSupplier(boolean interactive, String... meta) {
         this.interactive = interactive;
+        this.meta = meta;
     }
 
     /**
@@ -52,6 +59,35 @@ public class PythonProcessBuilderSupplier implements SerializableSupplier<Proces
         if (python == null)
             python = "python3";
 
-        return interactive ? new ProcessBuilder(python, "-i") : new ProcessBuilder(python);
+        ProcessBuilder procBldr;
+        if (interactive) {
+            String[] cmd = new String[meta.length + 3];
+
+            cmd[0] = python;
+            cmd[1] = "-i";
+            cmd[2] = "-";
+
+            System.arraycopy(meta, 0, cmd, 3, meta.length);
+
+            procBldr = new ProcessBuilder(cmd);
+        }
+        else
+            procBldr = new ProcessBuilder(python);
+
+        Map<String, String> env = procBldr.environment();
+        env.put("PPID", String.valueOf(getProcessId()));
+
+        return procBldr;
+    }
+
+    /**
+     * Returns current process identifier.
+     *
+     * @return Process identifier.
+     */
+    private long getProcessId() {
+        String pid = ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
+
+        return Long.parseLong(pid);
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/c9368da7/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcessManager.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcessManager.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcessManager.java
index 1f6c11e..d050c0e 100644
--- a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcessManager.java
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcessManager.java
@@ -48,7 +48,7 @@ public class PythonProcessManager extends ProcessManagerWrapper<NativeProcess, P
     /** {@inheritDoc} */
     @Override protected NativeProcess transformSpecification(PythonProcess spec) {
         return new NativeProcess(
-            new PythonProcessBuilderSupplier(true),
+            new PythonProcessBuilderSupplier(true, spec.getMeta()),
             spec.getStdin(),
             spec.getNodeId()
         );

Reply via email to