Repository: tajo
Updated Branches:
  refs/heads/master f2c434332 -> ace0480fe


TAJO-1802: PythonScriptEngine copies controller and tajo util whenever it is initialized.

Closes #715


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/ace0480f
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/ace0480f
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/ace0480f

Branch: refs/heads/master
Commit: ace0480feacab917e6c73dd37314994050ab6207
Parents: f2c4343
Author: Jihoon Son <[email protected]>
Authored: Wed Sep 2 18:26:47 2015 +0900
Committer: Jihoon Son <[email protected]>
Committed: Wed Sep 2 18:26:47 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   3 +
 .../java/org/apache/tajo/master/TajoMaster.java |   3 +
 .../apache/tajo/master/exec/QueryExecutor.java  |   4 +-
 .../java/org/apache/tajo/worker/TajoWorker.java |   3 +
 .../src/main/resources/python/controller.py     |  26 +---
 .../plan/exprrewrite/rules/ConstantFolding.java |   4 +-
 .../function/python/PythonScriptEngine.java     | 138 ++++++++++---------
 7 files changed, 87 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/ace0480f/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 23291e5..e00f295 100644
--- a/CHANGES
+++ b/CHANGES
@@ -234,6 +234,9 @@ Release 0.11.0 - unreleased
 
   BUG FIXES
 
+    TAJO-1802: PythonScriptEngine copies controller and tajo util whenever 
+    it is initialized. (jihoon)
+
     TAJO-1707: Rack local count can be more than actual number of tasks.
     (jinho)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/ace0480f/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java 
b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
index dfe8909..754df7f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -48,6 +48,7 @@ import org.apache.tajo.function.FunctionSignature;
 import org.apache.tajo.master.rm.TajoResourceManager;
 import org.apache.tajo.metrics.ClusterResourceMetricSet;
 import org.apache.tajo.metrics.Master;
+import org.apache.tajo.plan.function.python.PythonScriptEngine;
 import org.apache.tajo.rpc.RpcChannelFactory;
 import org.apache.tajo.rpc.RpcClientManager;
 import org.apache.tajo.rpc.RpcConstants;
@@ -203,6 +204,8 @@ public class TajoMaster extends CompositeService {
 
     restServer = new TajoRestService(context);
     addIfService(restServer);
+
+    PythonScriptEngine.initPythonScriptEngineFiles();
     
     // Try to start up all services in TajoMaster.
     // If anyone is failed, the master prints out the errors and immediately 
should shutdowns

http://git-wip-us.apache.org/repos/asf/tajo/blob/ace0480f/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java 
b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
index f08a9f6..83b7df4 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
@@ -54,7 +54,9 @@ import 
org.apache.tajo.master.exec.prehook.DistributedQueryHookManager;
 import org.apache.tajo.master.exec.prehook.InsertIntoHook;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.Target;
-import org.apache.tajo.plan.expr.*;
+import org.apache.tajo.plan.expr.EvalContext;
+import org.apache.tajo.plan.expr.EvalNode;
+import org.apache.tajo.plan.expr.GeneralFunctionEval;
 import org.apache.tajo.plan.function.python.PythonScriptEngine;
 import org.apache.tajo.plan.function.python.TajoScriptEngine;
 import org.apache.tajo.plan.logical.*;

http://git-wip-us.apache.org/repos/asf/tajo/blob/ace0480f/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java 
b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
index 811ba64..65a9511 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -39,6 +39,7 @@ import org.apache.tajo.engine.function.FunctionLoader;
 import org.apache.tajo.function.FunctionSignature;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.metrics.Node;
+import org.apache.tajo.plan.function.python.PythonScriptEngine;
 import org.apache.tajo.pullserver.TajoPullServerService;
 import org.apache.tajo.querymaster.QueryMaster;
 import org.apache.tajo.querymaster.QueryMasterManagerService;
@@ -237,6 +238,8 @@ public class TajoWorker extends CompositeService {
     historyReader = new HistoryReader(workerContext.getWorkerName(), 
this.systemConf);
 
     FunctionLoader.loadUserDefinedFunctions(systemConf, new 
HashMap<FunctionSignature, FunctionDesc>());
+
+    PythonScriptEngine.initPythonScriptEngineFiles();
     
     diagnoseTajoWorker();
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/ace0480f/tajo-core/src/main/resources/python/controller.py
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/python/controller.py 
b/tajo-core/src/main/resources/python/controller.py
index 8d0f995..e1dc2d3 100644
--- a/tajo-core/src/main/resources/python/controller.py
+++ b/tajo-core/src/main/resources/python/controller.py
@@ -17,7 +17,6 @@
 
 import sys
 import os
-import logging
 import base64
 import json
 
@@ -28,7 +27,7 @@ try:
 except ImportError:
     USE_DATEUTIL = False
 
-from tajo_util import write_user_exception, udf_logging
+from tajo_util import write_user_exception
 
 FIELD_DELIMITER = ','
 TUPLE_START = '('
@@ -95,8 +94,6 @@ class PythonStreamingController:
     scalar_func = None
     udaf_instance = None
 
-    should_log = False
-    log_message = logging.info
     module_name = None
     output_schema = None
 
@@ -104,8 +101,7 @@ class PythonStreamingController:
         self.profiling_mode = profiling_mode
 
     def main(self,
-             module_name, file_path, cache_path,
-             output_stream_path, error_stream_path, log_file_name, 
output_schema, name, func_type):
+             module_name, file_path, cache_path, output_schema, name, 
func_type):
         sys.stdin = os.fdopen(sys.stdin.fileno(), 'rb', 0)
 
         # Need to ensure that user functions can't write to the streams we use 
to communicate with pig.
@@ -118,18 +114,10 @@ class PythonStreamingController:
         sys.path.append(cache_path)
         sys.path.append('.')
 
-        if self.should_log:
-            logging.basicConfig(filename=log_file_name, format="%(asctime)s 
%(levelname)s %(message)s", level=udf_logging.udf_log_level)
-            logging.info("To reduce the amount of information being logged 
only a small subset of rows are logged at the "
-                         "INFO level.  Call udf_logging.set_log_level_debug in 
tajo_util to see all rows being processed.")
-
         self.module_name = module_name
         self.output_schema = output_schema
         input_str = self.get_next_input()
 
-        if udf_logging.udf_log_level == logging.DEBUG:
-            self.log_message = logging.debug
-
         while input_str != END_OF_STREAM:
 
             if func_type == 'UDAF':
@@ -171,11 +159,7 @@ class PythonStreamingController:
     def process_input(self, func_name, func, input_str):
         try:
             try:
-                if self.should_log:
-                    self.log_message("Serialized Input: %s" % (input_str))
                 inputs = deserialize_input(input_str)
-                if self.should_log:
-                    self.log_message("Deserialized Input: %s" % 
(unicode(inputs)))
             except:
                 # Capture errors where the user passes in bad data.
                 write_user_exception(self.module_name, self.stream_error, 
NUM_LINES_OFFSET_TRACE)
@@ -192,8 +176,6 @@ class PythonStreamingController:
                     func_output = func(*inputs)
                     output = serialize_output(func_output, self.output_schema)
 
-                if self.should_log:
-                    self.log_message("Serialized Output: %s" % output)
             except:
                 # These errors should always be caused by user code.
                 write_user_exception(self.module_name, self.stream_error, 
NUM_LINES_OFFSET_TRACE)
@@ -214,7 +196,6 @@ class PythonStreamingController:
 
     def get_next_input(self):
         input_stream = self.input_stream
-        # log_stream = self.log_stream
 
         input_str = input_stream.readline()
 
@@ -466,5 +447,4 @@ def list_to_str(list_of_item, out_schema):
 
 if __name__ == '__main__':
     controller = PythonStreamingController()
-    controller.main(sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4],
-                    sys.argv[5], sys.argv[6], sys.argv[7], sys.argv[8], 
sys.argv[9])
+    controller.main(sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4], 
sys.argv[5], sys.argv[6])

http://git-wip-us.apache.org/repos/asf/tajo/blob/ace0480f/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/rules/ConstantFolding.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/rules/ConstantFolding.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/rules/ConstantFolding.java
index eb546d1..95f4c76 100644
--- 
a/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/rules/ConstantFolding.java
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/rules/ConstantFolding.java
@@ -19,10 +19,10 @@
 package org.apache.tajo.plan.exprrewrite.rules;
 
 import org.apache.tajo.datum.Datum;
+import org.apache.tajo.plan.LogicalPlanner;
+import org.apache.tajo.plan.annotator.Prioritized;
 import org.apache.tajo.plan.expr.*;
 import org.apache.tajo.plan.exprrewrite.EvalTreeOptimizationRule;
-import org.apache.tajo.plan.annotator.Prioritized;
-import org.apache.tajo.plan.LogicalPlanner;
 import org.apache.tajo.plan.function.python.PythonScriptEngine;
 import org.apache.tajo.plan.function.python.TajoScriptEngine;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/ace0480f/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java
index 2d93aa9..e202d64 100644
--- 
a/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java
@@ -253,25 +253,23 @@ public class PythonScriptEngine extends TajoScriptEngine {
 
 
   private static final String PYTHON_LANGUAGE = "python";
-  private static final String PYTHON_ROOT_PATH = "/python";
   private static final String TAJO_UTIL_NAME = "tajo_util.py";
   private static final String CONTROLLER_NAME = "controller.py";
-  private static final String PYTHON_CONTROLLER_JAR_PATH = PYTHON_ROOT_PATH + 
File.separator + CONTROLLER_NAME; // Relative to root of tajo jar.
-  private static final String PYTHON_TAJO_UTIL_PATH = PYTHON_ROOT_PATH + 
File.separator + TAJO_UTIL_NAME; // Relative to root of tajo jar.
-  private static final String DEFAULT_LOG_DIR = "/tmp/tajo-" + 
System.getProperty("user.name") + "/python";
+  private static final String BASE_DIR = FileUtils.getTempDirectoryPath() + 
"/tajo-" + System.getProperty("user.name") + "/python";
+  private static final String PYTHON_CONTROLLER_JAR_PATH = "/python" + 
File.separator + CONTROLLER_NAME; // Relative to root of tajo jar.
+  private static final String PYTHON_TAJO_UTIL_JAR_PATH = "/python" + 
File.separator + TAJO_UTIL_NAME; // Relative to root of tajo jar.
 
   // Indexes for arguments being passed to external process
-  private static final int UDF_LANGUAGE = 0;
-  private static final int PATH_TO_CONTROLLER_FILE = 1;
-  private static final int UDF_FILE_NAME = 2; // Name of file where UDF 
function is defined
-  private static final int UDF_FILE_PATH = 3; // Path to directory containing 
file where UDF function is defined
-  private static final int PATH_TO_FILE_CACHE = 4; // Directory where required 
files (like tajo_util) are cached on cluster nodes.
-  private static final int STD_OUT_OUTPUT_PATH = 5; // File for output from 
when user writes to standard output.
-  private static final int STD_ERR_OUTPUT_PATH = 6; // File for output from 
when user writes to standard error.
-  private static final int CONTROLLER_LOG_FILE_PATH = 7; // Controller log 
file logs progress through the controller script not user code.
-  private static final int OUT_SCHEMA = 8; // the schema of the output column
-  private static final int FUNCTION_OR_CLASS_NAME = 9; // if FUNCTION_TYPE is 
UDF, function name; if FUNCTION_TYPE is UDAF, class name.
-  private static final int FUNCTION_TYPE = 10; // UDF or UDAF
+  enum COMMAND_IDX {
+    UDF_LANGUAGE,
+    PATH_TO_CONTROLLER_FILE,
+    UDF_FILE_NAME,
+    UDF_FILE_PATH,
+    PATH_TO_FILE_CACHE,
+    OUT_SCHEMA,
+    FUNCTION_OR_CLASS_NAME,
+    FUNCTION_TYPE,
+  }
 
   private Configuration systemConf;
 
@@ -365,41 +363,26 @@ public class PythonScriptEngine extends TajoScriptEngine {
    * @throws IOException
    */
   private String[] buildCommand() throws IOException {
-    String[] command = new String[11];
-
-    // TODO: support controller logging
-    String standardOutputRootWriteLocation = 
systemConf.get(TajoConf.ConfVars.PYTHON_CONTROLLER_LOG_DIR.keyname(),
-        DEFAULT_LOG_DIR);
-    if (!standardOutputRootWriteLocation.equals(DEFAULT_LOG_DIR)) {
-      LOG.warn("Currently, logging is not supported for the python 
controller.");
-    }
-    String controllerLogFileName, outFileName, errOutFileName;
+    String[] command = new String[8];
 
     String funcName = invocationDesc.getName();
     String filePath = invocationDesc.getPath();
 
-    controllerLogFileName = standardOutputRootWriteLocation + funcName + 
"_controller.log";
-    outFileName = standardOutputRootWriteLocation + funcName + ".out";
-    errOutFileName = standardOutputRootWriteLocation + funcName + ".err";
-
-    command[UDF_LANGUAGE] = PYTHON_LANGUAGE;
-    command[PATH_TO_CONTROLLER_FILE] = getControllerPath();
+    command[COMMAND_IDX.UDF_LANGUAGE.ordinal()] = PYTHON_LANGUAGE;
+    command[COMMAND_IDX.PATH_TO_CONTROLLER_FILE.ordinal()] = 
getControllerPath();
     int lastSeparator = filePath.lastIndexOf(File.separator) + 1;
     String fileName = filePath.substring(lastSeparator);
     fileName = fileName.endsWith(FILE_EXTENSION) ? fileName.substring(0, 
fileName.length()-3) : fileName;
-    command[UDF_FILE_NAME] = fileName;
-    command[UDF_FILE_PATH] = lastSeparator <= 0 ? "." : filePath.substring(0, 
lastSeparator - 1);
+    command[COMMAND_IDX.UDF_FILE_NAME.ordinal()] = fileName;
+    command[COMMAND_IDX.UDF_FILE_PATH.ordinal()] = lastSeparator <= 0 ? "." : 
filePath.substring(0, lastSeparator - 1);
     String fileCachePath = 
systemConf.get(TajoConf.ConfVars.PYTHON_CODE_DIR.keyname());
     if (fileCachePath == null) {
       throw new IOException(TajoConf.ConfVars.PYTHON_CODE_DIR.keyname() + " 
must be set.");
     }
-    command[PATH_TO_FILE_CACHE] = "'" + fileCachePath + "'";
-    command[STD_OUT_OUTPUT_PATH] = outFileName;
-    command[STD_ERR_OUTPUT_PATH] = errOutFileName;
-    command[CONTROLLER_LOG_FILE_PATH] = controllerLogFileName;
-    command[OUT_SCHEMA] = 
outSchema.getColumn(0).getDataType().getType().name().toLowerCase();
-    command[FUNCTION_OR_CLASS_NAME] = funcName;
-    command[FUNCTION_TYPE] = invocationDesc.isScalarFunction() ? "UDF" : 
"UDAF";
+    command[COMMAND_IDX.PATH_TO_FILE_CACHE.ordinal()] = "'" + fileCachePath + 
"'";
+    command[COMMAND_IDX.OUT_SCHEMA.ordinal()] = 
outSchema.getColumn(0).getDataType().getType().name().toLowerCase();
+    command[COMMAND_IDX.FUNCTION_OR_CLASS_NAME.ordinal()] = funcName;
+    command[COMMAND_IDX.FUNCTION_TYPE.ordinal()] = 
invocationDesc.isScalarFunction() ? "UDF" : "UDAF";
 
     return command;
   }
@@ -459,39 +442,48 @@ public class PythonScriptEngine extends TajoScriptEngine {
     stderr = new DataInputStream(new 
BufferedInputStream(process.getErrorStream()));
   }
 
+  private static final File pythonScriptBaseDir = new 
File(PythonScriptEngine.getBaseDirPath());
+  private static final File pythonScriptControllerCopy = new 
File(PythonScriptEngine.getControllerPath());
+  private static final File pythonScriptUtilCopy = new 
File(PythonScriptEngine.getTajoUtilPath());
+
+  public static void initPythonScriptEngineFiles() throws IOException {
+    if (!pythonScriptBaseDir.exists()) {
+      pythonScriptBaseDir.mkdirs();
+    }
+    // Controller and util should be always overwritten.
+    PythonScriptEngine.loadController(pythonScriptControllerCopy);
+    PythonScriptEngine.loadTajoUtil(pythonScriptUtilCopy);
+  }
+
+  public static void loadController(File controllerCopy) throws IOException {
+    try (InputStream controllerInputStream = 
PythonScriptEngine.class.getResourceAsStream(PYTHON_CONTROLLER_JAR_PATH)) {
+      FileUtils.copyInputStreamToFile(controllerInputStream, controllerCopy);
+    }
+  }
+
+  public static void loadTajoUtil(File utilCopy) throws IOException {
+    try (InputStream utilInputStream = 
PythonScriptEngine.class.getResourceAsStream(PYTHON_TAJO_UTIL_JAR_PATH)) {
+      FileUtils.copyInputStreamToFile(utilInputStream, utilCopy);
+    }
+  }
+
+  public static String getBaseDirPath() {
+    LOG.info("Python base dir is " + BASE_DIR);
+    return BASE_DIR;
+  }
+
   /**
    * Find the path to the controller file for the streaming language.
    *
-   * First check path to job jar and if the file is not found (like in the
-   * case of running hadoop in standalone mode) write the necessary files
-   * to temporary files and return that path.
-   *
    * @return
    * @throws IOException
    */
-  private String getControllerPath() throws IOException {
-    String controllerPath = PYTHON_CONTROLLER_JAR_PATH;
-    File controller = new File(PYTHON_CONTROLLER_JAR_PATH);
-    if (!controller.exists()) {
-      File controllerFile = File.createTempFile("controller", FILE_EXTENSION);
-      InputStream pythonControllerStream = 
this.getClass().getResourceAsStream(PYTHON_CONTROLLER_JAR_PATH);
-      try {
-        FileUtils.copyInputStreamToFile(pythonControllerStream, 
controllerFile);
-      } finally {
-        pythonControllerStream.close();
-      }
-      controllerFile.deleteOnExit();
-      File tajoUtilFile = new File(controllerFile.getParent() + File.separator 
+ TAJO_UTIL_NAME);
-      tajoUtilFile.deleteOnExit();
-      InputStream pythonUtilStream = 
this.getClass().getResourceAsStream(PYTHON_TAJO_UTIL_PATH);
-      try {
-        FileUtils.copyInputStreamToFile(pythonUtilStream, tajoUtilFile);
-      } finally {
-        pythonUtilStream.close();
-      }
-      controllerPath = controllerFile.getAbsolutePath();
-    }
-    return controllerPath;
+  public static String getControllerPath() {
+    return BASE_DIR + File.separator + CONTROLLER_NAME;
+  }
+
+  public static String getTajoUtilPath() {
+    return BASE_DIR + File.separator + TAJO_UTIL_NAME;
   }
 
   /**
@@ -511,7 +503,12 @@ public class PythonScriptEngine extends TajoScriptEngine {
 
     Datum result = null;
     try {
-      result = outputHandler.getNext().asDatum(0);
+      Tuple next = outputHandler.getNext();
+      if (next != null) {
+        result = next.asDatum(0);
+      } else {
+        throw new RuntimeException("Cannot get output result from python 
controller");
+      }
     } catch (Throwable e) {
       throwException(stderr, new RuntimeException("Problem getting output: " + 
e.getMessage(), e));
     }
@@ -652,7 +649,12 @@ public class PythonScriptEngine extends TajoScriptEngine {
     }
     Datum result = null;
     try {
-      result = outputHandler.getNext().asDatum(0);
+      Tuple next = outputHandler.getNext();
+      if (next != null) {
+        result = next.asDatum(0);
+      } else {
+        throw new RuntimeException("Cannot get output result from python 
controller");
+      }
     } catch (Exception e) {
       throwException(stderr, new RuntimeException("Problem getting output: " + 
e.getMessage(), e));
     }

Reply via email to