HIVE-15473: Progress Bar on Beeline client (Anishek Agarwal via Thejas Nair)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3e01ef32 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3e01ef32 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3e01ef32 Branch: refs/heads/hive-14535 Commit: 3e01ef3268ffbcb69c5c18c2c9f8810512c91bf8 Parents: f6cdbc8 Author: Anishek Agarwal <[email protected]> Authored: Fri Jan 6 14:31:21 2017 +0530 Committer: Thejas M Nair <[email protected]> Committed: Tue Feb 7 12:12:27 2017 -0800 ---------------------------------------------------------------------- .../java/org/apache/hive/beeline/Commands.java | 84 +- .../logs/BeelineInPlaceUpdateStream.java | 66 ++ common/pom.xml | 5 + .../hadoop/hive/common/log/InPlaceUpdate.java | 202 ++++ .../hadoop/hive/common/log/ProgressMonitor.java | 51 + .../org/apache/hadoop/hive/conf/HiveConf.java | 7 +- .../TestOperationLoggingAPIWithMr.java | 2 +- .../TestOperationLoggingAPIWithTez.java | 2 +- .../org/apache/hive/jdbc/HiveStatement.java | 13 + .../hive/jdbc/logs/InPlaceUpdateStream.java | 14 + ql/pom.xml | 5 - .../hadoop/hive/ql/exec/InPlaceUpdates.java | 89 -- .../hive/ql/exec/SerializationUtilities.java | 1 - .../ql/exec/spark/status/SparkJobMonitor.java | 6 +- .../hive/ql/exec/tez/TezJobExecHelper.java | 5 +- .../hadoop/hive/ql/exec/tez/TezJobMonitor.java | 1016 ----------------- .../hive/ql/exec/tez/TezSessionState.java | 8 +- .../apache/hadoop/hive/ql/exec/tez/TezTask.java | 6 +- .../hive/ql/exec/tez/monitoring/Constants.java | 7 + .../hive/ql/exec/tez/monitoring/DAGSummary.java | 197 ++++ .../exec/tez/monitoring/FSCountersSummary.java | 92 ++ .../ql/exec/tez/monitoring/LLAPioSummary.java | 108 ++ .../ql/exec/tez/monitoring/PrintSummary.java | 7 + .../QueryExecutionBreakdownSummary.java | 75 ++ .../ql/exec/tez/monitoring/TezJobMonitor.java | 397 +++++++ .../exec/tez/monitoring/TezProgressMonitor.java | 313 ++++++ .../apache/hadoop/hive/ql/metadata/Hive.java | 11 +- .../hadoop/hive/ql/session/SessionState.java | 12 + .../tez/monitoring/TestTezProgressMonitor.java | 101 ++ service-rpc/if/TCLIService.thrift | 26 +- .../gen/thrift/gen-cpp/TCLIService_types.cpp | 322 ++++++ .../src/gen/thrift/gen-cpp/TCLIService_types.h | 102 +- .../rpc/thrift/TGetOperationStatusReq.java | 109 +- .../rpc/thrift/TGetOperationStatusResp.java | 116 +- .../service/rpc/thrift/TJobExecutionStatus.java | 48 + .../service/rpc/thrift/TProgressUpdateResp.java | 1033 ++++++++++++++++++ service-rpc/src/gen/thrift/gen-php/Types.php | 327 ++++++ .../src/gen/thrift/gen-py/TCLIService/ttypes.py | 214 +++- .../gen/thrift/gen-rb/t_c_l_i_service_types.rb | 51 +- .../org/apache/hive/service/cli/CLIService.java | 63 +- .../service/cli/EmbeddedCLIServiceClient.java | 4 +- .../apache/hive/service/cli/ICLIService.java | 2 +- .../hive/service/cli/JobProgressUpdate.java | 38 + .../hive/service/cli/OperationStatus.java | 8 + .../cli/ProgressMonitorStatusMapper.java | 19 + .../cli/TezProgressMonitorStatusMapper.java | 32 + .../thrift/RetryingThriftCLIServiceClient.java | 5 +- .../service/cli/thrift/ThriftCLIService.java | 28 +- .../cli/thrift/ThriftCLIServiceClient.java | 3 +- .../apache/hive/service/cli/CLIServiceTest.java | 18 +- .../cli/TestRetryingThriftCLIServiceClient.java | 2 +- .../cli/thrift/ThriftCLIServiceTest.java | 8 +- .../thrift/ThriftCliServiceTestWithCookie.java | 2 +- 53 files changed, 4268 insertions(+), 1214 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/beeline/src/java/org/apache/hive/beeline/Commands.java ---------------------------------------------------------------------- diff --git a/beeline/src/java/org/apache/hive/beeline/Commands.java b/beeline/src/java/org/apache/hive/beeline/Commands.java index 748546d..99db643 100644 --- a/beeline/src/java/org/apache/hive/beeline/Commands.java +++ b/beeline/src/java/org/apache/hive/beeline/Commands.java @@ -60,6 +60,7 @@ import org.apache.hadoop.hive.conf.HiveVariableSource; import org.apache.hadoop.hive.conf.SystemVariables; import org.apache.hadoop.hive.conf.VariableSubstitution; import org.apache.hadoop.io.IOUtils; +import org.apache.hive.beeline.logs.BeelineInPlaceUpdateStream; import org.apache.hive.jdbc.HiveStatement; import org.apache.hive.jdbc.Utils; import org.apache.hive.jdbc.Utils.JdbcConnectionParams; @@ -982,6 +983,11 @@ public class Commands { logThread = new Thread(createLogRunnable(stmnt)); logThread.setDaemon(true); logThread.start(); + if (stmnt instanceof HiveStatement) { + ((HiveStatement) stmnt).setInPlaceUpdateStream( + new BeelineInPlaceUpdateStream(beeLine.getOutputStream()) + ); + } hasResults = stmnt.execute(sql); logThread.interrupt(); logThread.join(DEFAULT_QUERY_PROGRESS_THREAD_TIMEOUT); @@ -1242,43 +1248,65 @@ public class Commands { command.setLength(0); } - private Runnable createLogRunnable(Statement statement) { + private Runnable createLogRunnable(final Statement statement) { if (statement instanceof HiveStatement) { - final HiveStatement hiveStatement = (HiveStatement) statement; - - Runnable runnable = new Runnable() { - @Override - public void run() { - while (hiveStatement.hasMoreLogs()) { - try { - // fetch the log periodically and output to beeline console - for (String log : hiveStatement.getQueryLog()) { - beeLine.info(log); - } - Thread.sleep(DEFAULT_QUERY_PROGRESS_INTERVAL); - } catch (SQLException e) { - beeLine.error(new SQLWarning(e)); - return; - } catch (InterruptedException e) { - beeLine.debug("Getting log thread is interrupted, since query is done!"); - showRemainingLogsIfAny(hiveStatement); - return; - } - } - } - }; - return runnable; + return new LogRunnable(this, (HiveStatement) statement, + DEFAULT_QUERY_PROGRESS_INTERVAL); } else { - beeLine.debug("The statement instance is not HiveStatement type: " + statement.getClass()); + beeLine.debug( + "The statement instance is not HiveStatement type: " + statement + .getClass()); return new Runnable() { - @Override - public void run() { + @Override public void run() { // do nothing. } }; } } + private void error(Throwable throwable) { + beeLine.error(throwable); + } + + private void debug(String message) { + beeLine.debug(message); + } + + + + static class LogRunnable implements Runnable { + private final Commands commands; + private final HiveStatement hiveStatement; + private final long queryProgressInterval; + + LogRunnable(Commands commands, HiveStatement hiveStatement, + long queryProgressInterval) { + this.hiveStatement = hiveStatement; + this.commands = commands; + this.queryProgressInterval = queryProgressInterval; + } + + private void updateQueryLog() throws SQLException { + for (String log : hiveStatement.getQueryLog()) { + commands.beeLine.info(log); + } + } + + @Override public void run() { + while (hiveStatement.hasMoreLogs()) { + try { + updateQueryLog(); + Thread.sleep(queryProgressInterval); + } catch (SQLException e) { + commands.error(new SQLWarning(e)); + } catch (InterruptedException e) { + commands.debug("Getting log thread is interrupted, since query is done!"); + commands.showRemainingLogsIfAny(hiveStatement); + } + } + } + } + private void showRemainingLogsIfAny(Statement statement) { if (statement instanceof HiveStatement) { HiveStatement hiveStatement = (HiveStatement) statement; http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/beeline/src/java/org/apache/hive/beeline/logs/BeelineInPlaceUpdateStream.java ---------------------------------------------------------------------- diff --git a/beeline/src/java/org/apache/hive/beeline/logs/BeelineInPlaceUpdateStream.java b/beeline/src/java/org/apache/hive/beeline/logs/BeelineInPlaceUpdateStream.java new file mode 100644 index 0000000..2ed289c --- /dev/null +++ b/beeline/src/java/org/apache/hive/beeline/logs/BeelineInPlaceUpdateStream.java @@ -0,0 +1,66 @@ +package org.apache.hive.beeline.logs; + +import org.apache.hadoop.hive.common.log.InPlaceUpdate; +import org.apache.hadoop.hive.common.log.ProgressMonitor; +import org.apache.hive.jdbc.logs.InPlaceUpdateStream; +import org.apache.hive.service.rpc.thrift.TJobExecutionStatus; +import org.apache.hive.service.rpc.thrift.TProgressUpdateResp; + +import java.io.PrintStream; +import java.util.List; + +public class BeelineInPlaceUpdateStream implements InPlaceUpdateStream { + private InPlaceUpdate inPlaceUpdate; + + public BeelineInPlaceUpdateStream(PrintStream out) { + this.inPlaceUpdate = new InPlaceUpdate(out); + } + + @Override + public void update(TProgressUpdateResp response) { + if (response == null || response.getStatus().equals(TJobExecutionStatus.NOT_AVAILABLE)) + return; + + inPlaceUpdate.render(new ProgressMonitorWrapper(response)); + } + + static class ProgressMonitorWrapper implements ProgressMonitor { + private TProgressUpdateResp response; + + ProgressMonitorWrapper(TProgressUpdateResp response) { + this.response = response; + } + + @Override + public List<String> headers() { + return response.getHeaderNames(); + } + + @Override + public List<List<String>> rows() { + return response.getRows(); + } + + @Override + public String footerSummary() { + return response.getFooterSummary(); + } + + @Override + public long startTime() { + return response.getStartTime(); + } + + @Override + public String executionStatus() { + throw new UnsupportedOperationException( + "This should never be used for anything. All the required data is available via other methods" + ); + } + + @Override + public double progressedPercentage() { + return response.getProgressedPercentage(); + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/common/pom.xml ---------------------------------------------------------------------- diff --git a/common/pom.xml b/common/pom.xml index fd948f8..8474a87 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -64,6 +64,11 @@ <artifactId>orc-core</artifactId> </dependency> <dependency> + <groupId>jline</groupId> + <artifactId>jline</artifactId> + <version>${jline.version}</version> + </dependency> + <dependency> <groupId>org.eclipse.jetty.aggregate</groupId> <artifactId>jetty-all</artifactId> <version>${jetty.version}</version> http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/common/src/java/org/apache/hadoop/hive/common/log/InPlaceUpdate.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/log/InPlaceUpdate.java b/common/src/java/org/apache/hadoop/hive/common/log/InPlaceUpdate.java new file mode 100644 index 0000000..bfdb4fa --- /dev/null +++ b/common/src/java/org/apache/hadoop/hive/common/log/InPlaceUpdate.java @@ -0,0 +1,202 @@ +package org.apache.hadoop.hive.common.log; + +import com.google.common.base.Function; +import com.google.common.collect.Lists; +import jline.TerminalFactory; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.hive.conf.HiveConf; +import org.fusesource.jansi.Ansi; + +import javax.annotation.Nullable; +import java.io.PrintStream; +import java.io.StringWriter; +import java.text.DecimalFormat; +import java.util.List; + +import static org.fusesource.jansi.Ansi.ansi; +import static org.fusesource.jansi.internal.CLibrary.*; + +/** + * Renders information from ProgressMonitor to the stream provided. + */ +public class InPlaceUpdate { + + public static final int MIN_TERMINAL_WIDTH = 94; + + // keep this within 80 chars width. If more columns needs to be added then update min terminal + // width requirement and SEPARATOR width accordingly + private static final String HEADER_FORMAT = "%16s%10s %13s %5s %9s %7s %7s %6s %6s "; + private static final String VERTEX_FORMAT = "%-16s%10s %13s %5s %9s %7s %7s %6s %6s "; + private static final String FOOTER_FORMAT = "%-15s %-30s %-4s %-25s"; + + private static final int PROGRESS_BAR_CHARS = 30; + private static final String SEPARATOR = new String(new char[MIN_TERMINAL_WIDTH]).replace("\0", "-"); + + /* Pretty print the values */ + private final DecimalFormat secondsFormatter = new DecimalFormat("#0.00"); + private int lines = 0; + private PrintStream out; + + public InPlaceUpdate(PrintStream out) { + this.out = out; + } + + public InPlaceUpdate() { + this(System.out); + } + + public static void reprintLine(PrintStream out, String line) { + out.print(ansi().eraseLine(Ansi.Erase.ALL).a(line).a('\n').toString()); + out.flush(); + } + + public static void rePositionCursor(PrintStream ps) { + ps.print(ansi().cursorUp(0).toString()); + ps.flush(); + } + + /** + * NOTE: Use this method only if isUnixTerminal is true. + * Erases the current line and prints the given line. + * + * @param line - line to print + */ + private void reprintLine(String line) { + reprintLine(out, line); + lines++; + } + + /** + * NOTE: Use this method only if isUnixTerminal is true. + * Erases the current line and prints the given line with the specified color. + * + * @param line - line to print + * @param color - color for the line + */ + private void reprintLineWithColorAsBold(String line, Ansi.Color color) { + out.print(ansi().eraseLine(Ansi.Erase.ALL).fg(color).bold().a(line).a('\n').boldOff().reset() + .toString()); + out.flush(); + lines++; + } + + /** + * NOTE: Use this method only if isUnixTerminal is true. + * Erases the current line and prints the given multiline. Make sure the specified line is not + * terminated by linebreak. + * + * @param line - line to print + */ + private void reprintMultiLine(String line) { + int numLines = line.split("\r\n|\r|\n").length; + out.print(ansi().eraseLine(Ansi.Erase.ALL).a(line).a('\n').toString()); + out.flush(); + lines += numLines; + } + + /** + * NOTE: Use this method only if isUnixTerminal is true. + * Repositions the cursor back to line 0. + */ + private void repositionCursor() { + if (lines > 0) { + out.print(ansi().cursorUp(lines).toString()); + out.flush(); + lines = 0; + } + } + + + // [==================>>-----] + private String getInPlaceProgressBar(double percent) { + StringWriter bar = new StringWriter(); + bar.append("["); + int remainingChars = PROGRESS_BAR_CHARS - 4; + int completed = (int) (remainingChars * percent); + int pending = remainingChars - completed; + for (int i = 0; i < completed; i++) { + bar.append("="); + } + bar.append(">>"); + for (int i = 0; i < pending; i++) { + bar.append("-"); + } + bar.append("]"); + return bar.toString(); + } + + public void render(ProgressMonitor monitor) { + if (monitor == null) return; + // position the cursor to line 0 + repositionCursor(); + + // print header + // ------------------------------------------------------------------------------- + // VERTICES STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED + // ------------------------------------------------------------------------------- + reprintLine(SEPARATOR); + reprintLineWithColorAsBold(String.format(HEADER_FORMAT, monitor.headers().toArray()), + Ansi.Color.CYAN); + reprintLine(SEPARATOR); + + + // Map 1 .......... container SUCCEEDED 7 7 0 0 0 0 + List<String> printReady = Lists.transform(monitor.rows(), new Function<List<String>, String>() { + @Nullable + @Override + public String apply(@Nullable List<String> row) { + return String.format(VERTEX_FORMAT, row.toArray()); + } + }); + reprintMultiLine(StringUtils.join(printReady, "\n")); + + // ------------------------------------------------------------------------------- + // VERTICES: 03/04 [=================>>-----] 86% ELAPSED TIME: 1.71 s + // ------------------------------------------------------------------------------- + String progressStr = "" + (int) (monitor.progressedPercentage() * 100) + "%"; + float et = (float) (System.currentTimeMillis() - monitor.startTime()) / (float) 1000; + String elapsedTime = "ELAPSED TIME: " + secondsFormatter.format(et) + " s"; + String footer = String.format( + FOOTER_FORMAT, + monitor.footerSummary(), + getInPlaceProgressBar(monitor.progressedPercentage()), + progressStr, + elapsedTime); + + reprintLineWithColorAsBold(footer, Ansi.Color.RED); + reprintLine(SEPARATOR); + } + + + public static boolean canRenderInPlace(HiveConf conf) { + boolean inPlaceUpdates = HiveConf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_INPLACE_PROGRESS); + + // we need at least 80 chars wide terminal to display in-place updates properly + return inPlaceUpdates && isUnixTerminal() && TerminalFactory.get().getWidth() >= MIN_TERMINAL_WIDTH; + } + + private static boolean isUnixTerminal() { + + String os = System.getProperty("os.name"); + if (os.startsWith("Windows")) { + // we do not support Windows, we will revisit this if we really need it for windows. + return false; + } + + // We must be on some unix variant.. + // check if standard out is a terminal + try { + // isatty system call will return 1 if the file descriptor is terminal else 0 + if (isatty(STDOUT_FILENO) == 0) { + return false; + } + if (isatty(STDERR_FILENO) == 0) { + return false; + } + } catch (NoClassDefFoundError | UnsatisfiedLinkError ignore) { + // These errors happen if the JNI lib is not available for your platform. + return false; + } + return true; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/common/src/java/org/apache/hadoop/hive/common/log/ProgressMonitor.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/log/ProgressMonitor.java b/common/src/java/org/apache/hadoop/hive/common/log/ProgressMonitor.java new file mode 100644 index 0000000..ee02ccb --- /dev/null +++ b/common/src/java/org/apache/hadoop/hive/common/log/ProgressMonitor.java @@ -0,0 +1,51 @@ +package org.apache.hadoop.hive.common.log; + +import java.util.Collections; +import java.util.List; + +public interface ProgressMonitor { + + ProgressMonitor NULL = new ProgressMonitor() { + @Override + public List<String> headers() { + return Collections.emptyList(); + } + + @Override + public List<List<String>> rows() { + return Collections.emptyList(); + } + + @Override + public String footerSummary() { + return ""; + } + + @Override + public long startTime() { + return 0; + } + + @Override + public String executionStatus() { + return ""; + } + + @Override + public double progressedPercentage() { + return 0; + } + }; + + List<String> headers(); + + List<List<String>> rows(); + + String footerSummary(); + + long startTime(); + + String executionStatus(); + + double progressedPercentage(); +} http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index cb27cd6..f3b01b2 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2841,7 +2841,12 @@ public class HiveConf extends Configuration { TEZ_EXEC_INPLACE_PROGRESS( "hive.tez.exec.inplace.progress", true, - "Updates tez job execution progress in-place in the terminal."), + "Updates tez job execution progress in-place in the terminal when hive-cli is used."), + HIVE_SERVER2_INPLACE_PROGRESS( + "hive.server2.in.place.progress", + true, + "Allows hive server 2 to send progress bar update information. This is currently available" + + " only if the execution engine is tez."), SPARK_EXEC_INPLACE_PROGRESS("hive.spark.exec.inplace.progress", true, "Updates spark job execution progress in-place in the terminal."), TEZ_CONTAINER_MAX_JAVA_HEAP_FRACTION("hive.tez.container.max.java.heap.fraction", 0.8f, http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/itests/hive-unit/src/test/java/org/apache/hive/service/cli/operation/TestOperationLoggingAPIWithMr.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/operation/TestOperationLoggingAPIWithMr.java b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/operation/TestOperationLoggingAPIWithMr.java index b8462c6..830ffc2 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/operation/TestOperationLoggingAPIWithMr.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/operation/TestOperationLoggingAPIWithMr.java @@ -97,7 +97,7 @@ public class TestOperationLoggingAPIWithMr extends OperationLoggingAPITestBase { if (System.currentTimeMillis() > pollTimeout) { break; } - opStatus = client.getOperationStatus(operationHandle); + opStatus = client.getOperationStatus(operationHandle, false); Assert.assertNotNull(opStatus); state = opStatus.getState(); http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/itests/hive-unit/src/test/java/org/apache/hive/service/cli/operation/TestOperationLoggingAPIWithTez.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/operation/TestOperationLoggingAPIWithTez.java b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/operation/TestOperationLoggingAPIWithTez.java index 8b5b516..e98406d 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/operation/TestOperationLoggingAPIWithTez.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/operation/TestOperationLoggingAPIWithTez.java @@ -50,7 +50,7 @@ public class TestOperationLoggingAPIWithTez extends OperationLoggingAPITestBase "<PERFLOG method=compile from=org.apache.hadoop.hive.ql.Driver>", "<PERFLOG method=parse from=org.apache.hadoop.hive.ql.Driver>", "<PERFLOG method=Driver.run from=org.apache.hadoop.hive.ql.Driver>", - "from=org.apache.hadoop.hive.ql.exec.tez.TezJobMonitor", + "from=org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor", "org.apache.tez.common.counters.DAGCounter", "NUM_SUCCEEDED_TASKS", "TOTAL_LAUNCHED_TASKS", http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java ---------------------------------------------------------------------- diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java index a242501..56860c4 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java @@ -19,6 +19,7 @@ package org.apache.hive.jdbc; import org.apache.commons.codec.binary.Base64; +import org.apache.hive.jdbc.logs.InPlaceUpdateStream; import org.apache.hive.service.cli.RowSet; import org.apache.hive.service.cli.RowSetFactory; import org.apache.hive.service.rpc.thrift.TCLIService; @@ -114,6 +115,8 @@ public class HiveStatement implements java.sql.Statement { private int queryTimeout = 0; + private InPlaceUpdateStream inPlaceUpdateStream = InPlaceUpdateStream.NO_OP; + public HiveStatement(HiveConnection connection, TCLIService.Iface client, TSessionHandle sessHandle) { this(connection, client, sessHandle, false, DEFAULT_FETCH_SIZE); @@ -342,6 +345,7 @@ public class HiveStatement implements java.sql.Statement { TGetOperationStatusResp waitForOperationToComplete() throws SQLException { TGetOperationStatusReq statusReq = new TGetOperationStatusReq(stmtHandle); + statusReq.setGetProgressUpdate(inPlaceUpdateStream != InPlaceUpdateStream.NO_OP); TGetOperationStatusResp statusResp = null; // Poll on the operation status, till the operation is complete @@ -352,6 +356,7 @@ public class HiveStatement implements java.sql.Statement { * essentially return after the HIVE_SERVER2_LONG_POLLING_TIMEOUT (a server config) expires */ statusResp = client.GetOperationStatus(statusReq); + inPlaceUpdateStream.update(statusResp.getProgressUpdateResponse()); Utils.verifySuccessWithInfo(statusResp.getStatus()); if (statusResp.isSetOperationState()) { switch (statusResp.getOperationState()) { @@ -951,4 +956,12 @@ public class HiveStatement implements java.sql.Statement { } return null; } + + /** + * This is only used by the beeline client to set the stream on which in place progress updates + * are to be shown + */ + public void setInPlaceUpdateStream(InPlaceUpdateStream stream) { + this.inPlaceUpdateStream = stream; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/jdbc/src/java/org/apache/hive/jdbc/logs/InPlaceUpdateStream.java ---------------------------------------------------------------------- diff --git a/jdbc/src/java/org/apache/hive/jdbc/logs/InPlaceUpdateStream.java b/jdbc/src/java/org/apache/hive/jdbc/logs/InPlaceUpdateStream.java new file mode 100644 index 0000000..3a682b2 --- /dev/null +++ b/jdbc/src/java/org/apache/hive/jdbc/logs/InPlaceUpdateStream.java @@ -0,0 +1,14 @@ +package org.apache.hive.jdbc.logs; + +import org.apache.hive.service.rpc.thrift.TProgressUpdateResp; + +public interface InPlaceUpdateStream { + void update(TProgressUpdateResp response); + + InPlaceUpdateStream NO_OP = new InPlaceUpdateStream() { + @Override + public void update(TProgressUpdateResp response) { + + } + }; +} http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/pom.xml ---------------------------------------------------------------------- diff --git a/ql/pom.xml b/ql/pom.xml index 84e83ee..1e6ba9a 100644 --- a/ql/pom.xml +++ b/ql/pom.xml @@ -463,11 +463,6 @@ <scope>test</scope> </dependency> <dependency> - <groupId>jline</groupId> - <artifactId>jline</artifactId> - <version>${jline.version}</version> - </dependency> - <dependency> <groupId>org.apache.tez</groupId> <artifactId>tez-api</artifactId> <version>${tez.version}</version> http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/InPlaceUpdates.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/InPlaceUpdates.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/InPlaceUpdates.java deleted file mode 100644 index f59d8e2..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/InPlaceUpdates.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.exec; - -import static org.fusesource.jansi.Ansi.ansi; -import static org.fusesource.jansi.internal.CLibrary.STDERR_FILENO; -import static org.fusesource.jansi.internal.CLibrary.STDOUT_FILENO; -import static org.fusesource.jansi.internal.CLibrary.isatty; - -import java.io.PrintStream; - -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.session.SessionState; -import org.fusesource.jansi.Ansi; - -import jline.TerminalFactory; - -public class InPlaceUpdates { - - public static final int MIN_TERMINAL_WIDTH = 94; - - static boolean isUnixTerminal() { - - String os = System.getProperty("os.name"); - if (os.startsWith("Windows")) { - // we do not support Windows, we will revisit this if we really need it for windows. - return false; - } - - // We must be on some unix variant.. - // check if standard out is a terminal - try { - // isatty system call will return 1 if the file descriptor is terminal else 0 - if (isatty(STDOUT_FILENO) == 0) { - return false; - } - if (isatty(STDERR_FILENO) == 0) { - return false; - } - } catch (NoClassDefFoundError ignore) { - // These errors happen if the JNI lib is not available for your platform. - return false; - } catch (UnsatisfiedLinkError ignore) { - // These errors happen if the JNI lib is not available for your platform. - return false; - } - return true; - } - - public static boolean inPlaceEligible(HiveConf conf) { - String engine = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE); - boolean inPlaceUpdates = false; - if (engine.equals("tez")) { - inPlaceUpdates = HiveConf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_INPLACE_PROGRESS); - } - if (engine.equals("spark")) { - inPlaceUpdates = HiveConf.getBoolVar(conf, HiveConf.ConfVars.SPARK_EXEC_INPLACE_PROGRESS); - } - - // we need at least 80 chars wide terminal to display in-place updates properly - return inPlaceUpdates && !SessionState.getConsole().getIsSilent() && isUnixTerminal() - && TerminalFactory.get().getWidth() >= MIN_TERMINAL_WIDTH; - } - - public static void reprintLine(PrintStream out, String line) { - out.print(ansi().eraseLine(Ansi.Erase.ALL).a(line).a('\n').toString()); - out.flush(); - } - - public static void rePositionCursor(PrintStream ps) { - ps.print(ansi().cursorUp(0).toString()); - ps.flush(); - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java index 7be628e..247d589 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java @@ -38,7 +38,6 @@ import org.apache.commons.codec.binary.Base64; import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.CompilationOpContext; -import org.apache.hadoop.hive.ql.exec.tez.TezJobMonitor; import org.apache.hadoop.hive.ql.exec.vector.VectorFileSinkOperator; import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat; import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat; http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java index d5b9b5d..cf0162d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java @@ -19,7 +19,7 @@ package org.apache.hadoop.hive.ql.exec.spark.status; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.exec.InPlaceUpdates; +import org.apache.hadoop.hive.common.log.InPlaceUpdate; import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.session.SessionState; import org.fusesource.jansi.Ansi; @@ -82,7 +82,7 @@ abstract class SparkJobMonitor { protected SparkJobMonitor(HiveConf hiveConf) { monitorTimeoutInterval = hiveConf.getTimeVar( HiveConf.ConfVars.SPARK_JOB_MONITOR_TIMEOUT, TimeUnit.SECONDS); - inPlaceUpdate = InPlaceUpdates.inPlaceEligible(hiveConf); + inPlaceUpdate = InPlaceUpdate.canRenderInPlace(hiveConf) && !SessionState.getConsole().getIsSilent(); console = SessionState.getConsole(); out = SessionState.LogHelper.getInfoStream(); } @@ -270,7 +270,7 @@ abstract class SparkJobMonitor { } private void reprintLine(String line) { - InPlaceUpdates.reprintLine(out, line); + InPlaceUpdate.reprintLine(out, line); lines++; } http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobExecHelper.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobExecHelper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobExecHelper.java index a3fc815..a544b93 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobExecHelper.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobExecHelper.java @@ -18,10 +18,11 @@ package org.apache.hadoop.hive.ql.exec.tez; -import java.lang.reflect.Method; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.lang.reflect.Method; + /** * TezJobExecHelper is a utility to safely call Tez functionality from * common code paths. It will check if tez is available/installed before @@ -37,7 +38,7 @@ public class TezJobExecHelper { // we have tez installed ClassLoader classLoader = TezJobExecHelper.class.getClassLoader(); - Method method = classLoader.loadClass("org.apache.hadoop.hive.ql.exec.tez.TezJobMonitor") + Method method = classLoader.loadClass("org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor") .getMethod("killRunningJobs"); method.invoke(null, null); } http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java deleted file mode 100644 index bd935d4..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java +++ /dev/null @@ -1,1016 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.ql.exec.tez; - -import static org.apache.tez.dag.api.client.DAGStatus.State.RUNNING; -import static org.fusesource.jansi.Ansi.ansi; - -import java.io.IOException; -import java.io.InterruptedIOException; -import java.io.PrintStream; -import java.text.DecimalFormat; -import java.text.NumberFormat; -import java.util.EnumSet; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.llap.counters.LlapIOCounters; -import org.apache.hadoop.hive.ql.Context; -import org.apache.hadoop.hive.ql.exec.FileSinkOperator; -import org.apache.hadoop.hive.ql.exec.InPlaceUpdates; -import org.apache.hadoop.hive.ql.exec.MapOperator; -import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; -import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.log.PerfLogger; -import org.apache.hadoop.hive.ql.plan.BaseWork; -import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; -import org.apache.hive.common.util.ShutdownHookManager; -import org.apache.tez.common.counters.FileSystemCounter; -import org.apache.tez.common.counters.TaskCounter; -import org.apache.tez.common.counters.TezCounter; -import org.apache.tez.common.counters.TezCounters; -import org.apache.tez.dag.api.DAG; -import org.apache.tez.dag.api.TezException; -import org.apache.tez.dag.api.Vertex; -import org.apache.tez.dag.api.client.DAGClient; -import org.apache.tez.dag.api.client.DAGStatus; -import org.apache.tez.dag.api.client.Progress; -import org.apache.tez.dag.api.client.StatusGetOpts; -import org.apache.tez.dag.api.client.VertexStatus; -import org.fusesource.jansi.Ansi; - -import com.google.common.base.Preconditions; - -/** - * TezJobMonitor keeps track of a tez job while it's being executed. It will - * print status to the console and retrieve final status of the job after - * completion. - */ -public class TezJobMonitor { - - private static final String CLASS_NAME = TezJobMonitor.class.getName(); - - private static final int COLUMN_1_WIDTH = 16; - private static final int SEPARATOR_WIDTH = InPlaceUpdates.MIN_TERMINAL_WIDTH; - private static final int FILE_HEADER_SEPARATOR_WIDTH = InPlaceUpdates.MIN_TERMINAL_WIDTH + 34; - private static final String SEPARATOR = new String(new char[SEPARATOR_WIDTH]).replace("\0", "-"); - private static final String FILE_HEADER_SEPARATOR = - new String(new char[FILE_HEADER_SEPARATOR_WIDTH]).replace("\0", "-"); - private static final String QUERY_EXEC_SUMMARY_HEADER = "Query Execution Summary"; - private static final String TASK_SUMMARY_HEADER = "Task Execution Summary"; - private static final String LLAP_IO_SUMMARY_HEADER = "LLAP IO Summary"; - private static final String FS_COUNTERS_SUMMARY_HEADER = "FileSystem Counters Summary"; - - // keep this within 80 chars width. If more columns needs to be added then update min terminal - // width requirement and SEPARATOR width accordingly - private static final String HEADER_FORMAT = "%16s%10s %13s %5s %9s %7s %7s %6s %6s "; - private static final String VERTEX_FORMAT = "%-16s%10s %13s %5s %9s %7s %7s %6s %6s "; - private static final String FOOTER_FORMAT = "%-15s %-30s %-4s %-25s"; - private static final String HEADER = String.format(HEADER_FORMAT, - "VERTICES", "MODE", "STATUS", "TOTAL", "COMPLETED", "RUNNING", "PENDING", "FAILED", "KILLED"); - - // method and dag summary format - private static final String SUMMARY_HEADER_FORMAT = "%10s %14s %13s %12s %14s %15s"; - private static final String SUMMARY_HEADER = String.format(SUMMARY_HEADER_FORMAT, - "VERTICES", "DURATION(ms)", "CPU_TIME(ms)", "GC_TIME(ms)", "INPUT_RECORDS", "OUTPUT_RECORDS"); - - // used when I/O redirection is used - private static final String FILE_HEADER_FORMAT = "%10s %12s %16s %13s %14s %13s %12s %14s %15s"; - private static final String FILE_HEADER = String.format(FILE_HEADER_FORMAT, - "VERTICES", "TOTAL_TASKS", "FAILED_ATTEMPTS", "KILLED_TASKS", "DURATION(ms)", - "CPU_TIME(ms)", "GC_TIME(ms)", "INPUT_RECORDS", "OUTPUT_RECORDS"); - - // LLAP counters - private static final String LLAP_SUMMARY_HEADER_FORMAT = "%10s %9s %9s %10s %9s %10s %11s %8s %9s"; - private static final String LLAP_SUMMARY_HEADER = String.format(LLAP_SUMMARY_HEADER_FORMAT, - "VERTICES", "ROWGROUPS", "META_HIT", "META_MISS", "DATA_HIT", "DATA_MISS", - "ALLOCATION", "USED", "TOTAL_IO"); - - // FileSystem counters - private static final String FS_COUNTERS_HEADER_FORMAT = "%10s %15s %13s %18s %18s %13s"; - - // Methods summary - private static final String OPERATION_SUMMARY = "%-35s %9s"; - private static final String OPERATION = "OPERATION"; - private static final String DURATION = "DURATION"; - - // in-place progress update related variables - private int lines; - private final PrintStream out; - - private transient LogHelper console; - private final PerfLogger perfLogger = SessionState.getPerfLogger(); - private final int checkInterval = 200; - private final int maxRetryInterval = 2500; - private final int printInterval = 3000; - private final int progressBarChars = 30; - private long lastPrintTime; - private Set<String> completed; - - /* Pretty print the values */ - private final NumberFormat secondsFormat; - private final NumberFormat commaFormat; - private static final List<DAGClient> shutdownList; - private final Map<String, BaseWork> workMap; - - private StringBuffer diagnostics; - - static { - shutdownList = new LinkedList<DAGClient>(); - ShutdownHookManager.addShutdownHook(new Runnable() { - @Override - public void run() { - TezJobMonitor.killRunningJobs(); - try { - TezSessionPoolManager.getInstance().closeNonDefaultSessions(false); - } catch (Exception e) { - // ignore - } - } - }); - } - - public static void initShutdownHook() { - Preconditions.checkNotNull(shutdownList, - "Shutdown hook was not properly initialized"); - } - - public TezJobMonitor(Map<String, BaseWork> workMap) { - this.workMap = workMap; - console = SessionState.getConsole(); - secondsFormat = new DecimalFormat("#0.00"); - commaFormat = NumberFormat.getNumberInstance(Locale.US); - // all progress updates are written to info stream and log file. In-place updates can only be - // done to info stream (console) - out = console.getInfoStream(); - } - - /** - * NOTE: Use this method only if isUnixTerminal is true. - * Erases the current line and prints the given line. - * @param line - line to print - */ - public void reprintLine(String line) { - InPlaceUpdates.reprintLine(out, line); - lines++; - } - - /** - * NOTE: Use this method only if isUnixTerminal is true. - * Erases the current line and prints the given line with the specified color. - * @param line - line to print - * @param color - color for the line - */ - public void reprintLineWithColorAsBold(String line, Ansi.Color color) { - out.print(ansi().eraseLine(Ansi.Erase.ALL).fg(color).bold().a(line).a('\n').boldOff().reset() - .toString()); - out.flush(); - lines++; - } - - /** - * NOTE: Use this method only if isUnixTerminal is true. - * Erases the current line and prints the given multiline. Make sure the specified line is not - * terminated by linebreak. - * @param line - line to print - */ - public void reprintMultiLine(String line) { - int numLines = line.split("\r\n|\r|\n").length; - out.print(ansi().eraseLine(Ansi.Erase.ALL).a(line).a('\n').toString()); - out.flush(); - lines += numLines; - } - - /** - * NOTE: Use this method only if isUnixTerminal is true. - * Repositions the cursor back to line 0. - */ - public void repositionCursor() { - if (lines > 0) { - out.print(ansi().cursorUp(lines).toString()); - out.flush(); - lines = 0; - } - } - - /** - * monitorExecution handles status printing, failures during execution and final status retrieval. - * - * @param dagClient client that was used to kick off the job - * @param conf configuration file for this operation - * @return int 0 - success, 1 - killed, 2 - failed - */ - public int monitorExecution(final DAGClient dagClient, HiveConf conf, - DAG dag, Context ctx) throws InterruptedException { - long monitorStartTime = System.currentTimeMillis(); - DAGStatus status = null; - completed = new HashSet<String>(); - diagnostics = new StringBuffer(); - - boolean running = false; - boolean done = false; - boolean success = false; - int failedCounter = 0; - int rc = 0; - DAGStatus.State lastState = null; - String lastReport = null; - Set<StatusGetOpts> opts = new HashSet<StatusGetOpts>(); - long startTime = 0; - boolean isProfileEnabled = HiveConf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_SUMMARY) || - Utilities.isPerfOrAboveLogging(conf); - boolean llapIoEnabled = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_IO_ENABLED, false); - - boolean inPlaceEligible = InPlaceUpdates.inPlaceEligible(conf); - synchronized(shutdownList) { - shutdownList.add(dagClient); - } - console.printInfo("\n"); - perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_DAG); - perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_SUBMIT_TO_RUNNING); - Map<String, Progress> progressMap = null; - while (true) { - - try { - if (ctx != null) { - ctx.checkHeartbeaterLockException(); - } - - status = dagClient.getDAGStatus(opts, checkInterval); - progressMap = status.getVertexProgress(); - DAGStatus.State state = status.getState(); - - if (state != lastState || state == RUNNING) { - lastState = state; - - switch (state) { - case SUBMITTED: - console.printInfo("Status: Submitted"); - break; - case INITING: - console.printInfo("Status: Initializing"); - startTime = System.currentTimeMillis(); - break; - case RUNNING: - if (!running) { - perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_SUBMIT_TO_RUNNING); - console.printInfo("Status: Running (" + dagClient.getExecutionContext() + ")\n"); - startTime = System.currentTimeMillis(); - running = true; - } - - if (inPlaceEligible) { - printStatusInPlace(progressMap, startTime, false, dagClient); - // log the progress report to log file as well - lastReport = logStatus(progressMap, lastReport, console); - } else { - lastReport = printStatus(progressMap, lastReport, console); - } - break; - case SUCCEEDED: - if (!running) { - startTime = monitorStartTime; - } - if (inPlaceEligible) { - printStatusInPlace(progressMap, startTime, false, dagClient); - // log the progress report to log file as well - lastReport = logStatus(progressMap, lastReport, console); - } else { - lastReport = printStatus(progressMap, lastReport, console); - } - success = true; - running = false; - done = true; - break; - case KILLED: - if (!running) { - startTime = monitorStartTime; - } - if (inPlaceEligible) { - printStatusInPlace(progressMap, startTime, true, dagClient); - // log the progress report to log file as well - lastReport = logStatus(progressMap, lastReport, console); - } - console.printInfo("Status: Killed"); - running = false; - done = true; - rc = 1; - break; - case FAILED: - case ERROR: - if (!running) { - startTime = monitorStartTime; - } - if (inPlaceEligible) { - printStatusInPlace(progressMap, startTime, true, dagClient); - // log the progress report to log file as well - lastReport = logStatus(progressMap, lastReport, console); - } - console.printError("Status: Failed"); - running = false; - done = true; - rc = 2; - break; - } - } - } catch (Exception e) { - console.printInfo("Exception: " + e.getMessage()); - boolean isInterrupted = hasInterruptedException(e); - if (isInterrupted || (++failedCounter % maxRetryInterval / checkInterval == 0)) { - try { - console.printInfo("Killing DAG..."); - dagClient.tryKillDAG(); - } catch (IOException io) { - // best effort - } catch (TezException te) { - // best effort - } - e.printStackTrace(); - console.printError("Execution has failed."); - rc = 1; - done = true; - } else { - console.printInfo("Retrying..."); - } - } finally { - if (done) { - if (rc != 0 && status != null) { - for (String diag : status.getDiagnostics()) { - console.printError(diag); - diagnostics.append(diag); - } - } - synchronized(shutdownList) { - shutdownList.remove(dagClient); - } - break; - } - } - } - - perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_DAG); - - if (isProfileEnabled && success && progressMap != null) { - - double duration = (System.currentTimeMillis() - startTime) / 1000.0; - console.printInfo("Status: DAG finished successfully in " - + String.format("%.2f seconds", duration)); - console.printInfo(""); - - console.printInfo(QUERY_EXEC_SUMMARY_HEADER); - printQueryExecutionBreakDown(); - console.printInfo(SEPARATOR); - console.printInfo(""); - - console.printInfo(TASK_SUMMARY_HEADER); - printDagSummary(progressMap, console, dagClient, conf, dag, inPlaceEligible); - if (inPlaceEligible) { - console.printInfo(SEPARATOR); - } else { - console.printInfo(FILE_HEADER_SEPARATOR); - } - - if (llapIoEnabled) { - console.printInfo(""); - console.printInfo(LLAP_IO_SUMMARY_HEADER); - printLlapIOSummary(progressMap, console, dagClient); - console.printInfo(SEPARATOR); - console.printInfo(""); - - console.printInfo(FS_COUNTERS_SUMMARY_HEADER); - printFSCountersSummary(progressMap, console, dagClient); - } - - console.printInfo(""); - } - - return rc; - } - - private static boolean hasInterruptedException(Throwable e) { - // Hadoop IPC wraps InterruptedException. GRRR. - while (e != null) { - if (e instanceof InterruptedException || e instanceof InterruptedIOException) { - return true; - } - e = e.getCause(); - } - return false; - } - - /** - * killRunningJobs tries to terminate execution of all - * currently running tez queries. No guarantees, best effort only. - */ - public static void killRunningJobs() { - synchronized (shutdownList) { - for (DAGClient c : shutdownList) { - try { - System.err.println("Trying to shutdown DAG"); - c.tryKillDAG(); - } catch (Exception e) { - // ignore - } - } - } - } - - private static long getCounterValueByGroupName(TezCounters vertexCounters, - String groupNamePattern, - String counterName) { - TezCounter tezCounter = vertexCounters.getGroup(groupNamePattern).findCounter(counterName); - return (tezCounter == null) ? 0 : tezCounter.getValue(); - } - - private void printQueryExecutionBreakDown() { - - /* Build the method summary header */ - String execBreakdownHeader = String.format(OPERATION_SUMMARY, OPERATION, DURATION); - console.printInfo(SEPARATOR); - reprintLineWithColorAsBold(execBreakdownHeader, Ansi.Color.CYAN); - console.printInfo(SEPARATOR); - - // parse, analyze, optimize and compile - long compile = perfLogger.getEndTime(PerfLogger.COMPILE) - - perfLogger.getStartTime(PerfLogger.COMPILE); - console.printInfo(String.format(OPERATION_SUMMARY, "Compile Query", - secondsFormat.format(compile / 1000.0) + "s")); - - // prepare plan for submission (building DAG, adding resources, creating scratch dirs etc.) - long totalDAGPrep = perfLogger.getStartTime(PerfLogger.TEZ_SUBMIT_DAG) - - perfLogger.getEndTime(PerfLogger.COMPILE); - console.printInfo(String.format(OPERATION_SUMMARY, "Prepare Plan", - secondsFormat.format(totalDAGPrep / 1000.0) + "s")); - - // submit to accept dag (if session is closed, this will include re-opening of session time, - // localizing files for AM, submitting DAG) - long submitToAccept = perfLogger.getStartTime(PerfLogger.TEZ_RUN_DAG) - - perfLogger.getStartTime(PerfLogger.TEZ_SUBMIT_DAG); - console.printInfo(String.format(OPERATION_SUMMARY, "Submit Plan", - secondsFormat.format(submitToAccept / 1000.0) + "s")); - - // accept to start dag (schedule wait time, resource wait time etc.) - long acceptToStart = perfLogger.getDuration(PerfLogger.TEZ_SUBMIT_TO_RUNNING); - console.printInfo(String.format(OPERATION_SUMMARY, "Start DAG", - secondsFormat.format(acceptToStart / 1000.0) + "s")); - - // time to actually run the dag (actual dag runtime) - final long startToEnd; - if (acceptToStart == 0) { - startToEnd = perfLogger.getDuration(PerfLogger.TEZ_RUN_DAG); - } else { - startToEnd = perfLogger.getEndTime(PerfLogger.TEZ_RUN_DAG) - - perfLogger.getEndTime(PerfLogger.TEZ_SUBMIT_TO_RUNNING); - } - console.printInfo(String.format(OPERATION_SUMMARY, "Run DAG", - secondsFormat.format(startToEnd / 1000.0) + "s")); - - } - - private void printDagSummary(Map<String, Progress> progressMap, LogHelper console, - DAGClient dagClient, HiveConf conf, DAG dag, final boolean inPlaceEligible) { - - /* Strings for headers and counters */ - String hiveCountersGroup = HiveConf.getVar(conf, HiveConf.ConfVars.HIVECOUNTERGROUP); - Set<StatusGetOpts> statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS); - TezCounters hiveCounters = null; - try { - hiveCounters = dagClient.getDAGStatus(statusGetOpts).getDAGCounters(); - } catch (IOException e) { - // best attempt, shouldn't really kill DAG for this - } catch (TezException e) { - // best attempt, shouldn't really kill DAG for this - } - - /* If the counters are missing there is no point trying to print progress */ - if (hiveCounters == null) { - return; - } - - /* Print the per Vertex summary */ - if (inPlaceEligible) { - console.printInfo(SEPARATOR); - reprintLineWithColorAsBold(SUMMARY_HEADER, Ansi.Color.CYAN); - console.printInfo(SEPARATOR); - } else { - console.printInfo(FILE_HEADER_SEPARATOR); - reprintLineWithColorAsBold(FILE_HEADER, Ansi.Color.CYAN); - console.printInfo(FILE_HEADER_SEPARATOR); - } - SortedSet<String> keys = new TreeSet<String>(progressMap.keySet()); - Set<StatusGetOpts> statusOptions = new HashSet<StatusGetOpts>(1); - statusOptions.add(StatusGetOpts.GET_COUNTERS); - for (String vertexName : keys) { - Progress progress = progressMap.get(vertexName); - if (progress != null) { - final int totalTasks = progress.getTotalTaskCount(); - final int failedTaskAttempts = progress.getFailedTaskAttemptCount(); - final int killedTaskAttempts = progress.getKilledTaskAttemptCount(); - final double duration = perfLogger.getDuration(PerfLogger.TEZ_RUN_VERTEX + vertexName); - VertexStatus vertexStatus = null; - try { - vertexStatus = dagClient.getVertexStatus(vertexName, statusOptions); - } catch (IOException e) { - // best attempt, shouldn't really kill DAG for this - } catch (TezException e) { - // best attempt, shouldn't really kill DAG for this - } - - if (vertexStatus == null) { - continue; - } - - Vertex currentVertex = dag.getVertex(vertexName); - List<Vertex> inputVerticesList = currentVertex.getInputVertices(); - long hiveInputRecordsFromOtherVertices = 0; - if (inputVerticesList.size() > 0) { - - for (Vertex inputVertex : inputVerticesList) { - String inputVertexName = inputVertex.getName(); - hiveInputRecordsFromOtherVertices += getCounterValueByGroupName(hiveCounters, - hiveCountersGroup, String.format("%s_", - ReduceSinkOperator.Counter.RECORDS_OUT_INTERMEDIATE.toString()) + - inputVertexName.replace(" ", "_")); - - hiveInputRecordsFromOtherVertices += getCounterValueByGroupName(hiveCounters, - hiveCountersGroup, String.format("%s_", - FileSinkOperator.Counter.RECORDS_OUT.toString()) + - inputVertexName.replace(" ", "_")); - } - } - - /* - * Get the CPU & GC - * - * counters org.apache.tez.common.counters.TaskCounter - * GC_TIME_MILLIS=37712 - * CPU_MILLISECONDS=2774230 - */ - final TezCounters vertexCounters = vertexStatus.getVertexCounters(); - final double cpuTimeMillis = getCounterValueByGroupName(vertexCounters, - TaskCounter.class.getName(), - TaskCounter.CPU_MILLISECONDS.name()); - - final double gcTimeMillis = getCounterValueByGroupName(vertexCounters, - TaskCounter.class.getName(), - TaskCounter.GC_TIME_MILLIS.name()); - - /* - * Get the HIVE counters - * - * HIVE - * CREATED_FILES=1 - * DESERIALIZE_ERRORS=0 - * RECORDS_IN_Map_1=550076554 - * RECORDS_OUT_INTERMEDIATE_Map_1=854987 - * RECORDS_OUT_Reducer_2=1 - */ - - final long hiveInputRecords = - getCounterValueByGroupName( - hiveCounters, - hiveCountersGroup, - String.format("%s_", MapOperator.Counter.RECORDS_IN.toString()) - + vertexName.replace(" ", "_")) - + hiveInputRecordsFromOtherVertices; - final long hiveOutputIntermediateRecords = - getCounterValueByGroupName( - hiveCounters, - hiveCountersGroup, - String.format("%s_", ReduceSinkOperator.Counter.RECORDS_OUT_INTERMEDIATE.toString()) - + vertexName.replace(" ", "_")); - final long hiveOutputRecords = - getCounterValueByGroupName( - hiveCounters, - hiveCountersGroup, - String.format("%s_", FileSinkOperator.Counter.RECORDS_OUT.toString()) - + vertexName.replace(" ", "_")) - + hiveOutputIntermediateRecords; - - final String vertexExecutionStats; - if (inPlaceEligible) { - vertexExecutionStats = String.format(SUMMARY_HEADER_FORMAT, - vertexName, - secondsFormat.format((duration)), - commaFormat.format(cpuTimeMillis), - commaFormat.format(gcTimeMillis), - commaFormat.format(hiveInputRecords), - commaFormat.format(hiveOutputRecords)); - } else { - vertexExecutionStats = String.format(FILE_HEADER_FORMAT, - vertexName, - totalTasks, - failedTaskAttempts, - killedTaskAttempts, - secondsFormat.format((duration)), - commaFormat.format(cpuTimeMillis), - commaFormat.format(gcTimeMillis), - commaFormat.format(hiveInputRecords), - commaFormat.format(hiveOutputRecords)); - } - console.printInfo(vertexExecutionStats); - } - } - } - - private void printLlapIOSummary(Map<String, Progress> progressMap, LogHelper console, - DAGClient dagClient) { - SortedSet<String> keys = new TreeSet<>(progressMap.keySet()); - Set<StatusGetOpts> statusOptions = new HashSet<>(1); - statusOptions.add(StatusGetOpts.GET_COUNTERS); - boolean first = false; - String counterGroup = LlapIOCounters.class.getName(); - for (String vertexName : keys) { - // Reducers do not benefit from LLAP IO so no point in printing - if (vertexName.startsWith("Reducer")) { - continue; - } - TezCounters vertexCounters = null; - try { - vertexCounters = dagClient.getVertexStatus(vertexName, statusOptions) - .getVertexCounters(); - } catch (IOException e) { - // best attempt, shouldn't really kill DAG for this - } catch (TezException e) { - // best attempt, shouldn't really kill DAG for this - } - if (vertexCounters != null) { - final long selectedRowgroups = getCounterValueByGroupName(vertexCounters, - counterGroup, LlapIOCounters.SELECTED_ROWGROUPS.name()); - final long metadataCacheHit = getCounterValueByGroupName(vertexCounters, - counterGroup, LlapIOCounters.METADATA_CACHE_HIT.name()); - final long metadataCacheMiss = getCounterValueByGroupName(vertexCounters, - counterGroup, LlapIOCounters.METADATA_CACHE_MISS.name()); - final long cacheHitBytes = getCounterValueByGroupName(vertexCounters, - counterGroup, LlapIOCounters.CACHE_HIT_BYTES.name()); - final long cacheMissBytes = getCounterValueByGroupName(vertexCounters, - counterGroup, LlapIOCounters.CACHE_MISS_BYTES.name()); - final long allocatedBytes = getCounterValueByGroupName(vertexCounters, - counterGroup, LlapIOCounters.ALLOCATED_BYTES.name()); - final long allocatedUsedBytes = getCounterValueByGroupName(vertexCounters, - counterGroup, LlapIOCounters.ALLOCATED_USED_BYTES.name()); - final long totalIoTime = getCounterValueByGroupName(vertexCounters, - counterGroup, LlapIOCounters.TOTAL_IO_TIME_NS.name()); - - if (!first) { - console.printInfo(SEPARATOR); - reprintLineWithColorAsBold(LLAP_SUMMARY_HEADER, Ansi.Color.CYAN); - console.printInfo(SEPARATOR); - first = true; - } - - String queryFragmentStats = String.format(LLAP_SUMMARY_HEADER_FORMAT, - vertexName, - selectedRowgroups, - metadataCacheHit, - metadataCacheMiss, - Utilities.humanReadableByteCount(cacheHitBytes), - Utilities.humanReadableByteCount(cacheMissBytes), - Utilities.humanReadableByteCount(allocatedBytes), - Utilities.humanReadableByteCount(allocatedUsedBytes), - secondsFormat.format(totalIoTime / 1000_000_000.0) + "s"); - console.printInfo(queryFragmentStats); - } - } - } - - private void printFSCountersSummary(Map<String, Progress> progressMap, LogHelper console, - DAGClient dagClient) { - SortedSet<String> keys = new TreeSet<>(progressMap.keySet()); - Set<StatusGetOpts> statusOptions = new HashSet<>(1); - statusOptions.add(StatusGetOpts.GET_COUNTERS); - // Assuming FileSystem.getAllStatistics() returns all schemes that are accessed on task side - // as well. If not, we need a way to get all the schemes that are accessed by the tez task/llap. - for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) { - final String scheme = statistics.getScheme().toUpperCase(); - final String fsCountersHeader = String.format(FS_COUNTERS_HEADER_FORMAT, - "VERTICES", "BYTES_READ", "READ_OPS", "LARGE_READ_OPS", "BYTES_WRITTEN", "WRITE_OPS"); - - console.printInfo(""); - reprintLineWithColorAsBold("Scheme: " + scheme, Ansi.Color.RED); - console.printInfo(SEPARATOR); - reprintLineWithColorAsBold(fsCountersHeader, Ansi.Color.CYAN); - console.printInfo(SEPARATOR); - - for (String vertexName : keys) { - TezCounters vertexCounters = null; - try { - vertexCounters = dagClient.getVertexStatus(vertexName, statusOptions) - .getVertexCounters(); - } catch (IOException e) { - // best attempt, shouldn't really kill DAG for this - } catch (TezException e) { - // best attempt, shouldn't really kill DAG for this - } - if (vertexCounters != null) { - final String counterGroup = FileSystemCounter.class.getName(); - final long bytesRead = getCounterValueByGroupName(vertexCounters, - counterGroup, scheme + "_" + FileSystemCounter.BYTES_READ.name()); - final long bytesWritten = getCounterValueByGroupName(vertexCounters, - counterGroup, scheme + "_" + FileSystemCounter.BYTES_WRITTEN.name()); - final long readOps = getCounterValueByGroupName(vertexCounters, - counterGroup, scheme + "_" + FileSystemCounter.READ_OPS.name()); - final long largeReadOps = getCounterValueByGroupName(vertexCounters, - counterGroup, scheme + "_" + FileSystemCounter.LARGE_READ_OPS.name()); - final long writeOps = getCounterValueByGroupName(vertexCounters, - counterGroup, scheme + "_" + FileSystemCounter.WRITE_OPS.name()); - - String fsCountersSummary = String.format(FS_COUNTERS_HEADER_FORMAT, - vertexName, - Utilities.humanReadableByteCount(bytesRead), - readOps, - largeReadOps, - Utilities.humanReadableByteCount(bytesWritten), - writeOps); - console.printInfo(fsCountersSummary); - } - } - - console.printInfo(SEPARATOR); - } - } - - private void printStatusInPlace(Map<String, Progress> progressMap, long startTime, - boolean vextexStatusFromAM, DAGClient dagClient) { - StringBuilder reportBuffer = new StringBuilder(); - int sumComplete = 0; - int sumTotal = 0; - - // position the cursor to line 0 - repositionCursor(); - - // print header - // ------------------------------------------------------------------------------- - // VERTICES STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED - // ------------------------------------------------------------------------------- - reprintLine(SEPARATOR); - reprintLineWithColorAsBold(HEADER, Ansi.Color.CYAN); - reprintLine(SEPARATOR); - - SortedSet<String> keys = new TreeSet<String>(progressMap.keySet()); - int idx = 0; - int maxKeys = keys.size(); - for (String s : keys) { - idx++; - Progress progress = progressMap.get(s); - final int complete = progress.getSucceededTaskCount(); - final int total = progress.getTotalTaskCount(); - final int running = progress.getRunningTaskCount(); - final int failed = progress.getFailedTaskAttemptCount(); - final int pending = progress.getTotalTaskCount() - progress.getSucceededTaskCount() - - progress.getRunningTaskCount(); - final int killed = progress.getKilledTaskAttemptCount(); - - // To get vertex status we can use DAGClient.getVertexStatus(), but it will be expensive to - // get status from AM for every refresh of the UI. Lets infer the state from task counts. - // Only if DAG is FAILED or KILLED the vertex status is fetched from AM. - VertexStatus.State vertexState = VertexStatus.State.INITIALIZING; - - // INITED state - if (total > 0) { - vertexState = VertexStatus.State.INITED; - sumComplete += complete; - sumTotal += total; - } - - // RUNNING state - if (complete < total && (complete > 0 || running > 0 || failed > 0)) { - vertexState = VertexStatus.State.RUNNING; - if (!perfLogger.startTimeHasMethod(PerfLogger.TEZ_RUN_VERTEX + s)) { - perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s); - } - } - - // SUCCEEDED state - if (complete == total) { - vertexState = VertexStatus.State.SUCCEEDED; - if (!completed.contains(s)) { - completed.add(s); - - /* We may have missed the start of the vertex - * due to the 3 seconds interval - */ - if (!perfLogger.startTimeHasMethod(PerfLogger.TEZ_RUN_VERTEX + s)) { - perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s); - } - - perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s); - } - } - - // DAG might have been killed, lets try to get vertex state from AM before dying - // KILLED or FAILED state - if (vextexStatusFromAM) { - VertexStatus vertexStatus = null; - try { - vertexStatus = dagClient.getVertexStatus(s, null); - } catch (IOException e) { - // best attempt, shouldn't really kill DAG for this - } catch (TezException e) { - // best attempt, shouldn't really kill DAG for this - } - if (vertexStatus != null) { - vertexState = vertexStatus.getState(); - } - } - - // Map 1 .......... container SUCCEEDED 7 7 0 0 0 0 - String nameWithProgress = getNameWithProgress(s, complete, total); - String mode = getMode(s, workMap); - String vertexStr = String.format(VERTEX_FORMAT, - nameWithProgress, - mode, - vertexState.toString(), - total, - complete, - running, - pending, - failed, - killed); - reportBuffer.append(vertexStr); - if (idx != maxKeys) { - reportBuffer.append("\n"); - } - } - - reprintMultiLine(reportBuffer.toString()); - - // ------------------------------------------------------------------------------- - // VERTICES: 03/04 [=================>>-----] 86% ELAPSED TIME: 1.71 s - // ------------------------------------------------------------------------------- - reprintLine(SEPARATOR); - final float progress = (sumTotal == 0) ? 0.0f : (float) sumComplete / (float) sumTotal; - String footer = getFooter(keys.size(), completed.size(), progress, startTime); - reprintLineWithColorAsBold(footer, Ansi.Color.RED); - reprintLine(SEPARATOR); - } - - private String getMode(String name, Map<String, BaseWork> workMap) { - String mode = "container"; - BaseWork work = workMap.get(name); - if (work != null) { - // uber > llap > container - if (work.getUberMode()) { - mode = "uber"; - } else if (work.getLlapMode()) { - mode = "llap"; - } else { - mode = "container"; - } - } - return mode; - } - - // Map 1 .......... - private String getNameWithProgress(String s, int complete, int total) { - String result = ""; - if (s != null) { - float percent = total == 0 ? 0.0f : (float) complete / (float) total; - // lets use the remaining space in column 1 as progress bar - int spaceRemaining = COLUMN_1_WIDTH - s.length() - 1; - String trimmedVName = s; - - // if the vertex name is longer than column 1 width, trim it down - // "Tez Merge File Work" will become "Tez Merge File.." - if (s.length() > COLUMN_1_WIDTH) { - trimmedVName = s.substring(0, COLUMN_1_WIDTH - 1); - trimmedVName = trimmedVName + ".."; - } - - result = trimmedVName + " "; - int toFill = (int) (spaceRemaining * percent); - for (int i = 0; i < toFill; i++) { - result += "."; - } - } - return result; - } - - // VERTICES: 03/04 [==================>>-----] 86% ELAPSED TIME: 1.71 s - private String getFooter(int keySize, int completedSize, float progress, long startTime) { - String verticesSummary = String.format("VERTICES: %02d/%02d", completedSize, keySize); - String progressBar = getInPlaceProgressBar(progress); - final int progressPercent = (int) (progress * 100); - String progressStr = "" + progressPercent + "%"; - float et = (float) (System.currentTimeMillis() - startTime) / (float) 1000; - String elapsedTime = "ELAPSED TIME: " + secondsFormat.format(et) + " s"; - String footer = String.format(FOOTER_FORMAT, - verticesSummary, progressBar, progressStr, elapsedTime); - return footer; - } - - // [==================>>-----] - private String getInPlaceProgressBar(float percent) { - StringBuilder bar = new StringBuilder("["); - int remainingChars = progressBarChars - 4; - int completed = (int) (remainingChars * percent); - int pending = remainingChars - completed; - for (int i = 0; i < completed; i++) { - bar.append("="); - } - bar.append(">>"); - for (int i = 0; i < pending; i++) { - bar.append("-"); - } - bar.append("]"); - return bar.toString(); - } - - private String printStatus(Map<String, Progress> progressMap, String lastReport, LogHelper console) { - String report = getReport(progressMap); - if (!report.equals(lastReport) || System.currentTimeMillis() >= lastPrintTime + printInterval) { - console.printInfo(report); - lastPrintTime = System.currentTimeMillis(); - } - return report; - } - - private String logStatus(Map<String, Progress> progressMap, String lastReport, LogHelper console) { - String report = getReport(progressMap); - if (!report.equals(lastReport) || System.currentTimeMillis() >= lastPrintTime + printInterval) { - console.logInfo(report); - lastPrintTime = System.currentTimeMillis(); - } - return report; - } - - private String getReport(Map<String, Progress> progressMap) { - StringBuilder reportBuffer = new StringBuilder(); - - SortedSet<String> keys = new TreeSet<String>(progressMap.keySet()); - for (String s: keys) { - Progress progress = progressMap.get(s); - final int complete = progress.getSucceededTaskCount(); - final int total = progress.getTotalTaskCount(); - final int running = progress.getRunningTaskCount(); - final int failed = progress.getFailedTaskAttemptCount(); - if (total <= 0) { - reportBuffer.append(String.format("%s: -/-\t", s)); - } else { - if (complete == total && !completed.contains(s)) { - completed.add(s); - - /* - * We may have missed the start of the vertex due to the 3 seconds interval - */ - if (!perfLogger.startTimeHasMethod(PerfLogger.TEZ_RUN_VERTEX + s)) { - perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s); - } - - perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s); - } - if(complete < total && (complete > 0 || running > 0 || failed > 0)) { - - if (!perfLogger.startTimeHasMethod(PerfLogger.TEZ_RUN_VERTEX + s)) { - perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s); - } - - /* vertex is started, but not complete */ - if (failed > 0) { - reportBuffer.append(String.format("%s: %d(+%d,-%d)/%d\t", s, complete, running, failed, total)); - } else { - reportBuffer.append(String.format("%s: %d(+%d)/%d\t", s, complete, running, total)); - } - } else { - /* vertex is waiting for input/slots or complete */ - if (failed > 0) { - /* tasks finished but some failed */ - reportBuffer.append(String.format("%s: %d(-%d)/%d\t", s, complete, failed, total)); - } else { - reportBuffer.append(String.format("%s: %d/%d\t", s, complete, total)); - } - } - } - } - - return reportBuffer.toString(); - } - - public String getDiagnostics() { - return diagnostics.toString(); - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java index f1071fa..62f65c2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java @@ -17,9 +17,7 @@ */ package org.apache.hadoop.hive.ql.exec.tez; - import java.util.Collection; - import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; @@ -40,9 +38,7 @@ import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; - import javax.security.auth.login.LoginException; - import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.io.FilenameUtils; import org.apache.hadoop.conf.Configuration; @@ -83,6 +79,7 @@ import org.apache.tez.serviceplugins.api.TaskCommunicatorDescriptor; import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor; /** * Holds session state related to Tez @@ -671,7 +668,7 @@ public class TezSessionState { } public List<LocalResource> getLocalizedResources() { - return new ArrayList<LocalResource>(localizedResources); + return new ArrayList<>(localizedResources); } public String getUser() { @@ -698,4 +695,5 @@ public class TezSessionState { } } while (!ownerThread.compareAndSet(null, newName)); } + } http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index 7479b85..69cbe0b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -79,6 +79,7 @@ import org.apache.tez.dag.api.client.DAGStatus; import org.apache.tez.dag.api.client.StatusGetOpts; import org.apache.tez.dag.api.client.VertexStatus; import org.json.JSONObject; +import org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor; /** * @@ -178,8 +179,9 @@ public class TezTask extends Task<TezWork> { additionalLr, inputOutputJars, inputOutputLocalResources); // finally monitor will print progress until the job is done - TezJobMonitor monitor = new TezJobMonitor(work.getWorkMap()); - rc = monitor.monitorExecution(dagClient, conf, dag, ctx); + TezJobMonitor monitor = new TezJobMonitor(work.getWorkMap(),dagClient, conf, dag, ctx); + rc = monitor.monitorExecution(); + if (rc != 0) { this.setException(new HiveException(monitor.getDiagnostics())); } http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/Constants.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/Constants.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/Constants.java new file mode 100644 index 0000000..eccbbb6 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/Constants.java @@ -0,0 +1,7 @@ +package org.apache.hadoop.hive.ql.exec.tez.monitoring; + +import org.apache.hadoop.hive.common.log.InPlaceUpdate; + +public interface Constants { + String SEPARATOR = new String(new char[InPlaceUpdate.MIN_TERMINAL_WIDTH]).replace("\0", "-"); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java new file mode 100644 index 0000000..5840ad6 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java @@ -0,0 +1,197 @@ +package org.apache.hadoop.hive.ql.exec.tez.monitoring; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.exec.MapOperator; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.common.log.InPlaceUpdate; +import org.apache.hadoop.hive.ql.log.PerfLogger; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.tez.common.counters.TaskCounter; +import org.apache.tez.common.counters.TezCounter; +import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.dag.api.DAG; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.api.Vertex; +import org.apache.tez.dag.api.client.DAGClient; +import org.apache.tez.dag.api.client.Progress; +import org.apache.tez.dag.api.client.StatusGetOpts; +import org.apache.tez.dag.api.client.VertexStatus; + +import java.io.IOException; +import java.text.DecimalFormat; +import java.text.NumberFormat; +import java.util.*; + + +class DAGSummary implements PrintSummary { + + private static final int FILE_HEADER_SEPARATOR_WIDTH = InPlaceUpdate.MIN_TERMINAL_WIDTH + 34; + private static final String FILE_HEADER_SEPARATOR = new String(new char[FILE_HEADER_SEPARATOR_WIDTH]).replace("\0", "-"); + + private static final String FORMATTING_PATTERN = "%10s %12s %16s %13s %14s %13s %12s %14s %15s"; + private static final String FILE_HEADER = String.format( + FORMATTING_PATTERN, + "VERTICES", + "TOTAL_TASKS", + "FAILED_ATTEMPTS", + "KILLED_TASKS", + "DURATION(ms)", + "CPU_TIME(ms)", + "GC_TIME(ms)", + "INPUT_RECORDS", + "OUTPUT_RECORDS" + ); + + private final DecimalFormat secondsFormatter = new DecimalFormat("#0.00"); + private final NumberFormat commaFormatter = NumberFormat.getNumberInstance(Locale.US); + + private final String hiveCountersGroup; + private final TezCounters hiveCounters; + + private Map<String, Progress> progressMap; + private DAGClient dagClient; + private DAG dag; + private PerfLogger perfLogger; + + DAGSummary(Map<String, Progress> progressMap, HiveConf hiveConf, DAGClient dagClient, + DAG dag, PerfLogger perfLogger) { + this.progressMap = progressMap; + this.dagClient = dagClient; + this.dag = dag; + this.perfLogger = perfLogger; + this.hiveCountersGroup = HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVECOUNTERGROUP); + this.hiveCounters = hiveCounters(dagClient); + } + + private long hiveInputRecordsFromOtherVertices(String vertexName) { + List<Vertex> inputVerticesList = dag.getVertex(vertexName).getInputVertices(); + long result = 0; + for (Vertex inputVertex : inputVerticesList) { + String intermediateRecordsCounterName = formattedName( + ReduceSinkOperator.Counter.RECORDS_OUT_INTERMEDIATE.toString(), + inputVertex.getName() + ); + String recordsOutCounterName = formattedName(FileSinkOperator.Counter.RECORDS_OUT.toString(), + inputVertex.getName()); + result += ( + hiveCounterValue(intermediateRecordsCounterName) + + hiveCounterValue(recordsOutCounterName) + ); + } + return result; + } + + private String formattedName(String counterName, String vertexName) { + return String.format("%s_", counterName) + vertexName.replace(" ", "_"); + } + + private long getCounterValueByGroupName(TezCounters counters, String pattern, String counterName) { + TezCounter tezCounter = counters.getGroup(pattern).findCounter(counterName); + return (tezCounter == null) ? 0 : tezCounter.getValue(); + } + + private long hiveCounterValue(String counterName) { + return getCounterValueByGroupName(hiveCounters, hiveCountersGroup, counterName); + } + + private TezCounters hiveCounters(DAGClient dagClient) { + try { + return dagClient.getDAGStatus(EnumSet.of(StatusGetOpts.GET_COUNTERS)).getDAGCounters(); + } catch (IOException | TezException e) { + // best attempt, shouldn't really kill DAG for this + } + return null; + } + + @Override + public void print(SessionState.LogHelper console) { + console.printInfo("Task Execution Summary"); + + /* If the counters are missing there is no point trying to print progress */ + if (hiveCounters == null) { + return; + } + + /* Print the per Vertex summary */ + printHeader(console); + SortedSet<String> keys = new TreeSet<>(progressMap.keySet()); + Set<StatusGetOpts> statusOptions = new HashSet<>(1); + statusOptions.add(StatusGetOpts.GET_COUNTERS); + for (String vertexName : keys) { + Progress progress = progressMap.get(vertexName); + if (progress == null) continue; + + VertexStatus vertexStatus = vertexStatus(statusOptions, vertexName); + if (vertexStatus == null) { + continue; + } + console.printInfo(vertexSummary(vertexName, progress, vertexStatus)); + } + console.printInfo(FILE_HEADER_SEPARATOR); + } + + private String vertexSummary(String vertexName, Progress progress, VertexStatus vertexStatus) { + /* + * Get the CPU & GC + * + * counters org.apache.tez.common.counters.TaskCounter + * GC_TIME_MILLIS=37712 + * CPU_MILLISECONDS=2774230 + */ + final TezCounters vertexCounters = vertexStatus.getVertexCounters(); + final double cpuTimeMillis = getCounterValueByGroupName(vertexCounters, + TaskCounter.class.getName(), + TaskCounter.CPU_MILLISECONDS.name()); + + final double gcTimeMillis = getCounterValueByGroupName(vertexCounters, + TaskCounter.class.getName(), + TaskCounter.GC_TIME_MILLIS.name()); + + /* + * Get the HIVE counters + * + * HIVE + * CREATED_FILES=1 + * DESERIALIZE_ERRORS=0 + * RECORDS_IN_Map_1=550076554 + * RECORDS_OUT_INTERMEDIATE_Map_1=854987 + * RECORDS_OUT_Reducer_2=1 + */ + final long hiveInputRecords = + hiveCounterValue(formattedName(MapOperator.Counter.RECORDS_IN.toString(), vertexName)) + + hiveInputRecordsFromOtherVertices(vertexName); + + final long hiveOutputRecords = + hiveCounterValue(formattedName(FileSinkOperator.Counter.RECORDS_OUT.toString(), vertexName)) + + hiveCounterValue(formattedName(ReduceSinkOperator.Counter.RECORDS_OUT_INTERMEDIATE.toString(), vertexName)); + + final double duration = perfLogger.getDuration(PerfLogger.TEZ_RUN_VERTEX + vertexName); + + return String.format(FORMATTING_PATTERN, + vertexName, + progress.getTotalTaskCount(), + progress.getFailedTaskAttemptCount(), + progress.getKilledTaskAttemptCount(), + secondsFormatter.format((duration)), + commaFormatter.format(cpuTimeMillis), + commaFormatter.format(gcTimeMillis), + commaFormatter.format(hiveInputRecords), + commaFormatter.format(hiveOutputRecords)); + } + + private VertexStatus vertexStatus(Set<StatusGetOpts> statusOptions, String vertexName) { + try { + return dagClient.getVertexStatus(vertexName, statusOptions); + } catch (IOException | TezException e) { + // best attempt, shouldn't really kill DAG for this + } + return null; + } + + private void printHeader(SessionState.LogHelper console) { + console.printInfo(FILE_HEADER_SEPARATOR); + console.printInfo(FILE_HEADER); + console.printInfo(FILE_HEADER_SEPARATOR); + } +}
