Repository: hive Updated Branches: refs/heads/master e5ba2690f -> 360dfa0ff
HIVE-13657: Spark driver stderr logs should appear in hive client logs (Mohit Sabharwal, reviewed by Sergio Pena) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/360dfa0f Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/360dfa0f Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/360dfa0f Branch: refs/heads/master Commit: 360dfa0ffd0d0500db016861ef24299f1596274d Parents: e5ba269 Author: Mohit Sabharwal <[email protected]> Authored: Fri May 20 11:16:43 2016 -0500 Committer: Sergio Pena <[email protected]> Committed: Fri May 20 11:16:43 2016 -0500 ---------------------------------------------------------------------- .../hive/spark/client/SparkClientImpl.java | 35 ++++++++++++++++---- 1 file changed, 28 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/360dfa0f/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java index ae78bc3..dfe263f 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java @@ -42,6 +42,7 @@ import java.io.Serializable; import java.io.Writer; import java.net.URI; import java.net.URL; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Properties; @@ -68,6 +69,7 @@ class SparkClientImpl implements SparkClient { private static final Logger LOG = LoggerFactory.getLogger(SparkClientImpl.class); private static final long DEFAULT_SHUTDOWN_TIMEOUT = 10000; // In milliseconds + private static final long MAX_ERR_LOG_LINES_FOR_RPC = 1000; private static final String OSX_TEST_OPTS = "SPARK_OSX_TEST_OPTS"; private static final String SPARK_HOME_ENV = "SPARK_HOME"; @@ -391,7 +393,6 @@ class SparkClientImpl implements SparkClient { argv.add(numOfExecutors); } } - if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)) { try { String currentUser = Utils.getUGI().getShortUserName(); @@ -445,8 +446,9 @@ class SparkClientImpl implements SparkClient { final Process child = pb.start(); int childId = childIdGenerator.incrementAndGet(); - redirect("stdout-redir-" + childId, child.getInputStream()); - redirect("stderr-redir-" + childId, child.getErrorStream()); + final List<String> childErrorLog = new ArrayList<String>(); + redirect("stdout-redir-" + childId, new Redirector(child.getInputStream())); + redirect("stderr-redir-" + childId, new Redirector(child.getErrorStream(), childErrorLog)); runnable = new Runnable() { @Override @@ -454,8 +456,15 @@ class SparkClientImpl implements SparkClient { try { int exitCode = child.waitFor(); if (exitCode != 0) { - rpcServer.cancelClient(clientId, "Child process exited before connecting back"); - LOG.warn("Child process exited with code {}.", exitCode); + StringBuilder errStr = new StringBuilder(); + for (String s : childErrorLog) { + errStr.append(s); + errStr.append('\n'); + } + + rpcServer.cancelClient(clientId, + "Child process exited before connecting back with error log " + errStr.toString()); + LOG.warn("Child process exited with code {}", exitCode); } } catch (InterruptedException ie) { LOG.warn("Waiting thread interrupted, killing child process."); @@ -475,8 +484,8 @@ class SparkClientImpl implements SparkClient { return thread; } - private void redirect(String name, InputStream in) { - Thread thread = new Thread(new Redirector(in)); + private void redirect(String name, Redirector redirector) { + Thread thread = new Thread(redirector); thread.setName(name); thread.setDaemon(true); thread.start(); @@ -587,17 +596,29 @@ class SparkClientImpl implements SparkClient { private class Redirector implements Runnable { private final BufferedReader in; + private List<String> errLogs; + private int numErrLogLines = 0; Redirector(InputStream in) { this.in = new BufferedReader(new InputStreamReader(in)); } + Redirector(InputStream in, List<String> errLogs) { + this.in = new BufferedReader(new InputStreamReader(in)); + this.errLogs = errLogs; + } + @Override public void run() { try { String line = null; while ((line = in.readLine()) != null) { LOG.info(line); + if (errLogs != null) { + if (numErrLogLines++ < MAX_ERR_LOG_LINES_FOR_RPC) { + errLogs.add(line); + } + } } } catch (Exception e) { LOG.warn("Error in redirector thread.", e);
