Repository: tajo Updated Branches: refs/heads/master f2c434332 -> ace0480fe
TAJO-1802: PythonScriptEngine copies controller and tajo util whenever it is initialized. Closes #715 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/ace0480f Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/ace0480f Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/ace0480f Branch: refs/heads/master Commit: ace0480feacab917e6c73dd37314994050ab6207 Parents: f2c4343 Author: Jihoon Son <[email protected]> Authored: Wed Sep 2 18:26:47 2015 +0900 Committer: Jihoon Son <[email protected]> Committed: Wed Sep 2 18:26:47 2015 +0900 ---------------------------------------------------------------------- CHANGES | 3 + .../java/org/apache/tajo/master/TajoMaster.java | 3 + .../apache/tajo/master/exec/QueryExecutor.java | 4 +- .../java/org/apache/tajo/worker/TajoWorker.java | 3 + .../src/main/resources/python/controller.py | 26 +--- .../plan/exprrewrite/rules/ConstantFolding.java | 4 +- .../function/python/PythonScriptEngine.java | 138 ++++++++++--------- 7 files changed, 87 insertions(+), 94 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/ace0480f/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 23291e5..e00f295 100644 --- a/CHANGES +++ b/CHANGES @@ -234,6 +234,9 @@ Release 0.11.0 - unreleased BUG FIXES + TAJO-1802: PythonScriptEngine copies controller and tajo util whenever + it is initialized. (jihoon) + TAJO-1707: Rack local count can be more than actual number of tasks. (jinho) http://git-wip-us.apache.org/repos/asf/tajo/blob/ace0480f/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java index dfe8909..754df7f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java @@ -48,6 +48,7 @@ import org.apache.tajo.function.FunctionSignature; import org.apache.tajo.master.rm.TajoResourceManager; import org.apache.tajo.metrics.ClusterResourceMetricSet; import org.apache.tajo.metrics.Master; +import org.apache.tajo.plan.function.python.PythonScriptEngine; import org.apache.tajo.rpc.RpcChannelFactory; import org.apache.tajo.rpc.RpcClientManager; import org.apache.tajo.rpc.RpcConstants; @@ -203,6 +204,8 @@ public class TajoMaster extends CompositeService { restServer = new TajoRestService(context); addIfService(restServer); + + PythonScriptEngine.initPythonScriptEngineFiles(); // Try to start up all services in TajoMaster. // If anyone is failed, the master prints out the errors and immediately should shutdowns http://git-wip-us.apache.org/repos/asf/tajo/blob/ace0480f/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java index f08a9f6..83b7df4 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java @@ -54,7 +54,9 @@ import org.apache.tajo.master.exec.prehook.DistributedQueryHookManager; import org.apache.tajo.master.exec.prehook.InsertIntoHook; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.Target; -import org.apache.tajo.plan.expr.*; +import org.apache.tajo.plan.expr.EvalContext; +import org.apache.tajo.plan.expr.EvalNode; +import org.apache.tajo.plan.expr.GeneralFunctionEval; import org.apache.tajo.plan.function.python.PythonScriptEngine; import org.apache.tajo.plan.function.python.TajoScriptEngine; import org.apache.tajo.plan.logical.*; http://git-wip-us.apache.org/repos/asf/tajo/blob/ace0480f/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java index 811ba64..65a9511 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java @@ -39,6 +39,7 @@ import org.apache.tajo.engine.function.FunctionLoader; import org.apache.tajo.function.FunctionSignature; import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.metrics.Node; +import org.apache.tajo.plan.function.python.PythonScriptEngine; import org.apache.tajo.pullserver.TajoPullServerService; import org.apache.tajo.querymaster.QueryMaster; import org.apache.tajo.querymaster.QueryMasterManagerService; @@ -237,6 +238,8 @@ public class TajoWorker extends CompositeService { historyReader = new HistoryReader(workerContext.getWorkerName(), this.systemConf); FunctionLoader.loadUserDefinedFunctions(systemConf, new HashMap<FunctionSignature, FunctionDesc>()); + + PythonScriptEngine.initPythonScriptEngineFiles(); diagnoseTajoWorker(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/ace0480f/tajo-core/src/main/resources/python/controller.py ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/python/controller.py b/tajo-core/src/main/resources/python/controller.py index 8d0f995..e1dc2d3 100644 --- a/tajo-core/src/main/resources/python/controller.py +++ b/tajo-core/src/main/resources/python/controller.py @@ -17,7 +17,6 @@ import sys import os -import logging import base64 import json @@ -28,7 +27,7 @@ try: except ImportError: USE_DATEUTIL = False -from tajo_util import write_user_exception, udf_logging +from tajo_util import write_user_exception FIELD_DELIMITER = ',' TUPLE_START = '(' @@ -95,8 +94,6 @@ class PythonStreamingController: scalar_func = None udaf_instance = None - should_log = False - log_message = logging.info module_name = None output_schema = None @@ -104,8 +101,7 @@ class PythonStreamingController: self.profiling_mode = profiling_mode def main(self, - module_name, file_path, cache_path, - output_stream_path, error_stream_path, log_file_name, output_schema, name, func_type): + module_name, file_path, cache_path, output_schema, name, func_type): sys.stdin = os.fdopen(sys.stdin.fileno(), 'rb', 0) # Need to ensure that user functions can't write to the streams we use to communicate with pig. @@ -118,18 +114,10 @@ class PythonStreamingController: sys.path.append(cache_path) sys.path.append('.') - if self.should_log: - logging.basicConfig(filename=log_file_name, format="%(asctime)s %(levelname)s %(message)s", level=udf_logging.udf_log_level) - logging.info("To reduce the amount of information being logged only a small subset of rows are logged at the " - "INFO level. Call udf_logging.set_log_level_debug in tajo_util to see all rows being processed.") - self.module_name = module_name self.output_schema = output_schema input_str = self.get_next_input() - if udf_logging.udf_log_level == logging.DEBUG: - self.log_message = logging.debug - while input_str != END_OF_STREAM: if func_type == 'UDAF': @@ -171,11 +159,7 @@ class PythonStreamingController: def process_input(self, func_name, func, input_str): try: try: - if self.should_log: - self.log_message("Serialized Input: %s" % (input_str)) inputs = deserialize_input(input_str) - if self.should_log: - self.log_message("Deserialized Input: %s" % (unicode(inputs))) except: # Capture errors where the user passes in bad data. write_user_exception(self.module_name, self.stream_error, NUM_LINES_OFFSET_TRACE) @@ -192,8 +176,6 @@ class PythonStreamingController: func_output = func(*inputs) output = serialize_output(func_output, self.output_schema) - if self.should_log: - self.log_message("Serialized Output: %s" % output) except: # These errors should always be caused by user code. write_user_exception(self.module_name, self.stream_error, NUM_LINES_OFFSET_TRACE) @@ -214,7 +196,6 @@ class PythonStreamingController: def get_next_input(self): input_stream = self.input_stream - # log_stream = self.log_stream input_str = input_stream.readline() @@ -466,5 +447,4 @@ def list_to_str(list_of_item, out_schema): if __name__ == '__main__': controller = PythonStreamingController() - controller.main(sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4], - sys.argv[5], sys.argv[6], sys.argv[7], sys.argv[8], sys.argv[9]) + controller.main(sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4], sys.argv[5], sys.argv[6]) http://git-wip-us.apache.org/repos/asf/tajo/blob/ace0480f/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/rules/ConstantFolding.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/rules/ConstantFolding.java b/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/rules/ConstantFolding.java index eb546d1..95f4c76 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/rules/ConstantFolding.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/exprrewrite/rules/ConstantFolding.java @@ -19,10 +19,10 @@ package org.apache.tajo.plan.exprrewrite.rules; import org.apache.tajo.datum.Datum; +import org.apache.tajo.plan.LogicalPlanner; +import org.apache.tajo.plan.annotator.Prioritized; import org.apache.tajo.plan.expr.*; import org.apache.tajo.plan.exprrewrite.EvalTreeOptimizationRule; -import org.apache.tajo.plan.annotator.Prioritized; -import org.apache.tajo.plan.LogicalPlanner; import org.apache.tajo.plan.function.python.PythonScriptEngine; import org.apache.tajo.plan.function.python.TajoScriptEngine; http://git-wip-us.apache.org/repos/asf/tajo/blob/ace0480f/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java index 2d93aa9..e202d64 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java @@ -253,25 +253,23 @@ public class PythonScriptEngine extends TajoScriptEngine { private static final String PYTHON_LANGUAGE = "python"; - private static final String PYTHON_ROOT_PATH = "/python"; private static final String TAJO_UTIL_NAME = "tajo_util.py"; private static final String CONTROLLER_NAME = "controller.py"; - private static final String PYTHON_CONTROLLER_JAR_PATH = PYTHON_ROOT_PATH + File.separator + CONTROLLER_NAME; // Relative to root of tajo jar. - private static final String PYTHON_TAJO_UTIL_PATH = PYTHON_ROOT_PATH + File.separator + TAJO_UTIL_NAME; // Relative to root of tajo jar. - private static final String DEFAULT_LOG_DIR = "/tmp/tajo-" + System.getProperty("user.name") + "/python"; + private static final String BASE_DIR = FileUtils.getTempDirectoryPath() + "/tajo-" + System.getProperty("user.name") + "/python"; + private static final String PYTHON_CONTROLLER_JAR_PATH = "/python" + File.separator + CONTROLLER_NAME; // Relative to root of tajo jar. + private static final String PYTHON_TAJO_UTIL_JAR_PATH = "/python" + File.separator + TAJO_UTIL_NAME; // Relative to root of tajo jar. // Indexes for arguments being passed to external process - private static final int UDF_LANGUAGE = 0; - private static final int PATH_TO_CONTROLLER_FILE = 1; - private static final int UDF_FILE_NAME = 2; // Name of file where UDF function is defined - private static final int UDF_FILE_PATH = 3; // Path to directory containing file where UDF function is defined - private static final int PATH_TO_FILE_CACHE = 4; // Directory where required files (like tajo_util) are cached on cluster nodes. - private static final int STD_OUT_OUTPUT_PATH = 5; // File for output from when user writes to standard output. - private static final int STD_ERR_OUTPUT_PATH = 6; // File for output from when user writes to standard error. - private static final int CONTROLLER_LOG_FILE_PATH = 7; // Controller log file logs progress through the controller script not user code. - private static final int OUT_SCHEMA = 8; // the schema of the output column - private static final int FUNCTION_OR_CLASS_NAME = 9; // if FUNCTION_TYPE is UDF, function name; if FUNCTION_TYPE is UDAF, class name. - private static final int FUNCTION_TYPE = 10; // UDF or UDAF + enum COMMAND_IDX { + UDF_LANGUAGE, + PATH_TO_CONTROLLER_FILE, + UDF_FILE_NAME, + UDF_FILE_PATH, + PATH_TO_FILE_CACHE, + OUT_SCHEMA, + FUNCTION_OR_CLASS_NAME, + FUNCTION_TYPE, + } private Configuration systemConf; @@ -365,41 +363,26 @@ public class PythonScriptEngine extends TajoScriptEngine { * @throws IOException */ private String[] buildCommand() throws IOException { - String[] command = new String[11]; - - // TODO: support controller logging - String standardOutputRootWriteLocation = systemConf.get(TajoConf.ConfVars.PYTHON_CONTROLLER_LOG_DIR.keyname(), - DEFAULT_LOG_DIR); - if (!standardOutputRootWriteLocation.equals(DEFAULT_LOG_DIR)) { - LOG.warn("Currently, logging is not supported for the python controller."); - } - String controllerLogFileName, outFileName, errOutFileName; + String[] command = new String[8]; String funcName = invocationDesc.getName(); String filePath = invocationDesc.getPath(); - controllerLogFileName = standardOutputRootWriteLocation + funcName + "_controller.log"; - outFileName = standardOutputRootWriteLocation + funcName + ".out"; - errOutFileName = standardOutputRootWriteLocation + funcName + ".err"; - - command[UDF_LANGUAGE] = PYTHON_LANGUAGE; - command[PATH_TO_CONTROLLER_FILE] = getControllerPath(); + command[COMMAND_IDX.UDF_LANGUAGE.ordinal()] = PYTHON_LANGUAGE; + command[COMMAND_IDX.PATH_TO_CONTROLLER_FILE.ordinal()] = getControllerPath(); int lastSeparator = filePath.lastIndexOf(File.separator) + 1; String fileName = filePath.substring(lastSeparator); fileName = fileName.endsWith(FILE_EXTENSION) ? fileName.substring(0, fileName.length()-3) : fileName; - command[UDF_FILE_NAME] = fileName; - command[UDF_FILE_PATH] = lastSeparator <= 0 ? "." : filePath.substring(0, lastSeparator - 1); + command[COMMAND_IDX.UDF_FILE_NAME.ordinal()] = fileName; + command[COMMAND_IDX.UDF_FILE_PATH.ordinal()] = lastSeparator <= 0 ? "." : filePath.substring(0, lastSeparator - 1); String fileCachePath = systemConf.get(TajoConf.ConfVars.PYTHON_CODE_DIR.keyname()); if (fileCachePath == null) { throw new IOException(TajoConf.ConfVars.PYTHON_CODE_DIR.keyname() + " must be set."); } - command[PATH_TO_FILE_CACHE] = "'" + fileCachePath + "'"; - command[STD_OUT_OUTPUT_PATH] = outFileName; - command[STD_ERR_OUTPUT_PATH] = errOutFileName; - command[CONTROLLER_LOG_FILE_PATH] = controllerLogFileName; - command[OUT_SCHEMA] = outSchema.getColumn(0).getDataType().getType().name().toLowerCase(); - command[FUNCTION_OR_CLASS_NAME] = funcName; - command[FUNCTION_TYPE] = invocationDesc.isScalarFunction() ? "UDF" : "UDAF"; + command[COMMAND_IDX.PATH_TO_FILE_CACHE.ordinal()] = "'" + fileCachePath + "'"; + command[COMMAND_IDX.OUT_SCHEMA.ordinal()] = outSchema.getColumn(0).getDataType().getType().name().toLowerCase(); + command[COMMAND_IDX.FUNCTION_OR_CLASS_NAME.ordinal()] = funcName; + command[COMMAND_IDX.FUNCTION_TYPE.ordinal()] = invocationDesc.isScalarFunction() ? "UDF" : "UDAF"; return command; } @@ -459,39 +442,48 @@ public class PythonScriptEngine extends TajoScriptEngine { stderr = new DataInputStream(new BufferedInputStream(process.getErrorStream())); } + private static final File pythonScriptBaseDir = new File(PythonScriptEngine.getBaseDirPath()); + private static final File pythonScriptControllerCopy = new File(PythonScriptEngine.getControllerPath()); + private static final File pythonScriptUtilCopy = new File(PythonScriptEngine.getTajoUtilPath()); + + public static void initPythonScriptEngineFiles() throws IOException { + if (!pythonScriptBaseDir.exists()) { + pythonScriptBaseDir.mkdirs(); + } + // Controller and util should be always overwritten. + PythonScriptEngine.loadController(pythonScriptControllerCopy); + PythonScriptEngine.loadTajoUtil(pythonScriptUtilCopy); + } + + public static void loadController(File controllerCopy) throws IOException { + try (InputStream controllerInputStream = PythonScriptEngine.class.getResourceAsStream(PYTHON_CONTROLLER_JAR_PATH)) { + FileUtils.copyInputStreamToFile(controllerInputStream, controllerCopy); + } + } + + public static void loadTajoUtil(File utilCopy) throws IOException { + try (InputStream utilInputStream = PythonScriptEngine.class.getResourceAsStream(PYTHON_TAJO_UTIL_JAR_PATH)) { + FileUtils.copyInputStreamToFile(utilInputStream, utilCopy); + } + } + + public static String getBaseDirPath() { + LOG.info("Python base dir is " + BASE_DIR); + return BASE_DIR; + } + /** * Find the path to the controller file for the streaming language. * - * First check path to job jar and if the file is not found (like in the - * case of running hadoop in standalone mode) write the necessary files - * to temporary files and return that path. - * * @return * @throws IOException */ - private String getControllerPath() throws IOException { - String controllerPath = PYTHON_CONTROLLER_JAR_PATH; - File controller = new File(PYTHON_CONTROLLER_JAR_PATH); - if (!controller.exists()) { - File controllerFile = File.createTempFile("controller", FILE_EXTENSION); - InputStream pythonControllerStream = this.getClass().getResourceAsStream(PYTHON_CONTROLLER_JAR_PATH); - try { - FileUtils.copyInputStreamToFile(pythonControllerStream, controllerFile); - } finally { - pythonControllerStream.close(); - } - controllerFile.deleteOnExit(); - File tajoUtilFile = new File(controllerFile.getParent() + File.separator + TAJO_UTIL_NAME); - tajoUtilFile.deleteOnExit(); - InputStream pythonUtilStream = this.getClass().getResourceAsStream(PYTHON_TAJO_UTIL_PATH); - try { - FileUtils.copyInputStreamToFile(pythonUtilStream, tajoUtilFile); - } finally { - pythonUtilStream.close(); - } - controllerPath = controllerFile.getAbsolutePath(); - } - return controllerPath; + public static String getControllerPath() { + return BASE_DIR + File.separator + CONTROLLER_NAME; + } + + public static String getTajoUtilPath() { + return BASE_DIR + File.separator + TAJO_UTIL_NAME; } /** @@ -511,7 +503,12 @@ public class PythonScriptEngine extends TajoScriptEngine { Datum result = null; try { - result = outputHandler.getNext().asDatum(0); + Tuple next = outputHandler.getNext(); + if (next != null) { + result = next.asDatum(0); + } else { + throw new RuntimeException("Cannot get output result from python controller"); + } } catch (Throwable e) { throwException(stderr, new RuntimeException("Problem getting output: " + e.getMessage(), e)); } @@ -652,7 +649,12 @@ public class PythonScriptEngine extends TajoScriptEngine { } Datum result = null; try { - result = outputHandler.getNext().asDatum(0); + Tuple next = outputHandler.getNext(); + if (next != null) { + result = next.asDatum(0); + } else { + throw new RuntimeException("Cannot get output result from python controller"); + } } catch (Exception e) { throwException(stderr, new RuntimeException("Problem getting output: " + e.getMessage(), e)); }
