Repository: tajo Updated Branches: refs/heads/master 0b59a93ba -> f21d5d677
TAJO-1596: TestPythonFunctions occasionally fails. Closes #705 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/f21d5d67 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/f21d5d67 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/f21d5d67 Branch: refs/heads/master Commit: f21d5d677c2ba1b9c493498ecb512d1ddcfe2221 Parents: 0b59a93 Author: Jinho Kim <[email protected]> Authored: Fri Aug 21 12:33:29 2015 +0900 Committer: Jinho Kim <[email protected]> Committed: Fri Aug 21 12:33:29 2015 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../org/apache/tajo/master/QueryManager.java | 3 +- .../src/main/resources/python/controller.py | 3 - .../function/python/PythonScriptEngine.java | 97 ++++++++++++++------ 4 files changed, 75 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/f21d5d67/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 2e93f51..8653bbc 100644 --- a/CHANGES +++ b/CHANGES @@ -232,6 +232,8 @@ Release 0.11.0 - unreleased BUG FIXES + TAJO-1596: TestPythonFunctions occasionally fails. (jinho) + TAJO-1741: Two tables having same time zone display different timestamps. (Contributed Jongyoung Park, committed by hyunsik) http://git-wip-us.apache.org/repos/asf/tajo/blob/f21d5d67/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java index 8838986..95562ed 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java @@ -226,8 +226,9 @@ public class QueryManager extends CompositeService { public boolean startQueryJob(QueryId queryId, AllocationResourceProto allocation) { if (submittedQueries.get(queryId).allocateToQueryMaster(allocation)) { - QueryInProgress queryInProgress = submittedQueries.remove(queryId); + QueryInProgress queryInProgress = submittedQueries.get(queryId); runningQueries.put(queryInProgress.getQueryId(), queryInProgress); + submittedQueries.remove(queryId); dispatcher.getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_MASTER_START, queryInProgress.getQueryInfo())); return true; http://git-wip-us.apache.org/repos/asf/tajo/blob/f21d5d67/tajo-core/src/main/resources/python/controller.py ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/python/controller.py b/tajo-core/src/main/resources/python/controller.py index 126ccdc..8d0f995 100644 --- a/tajo-core/src/main/resources/python/controller.py +++ b/tajo-core/src/main/resources/python/controller.py @@ -113,9 +113,6 @@ class PythonStreamingController: self.stream_error = os.fdopen(sys.stderr.fileno(), 'wb', 0) self.input_stream = sys.stdin - # TODO: support controller logging - self.log_stream = open(output_stream_path, 'a') - sys.stderr = open(error_stream_path, 'w') sys.path.append(file_path) sys.path.append(cache_path) http://git-wip-us.apache.org/repos/asf/tajo/blob/f21d5d67/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java index 02dff19..2d93aa9 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java @@ -19,6 +19,7 @@ package org.apache.tajo.plan.function.python; import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -27,12 +28,17 @@ import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.datum.Datum; -import org.apache.tajo.function.*; +import org.apache.tajo.function.FunctionInvocation; +import org.apache.tajo.function.FunctionSignature; +import org.apache.tajo.function.FunctionSupplement; +import org.apache.tajo.function.PythonInvocationDesc; import org.apache.tajo.plan.function.FunctionContext; import org.apache.tajo.plan.function.PythonAggFunctionInvoke.PythonAggFunctionContext; import org.apache.tajo.plan.function.stream.*; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.VTuple; +import org.apache.tajo.unit.StorageUnit; +import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.util.FileUtil; import org.apache.tajo.util.TUtil; @@ -323,12 +329,26 @@ public class PythonScriptEngine extends TajoScriptEngine { @Override public void shutdown() { - process.destroy(); FileUtil.cleanup(LOG, stdin, stdout, stderr, inputHandler, outputHandler); stdin = null; stdout = stderr = null; inputHandler = null; outputHandler = null; + + try { + int exitCode = process.waitFor(); + + if (systemConf.get(CommonTestingUtil.TAJO_TEST_KEY, "FALSE").equalsIgnoreCase("TRUE")) { + LOG.warn("Process exit code: " + exitCode); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Process exit code: " + exitCode); + } + } + } catch (InterruptedException e) { + LOG.warn(e.getMessage(), e); + } + if (LOG.isDebugEnabled()) { LOG.debug("PythonScriptExecutor shuts down"); } @@ -485,14 +505,15 @@ public class PythonScriptEngine extends TajoScriptEngine { try { inputHandler.putNext(input, inSchema); stdin.flush(); - } catch (Exception e) { - throw new RuntimeException("Failed adding input to inputQueue", e); + } catch (Throwable e) { + throwException(stderr, new RuntimeException("Failed adding input to inputQueue", e)); } - Datum result; + + Datum result = null; try { result = outputHandler.getNext().asDatum(0); - } catch (Exception e) { - throw new RuntimeException("Problem getting output: " + e.getMessage(), e); + } catch (Throwable e) { + throwException(stderr, new RuntimeException("Problem getting output: " + e.getMessage(), e)); } return result; @@ -519,14 +540,36 @@ public class PythonScriptEngine extends TajoScriptEngine { try { inputHandler.putNext(methodName, input, inSchema); stdin.flush(); - } catch (Exception e) { - throw new RuntimeException("Failed adding input to inputQueue while executing " + methodName + " with " + input, e); + } catch (Throwable e) { + throwException(stderr, new RuntimeException("Failed adding input to inputQueue while executing " + + methodName + " with " + input, e)); } try { outputHandler.getNext(); } catch (Exception e) { - throw new RuntimeException("Problem getting output: " + e.getMessage(), e); + throwException(stderr, new RuntimeException("Problem getting output: " + e.getMessage(), e)); + } + } + + /** + * Get the standard error streams of the external process and throw the exception + * + * @throws RuntimeException + */ + private void throwException(InputStream stderr, RuntimeException e) throws RuntimeException { + try { + if (stderr.available() > 0) { + byte[] bytes = new byte[Math.min(stderr.available(), 100 * StorageUnit.KB)]; + IOUtils.readFully(stderr, bytes); + String message = new String(bytes, Charset.defaultCharset()); + + throw new RuntimeException("Python exception caused by: " + message, e); + } else { + throw e; + } + } catch (IOException ioe) { + throw new RuntimeException(ioe.getMessage(), ioe); } } @@ -540,13 +583,13 @@ public class PythonScriptEngine extends TajoScriptEngine { try { inputHandler.putNext("update_context", functionContext); stdin.flush(); - } catch (Exception e) { - throw new RuntimeException("Failed adding input to inputQueue", e); + } catch (Throwable e) { + throwException(stderr, new RuntimeException("Failed adding input to inputQueue", e)); } try { outputHandler.getNext(); - } catch (Exception e) { - throw new RuntimeException("Problem getting output: " + e.getMessage(), e); + } catch (Throwable e) { + throwException(stderr, new RuntimeException("Problem getting output: " + e.getMessage(), e)); } } @@ -560,13 +603,13 @@ public class PythonScriptEngine extends TajoScriptEngine { try { inputHandler.putNext("get_context", EMPTY_INPUT, EMPTY_SCHEMA); stdin.flush(); - } catch (Exception e) { - throw new RuntimeException("Failed adding input to inputQueue", e); + } catch (Throwable e) { + throwException(stderr, new RuntimeException("Failed adding input to inputQueue", e)); } try { outputHandler.getNext(functionContext); - } catch (Exception e) { - throw new RuntimeException("Problem getting output: " + e.getMessage(), e); + } catch (Throwable e) { + throwException(stderr, new RuntimeException("Problem getting output: " + e.getMessage(), e)); } } @@ -581,14 +624,16 @@ public class PythonScriptEngine extends TajoScriptEngine { try { inputHandler.putNext("get_partial_result", EMPTY_INPUT, EMPTY_SCHEMA); stdin.flush(); - } catch (Exception e) { - throw new RuntimeException("Failed adding input to inputQueue", e); + } catch (Throwable e) { + throwException(stderr, new RuntimeException("Failed adding input to inputQueue", e)); } + String result = null; try { - return outputHandler.getPartialResultString(); - } catch (Exception e) { - throw new RuntimeException("Problem getting output: " + e.getMessage(), e); + result = outputHandler.getPartialResultString(); + } catch (Throwable e) { + throwException(stderr, new RuntimeException("Problem getting output: " + e.getMessage(), e)); } + return result; } /** @@ -603,13 +648,13 @@ public class PythonScriptEngine extends TajoScriptEngine { inputHandler.putNext("get_final_result", EMPTY_INPUT, EMPTY_SCHEMA); stdin.flush(); } catch (Exception e) { - throw new RuntimeException("Failed adding input to inputQueue", e); + throwException(stderr, new RuntimeException("Failed adding input to inputQueue", e)); } - Datum result; + Datum result = null; try { result = outputHandler.getNext().asDatum(0); } catch (Exception e) { - throw new RuntimeException("Problem getting output: " + e.getMessage(), e); + throwException(stderr, new RuntimeException("Problem getting output: " + e.getMessage(), e)); } return result;
