HIVE-17078: Add more logs to MapredLocalTask (Yibing Shi via Barna Zsombor Klara)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c1f3d9a4 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c1f3d9a4 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c1f3d9a4 Branch: refs/heads/hive-14535 Commit: c1f3d9a48b6122b3b5e5bf03ec1e524c5102f3b2 Parents: 593ca11 Author: Barna Zsombor Klara <[email protected]> Authored: Fri Oct 6 15:29:30 2017 +0200 Committer: Barna Zsombor Klara <[email protected]> Committed: Fri Oct 6 15:29:30 2017 +0200 ---------------------------------------------------------------------- .../hadoop/hive/common/log/LogRedirector.java | 99 ++++++++++++++++++++ .../hadoop/hive/ql/exec/mr/MapredLocalTask.java | 24 ++++- .../results/clientpositive/auto_join25.q.out | 4 + .../auto_join_without_localtask.q.out | 2 + .../bucketsortoptimize_insert_8.q.out | 2 + .../infer_bucket_sort_convert_join.q.out | 1 + .../results/clientpositive/mapjoin_hook.q.out | 3 + .../hive/spark/client/SparkClientImpl.java | 63 ++----------- 8 files changed, 138 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/c1f3d9a4/common/src/java/org/apache/hadoop/hive/common/log/LogRedirector.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/log/LogRedirector.java b/common/src/java/org/apache/hadoop/hive/common/log/LogRedirector.java new file mode 100644 index 0000000..c0650ed --- /dev/null +++ b/common/src/java/org/apache/hadoop/hive/common/log/LogRedirector.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.common.log; + +import org.slf4j.Logger; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; + +import java.util.List; + +/** + * Class used to redirect output read from a stream to a logger + */ +public class LogRedirector implements Runnable { + + private static final long MAX_ERR_LOG_LINES_FOR_RPC = 1000; + + public interface LogSourceCallback { + boolean isAlive(); + } + + private final Logger logger; + private final BufferedReader in; + private final LogSourceCallback callback; + private List<String> errLogs; + private int numErrLogLines = 0; + + public LogRedirector(InputStream in, Logger logger, LogSourceCallback callback) { + this.in = new BufferedReader(new InputStreamReader(in)); + this.callback = callback; + this.logger = logger; + } + + public LogRedirector(InputStream in, Logger logger, List<String> errLogs, + LogSourceCallback callback) { + this.in = new BufferedReader(new InputStreamReader(in)); + this.errLogs = errLogs; + this.callback = callback; + this.logger = logger; + } + + @Override + public void run() { + try { + String line = null; + while ((line = in.readLine()) != null) { + logger.info(line); + if (errLogs != null) { + if (numErrLogLines++ < MAX_ERR_LOG_LINES_FOR_RPC) { + errLogs.add(line); + } + } + } + } catch (IOException e) { + if (callback.isAlive()) { + logger.warn("I/O error in redirector thread.", e); + } else { + // When stopping the process we are redirecting from, + // the streams might be closed during reading. + // We should not log the related exceptions in a visible level + // as they might mislead the user. + logger.debug("I/O error in redirector thread while stopping the remote driver", e); + } + } catch (Exception e) { + logger.warn("Error in redirector thread.", e); + } + } + + /** + * Start the logredirector in a new thread + * @param name name of the new thread + * @param redirector redirector to start + */ + public static void redirect(String name, LogRedirector redirector) { + Thread thread = new Thread(redirector); + thread.setName(name); + thread.setDaemon(true); + thread.start(); + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/c1f3d9a4/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java index 9dfefee..b6a988d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java @@ -43,6 +43,7 @@ import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.LogUtils; import org.apache.hadoop.hive.common.io.CachingPrintStream; +import org.apache.hadoop.hive.common.log.LogRedirector; import org.apache.hadoop.hive.common.metrics.common.Metrics; import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; import org.apache.hadoop.hive.conf.HiveConf; @@ -79,6 +80,7 @@ import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hive.common.util.HiveStringUtils; import org.apache.hive.common.util.StreamPrinter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -325,6 +327,15 @@ public class MapredLocalTask extends Task<MapredLocalWork> implements Serializab // Run ExecDriver in another JVM executor = Runtime.getRuntime().exec(cmdLine, env, new File(workDir)); + final LogRedirector.LogSourceCallback callback = () -> {return executor.isAlive();}; + + LogRedirector.redirect( + Thread.currentThread().getName() + "-LocalTask-" + getName() + "-stdout", + new LogRedirector(executor.getInputStream(), LOG, callback)); + LogRedirector.redirect( + Thread.currentThread().getName() + "-LocalTask-" + getName() + "-stderr", + new LogRedirector(executor.getErrorStream(), LOG, callback)); + CachingPrintStream errPrintStream = new CachingPrintStream(System.err); StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(), null, System.out); @@ -384,14 +395,19 @@ public class MapredLocalTask extends Task<MapredLocalWork> implements Serializab console.printInfo(Utilities.now() + "\tEnd of local task; Time Taken: " + Utilities.showTime(elapsed) + " sec."); } catch (Throwable throwable) { + int retVal; + String message; if (throwable instanceof OutOfMemoryError || (throwable instanceof MapJoinMemoryExhaustionError)) { - l4j.error("Hive Runtime Error: Map local work exhausted memory", throwable); - return 3; + message = "Hive Runtime Error: Map local work exhausted memory"; + retVal = 3; } else { - l4j.error("Hive Runtime Error: Map local work failed", throwable); - return 2; + message = "Hive Runtime Error: Map local work failed"; + retVal = 2; } + l4j.error(message, throwable); + console.printError(message, HiveStringUtils.stringifyException(throwable)); + return retVal; } return 0; } http://git-wip-us.apache.org/repos/asf/hive/blob/c1f3d9a4/ql/src/test/results/clientpositive/auto_join25.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/auto_join25.q.out b/ql/src/test/results/clientpositive/auto_join25.q.out index 534bdb6..d24e0c3 100644 --- a/ql/src/test/results/clientpositive/auto_join25.q.out +++ b/ql/src/test/results/clientpositive/auto_join25.q.out @@ -18,6 +18,7 @@ PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=12 PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11 PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12 PREHOOK: Output: default@dest1 +Hive Runtime Error: Map local work exhausted memory FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.mr.MapRedTask POSTHOOK: query: FROM srcpart src1 JOIN src src2 ON (src1.key = src2.key) @@ -62,8 +63,10 @@ INSERT OVERWRITE TABLE dest_j2 SELECT src1.key, src3.value PREHOOK: type: QUERY PREHOOK: Input: default@src PREHOOK: Output: default@dest_j2 +Hive Runtime Error: Map local work exhausted memory FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.mr.MapRedTask +Hive Runtime Error: Map local work exhausted memory FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.mr.MapRedTask POSTHOOK: query: FROM src src1 JOIN src src2 ON (src1.key = src2.key) JOIN src src3 ON (src1.key + src2.key = src3.key) @@ -105,6 +108,7 @@ INSERT OVERWRITE TABLE dest_j1 SELECT src1.key, src2.value PREHOOK: type: QUERY PREHOOK: Input: default@src PREHOOK: Output: default@dest_j1 +Hive Runtime Error: Map local work exhausted memory FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.mr.MapRedTask POSTHOOK: query: FROM src src1 JOIN src src2 ON (src1.key = src2.key) http://git-wip-us.apache.org/repos/asf/hive/blob/c1f3d9a4/ql/src/test/results/clientpositive/auto_join_without_localtask.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/auto_join_without_localtask.q.out b/ql/src/test/results/clientpositive/auto_join_without_localtask.q.out index 57f0067..a8ae000 100644 --- a/ql/src/test/results/clientpositive/auto_join_without_localtask.q.out +++ b/ql/src/test/results/clientpositive/auto_join_without_localtask.q.out @@ -1045,8 +1045,10 @@ PREHOOK: query: select a.* from src a join src b on a.key=b.key join src c on a. PREHOOK: type: QUERY PREHOOK: Input: default@src #### A masked pattern was here #### +Hive Runtime Error: Map local work exhausted memory FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.mr.MapRedTask +Hive Runtime Error: Map local work exhausted memory FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.mr.MapRedTask POSTHOOK: query: select a.* from src a join src b on a.key=b.key join src c on a.value=c.value where a.key>100 order by a.key, a.value limit 40 http://git-wip-us.apache.org/repos/asf/hive/blob/c1f3d9a4/ql/src/test/results/clientpositive/bucketsortoptimize_insert_8.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/bucketsortoptimize_insert_8.q.out b/ql/src/test/results/clientpositive/bucketsortoptimize_insert_8.q.out index f0e77f0..1b3d741 100644 --- a/ql/src/test/results/clientpositive/bucketsortoptimize_insert_8.q.out +++ b/ql/src/test/results/clientpositive/bucketsortoptimize_insert_8.q.out @@ -273,6 +273,7 @@ PREHOOK: Input: default@test_table1@ds=1 PREHOOK: Input: default@test_table2 PREHOOK: Input: default@test_table2@ds=1 PREHOOK: Output: default@test_table3@ds=1 +Hive Runtime Error: Map local work failed FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.mr.MapRedTask POSTHOOK: query: INSERT OVERWRITE TABLE test_table3 PARTITION (ds = '1') @@ -551,6 +552,7 @@ PREHOOK: Input: default@test_table1@ds=1 PREHOOK: Input: default@test_table2 PREHOOK: Input: default@test_table2@ds=1 PREHOOK: Output: default@test_table3@ds=1 +Hive Runtime Error: Map local work failed FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.mr.MapRedTask POSTHOOK: query: INSERT OVERWRITE TABLE test_table3 PARTITION (ds = '1') http://git-wip-us.apache.org/repos/asf/hive/blob/c1f3d9a4/ql/src/test/results/clientpositive/infer_bucket_sort_convert_join.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/infer_bucket_sort_convert_join.q.out b/ql/src/test/results/clientpositive/infer_bucket_sort_convert_join.q.out index 70a57d5..09ab2d9 100644 --- a/ql/src/test/results/clientpositive/infer_bucket_sort_convert_join.q.out +++ b/ql/src/test/results/clientpositive/infer_bucket_sort_convert_join.q.out @@ -60,6 +60,7 @@ SELECT a.key, b.value FROM src a JOIN src b ON a.key = b.key PREHOOK: type: QUERY PREHOOK: Input: default@src PREHOOK: Output: default@test_table@part=1 +Hive Runtime Error: Map local work exhausted memory FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.mr.MapRedTask POSTHOOK: query: INSERT OVERWRITE TABLE test_table PARTITION (part = '1') http://git-wip-us.apache.org/repos/asf/hive/blob/c1f3d9a4/ql/src/test/results/clientpositive/mapjoin_hook.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/mapjoin_hook.q.out b/ql/src/test/results/clientpositive/mapjoin_hook.q.out index a9f9be3..f80a26a 100644 --- a/ql/src/test/results/clientpositive/mapjoin_hook.q.out +++ b/ql/src/test/results/clientpositive/mapjoin_hook.q.out @@ -38,6 +38,7 @@ PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=12 PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11 PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12 PREHOOK: Output: default@dest1 +Hive Runtime Error: Map local work exhausted memory FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.mr.MapRedTask [MapJoinCounter PostHook] COMMON_JOIN: 0 HINTED_MAPJOIN: 0 HINTED_MAPJOIN_LOCAL: 0 CONVERTED_MAPJOIN: 0 CONVERTED_MAPJOIN_LOCAL: 1 BACKUP_COMMON_JOIN: 1 @@ -51,8 +52,10 @@ INSERT OVERWRITE TABLE dest1 SELECT src1.key, src3.value PREHOOK: type: QUERY PREHOOK: Input: default@src PREHOOK: Output: default@dest1 +Hive Runtime Error: Map local work exhausted memory FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.mr.MapRedTask +Hive Runtime Error: Map local work exhausted memory FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.mr.MapRedTask [MapJoinCounter PostHook] COMMON_JOIN: 0 HINTED_MAPJOIN: 0 HINTED_MAPJOIN_LOCAL: 0 CONVERTED_MAPJOIN: 0 CONVERTED_MAPJOIN_LOCAL: 2 BACKUP_COMMON_JOIN: 2 http://git-wip-us.apache.org/repos/asf/hive/blob/c1f3d9a4/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java index 03e773a..e0ec3b7 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java @@ -32,13 +32,10 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.concurrent.Promise; -import java.io.BufferedReader; import java.io.ByteArrayInputStream; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.io.Serializable; import java.io.Writer; @@ -53,9 +50,9 @@ import java.util.Properties; import java.util.UUID; import java.util.concurrent.Future; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.hive.common.log.LogRedirector; import org.apache.hadoop.hive.conf.Constants; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; @@ -75,7 +72,6 @@ class SparkClientImpl implements SparkClient { private static final Logger LOG = LoggerFactory.getLogger(SparkClientImpl.class); private static final long DEFAULT_SHUTDOWN_TIMEOUT = 10000; // In milliseconds - private static final long MAX_ERR_LOG_LINES_FOR_RPC = 1000; private static final String OSX_TEST_OPTS = "SPARK_OSX_TEST_OPTS"; private static final String SPARK_HOME_ENV = "SPARK_HOME"; @@ -490,8 +486,12 @@ class SparkClientImpl implements SparkClient { final Process child = pb.start(); String threadName = Thread.currentThread().getName(); final List<String> childErrorLog = Collections.synchronizedList(new ArrayList<String>()); - redirect("RemoteDriver-stdout-redir-" + threadName, new Redirector(child.getInputStream())); - redirect("RemoteDriver-stderr-redir-" + threadName, new Redirector(child.getErrorStream(), childErrorLog)); + final LogRedirector.LogSourceCallback callback = () -> {return isAlive;}; + + LogRedirector.redirect("RemoteDriver-stdout-redir-" + threadName, + new LogRedirector(child.getInputStream(), LOG, callback)); + LogRedirector.redirect("RemoteDriver-stderr-redir-" + threadName, + new LogRedirector(child.getErrorStream(), LOG, childErrorLog, callback)); runnable = new Runnable() { @Override @@ -542,13 +542,6 @@ class SparkClientImpl implements SparkClient { return null; } - private void redirect(String name, Redirector redirector) { - Thread thread = new Thread(redirector); - thread.setName(name); - thread.setDaemon(true); - thread.start(); - } - private class ClientProtocol extends BaseProtocol { <T extends Serializable> JobHandleImpl<T> submit(Job<T> job, List<JobHandle.Listener<T>> listeners) { @@ -653,48 +646,6 @@ class SparkClientImpl implements SparkClient { } - private class Redirector implements Runnable { - - private final BufferedReader in; - private List<String> errLogs; - private int numErrLogLines = 0; - - Redirector(InputStream in) { - this.in = new BufferedReader(new InputStreamReader(in)); - } - - Redirector(InputStream in, List<String> errLogs) { - this.in = new BufferedReader(new InputStreamReader(in)); - this.errLogs = errLogs; - } - - @Override - public void run() { - try { - String line = null; - while ((line = in.readLine()) != null) { - LOG.info(line); - if (errLogs != null) { - if (numErrLogLines++ < MAX_ERR_LOG_LINES_FOR_RPC) { - errLogs.add(line); - } - } - } - } catch (IOException e) { - if (isAlive) { - LOG.warn("I/O error in redirector thread.", e); - } else { - // When stopping the remote driver the process might be destroyed during reading from the stream. - // We should not log the related exceptions in a visible level as they might mislead the user. - LOG.debug("I/O error in redirector thread while stopping the remote driver", e); - } - } catch (Exception e) { - LOG.warn("Error in redirector thread.", e); - } - } - - } - private static class AddJarJob implements Job<Serializable> { private static final long serialVersionUID = 1L;
