// http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/FSCountersSummary.java
// ----------------------------------------------------------------------
// diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/FSCountersSummary.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/FSCountersSummary.java
// new file mode 100644
// index 0000000..0a28edd
// --- /dev/null
// +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/FSCountersSummary.java
// @@ -0,0 +1,92 @@
package org.apache.hadoop.hive.ql.exec.tez.monitoring;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.tez.common.counters.FileSystemCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.Progress;
import org.apache.tez.dag.api.client.StatusGetOpts;

import java.io.IOException;
import java.util.*;

import static org.apache.hadoop.hive.ql.exec.tez.monitoring.Constants.SEPARATOR;
import static org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor.getCounterValueByGroupName;

/**
 * Prints, for every file system scheme reported by {@link FileSystem#getAllStatistics()},
 * one table with the per-vertex file system counters (bytes read/written and operation counts).
 */
public class FSCountersSummary implements PrintSummary {

  private static final String FORMATTING_PATTERN = "%10s %15s %13s %18s %18s %13s";
  private static final String HEADER = String.format(FORMATTING_PATTERN,
      "VERTICES", "BYTES_READ", "READ_OPS", "LARGE_READ_OPS", "BYTES_WRITTEN", "WRITE_OPS");

  private final Map<String, Progress> progressMap;
  private final DAGClient dagClient;

  /**
   * @param progressMap vertex name to progress; only the key set (vertex names) is used here
   * @param dagClient   client used to fetch the counters of each vertex
   */
  FSCountersSummary(Map<String, Progress> progressMap, DAGClient dagClient) {
    this.progressMap = progressMap;
    this.dagClient = dagClient;
  }

  /**
   * Writes the summary to {@code console}: a title line followed by one table per scheme.
   * Vertices whose counters could not be fetched are left out of every table.
   */
  @Override
  public void print(SessionState.LogHelper console) {
    console.printInfo("FileSystem Counters Summary");

    // The counters of a vertex do not depend on the scheme being printed, so they are fetched
    // once (on the first scheme) and reused, instead of once per scheme and vertex.
    SortedMap<String, TezCounters> countersByVertex = null;

    // Assuming FileSystem.getAllStatistics() returns all schemes that are accessed on task side
    // as well. If not, we need a way to get all the schemes that are accessed by the tez task/llap.
    for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) {
      if (countersByVertex == null) {
        countersByVertex = fetchVertexCounters();
      }
      // Locale.ROOT: the upper-cased scheme becomes part of a counter name, so the conversion
      // must not depend on the JVM's default locale (e.g. Turkish dotted/dotless i).
      final String scheme = statistics.getScheme().toUpperCase(Locale.ROOT);

      console.printInfo("");
      console.printInfo("Scheme: " + scheme);
      console.printInfo(SEPARATOR);
      console.printInfo(HEADER);
      console.printInfo(SEPARATOR);

      for (Map.Entry<String, TezCounters> entry : countersByVertex.entrySet()) {
        console.printInfo(summary(scheme, entry.getKey(), entry.getValue()));
      }

      console.printInfo(SEPARATOR);
    }
  }

  /** Fetches the counters of every vertex, sorted by vertex name; vertices without counters are skipped. */
  private SortedMap<String, TezCounters> fetchVertexCounters() {
    Set<StatusGetOpts> statusOptions = new HashSet<>(1);
    statusOptions.add(StatusGetOpts.GET_COUNTERS);

    SortedMap<String, TezCounters> countersByVertex = new TreeMap<>();
    for (String vertexName : progressMap.keySet()) {
      TezCounters counters = vertexCounters(statusOptions, vertexName);
      if (counters != null) {
        countersByVertex.put(vertexName, counters);
      }
    }
    return countersByVertex;
  }

  /**
   * Formats one table row for a vertex.
   *
   * @param scheme         upper-cased scheme; counters are looked up as {@code <scheme>_<COUNTER>}
   * @param vertexName     name shown in the first column
   * @param vertexCounters counters of that vertex
   * @return the row formatted with {@link #FORMATTING_PATTERN}
   */
  private String summary(String scheme, String vertexName, TezCounters vertexCounters) {
    final String counterGroup = FileSystemCounter.class.getName();
    final long bytesRead = getCounterValueByGroupName(vertexCounters,
        counterGroup, scheme + "_" + FileSystemCounter.BYTES_READ.name());
    final long bytesWritten = getCounterValueByGroupName(vertexCounters,
        counterGroup, scheme + "_" + FileSystemCounter.BYTES_WRITTEN.name());
    final long readOps = getCounterValueByGroupName(vertexCounters,
        counterGroup, scheme + "_" + FileSystemCounter.READ_OPS.name());
    final long largeReadOps = getCounterValueByGroupName(vertexCounters,
        counterGroup, scheme + "_" + FileSystemCounter.LARGE_READ_OPS.name());
    final long writeOps = getCounterValueByGroupName(vertexCounters,
        counterGroup, scheme + "_" + FileSystemCounter.WRITE_OPS.name());

    return String.format(FORMATTING_PATTERN,
        vertexName,
        Utilities.humanReadableByteCount(bytesRead),
        readOps,
        largeReadOps,
        Utilities.humanReadableByteCount(bytesWritten),
        writeOps);
  }

  /**
   * Asks the DAG client for the counters of one vertex.
   *
   * @return the counters, or {@code null} when the request failed
   */
  private TezCounters vertexCounters(Set<StatusGetOpts> statusOptions, String vertexName) {
    try {
      return dagClient.getVertexStatus(vertexName, statusOptions).getVertexCounters();
    } catch (IOException | TezException ignored) {
      // best attempt, shouldn't really kill DAG for this
    }
    return null;
  }
}
// http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/LLAPioSummary.java
// ----------------------------------------------------------------------
// diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/LLAPioSummary.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/LLAPioSummary.java
// new file mode 100644
// index 0000000..81f1755
// --- /dev/null
// +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/LLAPioSummary.java
// @@ -0,0 +1,108 @@
package org.apache.hadoop.hive.ql.exec.tez.monitoring;

import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.Progress;
import org.apache.tez.dag.api.client.StatusGetOpts;

import java.io.IOException;
import java.text.DecimalFormat;
import java.util.*;

import static org.apache.hadoop.hive.ql.exec.tez.monitoring.Constants.SEPARATOR;
import static org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor.getCounterValueByGroupName;

/**
 * Prints one table row of LLAP IO counters (row groups, cache hits/misses, allocation, IO time)
 * for every vertex whose name does not start with "Reducer".
 */
public class LLAPioSummary implements PrintSummary {

  private static final String LLAP_SUMMARY_HEADER_FORMAT = "%10s %9s %9s %10s %9s %10s %11s %8s %9s";
  private static final String LLAP_IO_SUMMARY_HEADER = "LLAP IO Summary";
  private static final String LLAP_SUMMARY_HEADER = String.format(LLAP_SUMMARY_HEADER_FORMAT,
      "VERTICES", "ROWGROUPS", "META_HIT", "META_MISS", "DATA_HIT", "DATA_MISS",
      "ALLOCATION", "USED", "TOTAL_IO");

  private final DecimalFormat secondsFormatter = new DecimalFormat("#0.00");
  private Map<String, Progress> progressMap;
  private DAGClient dagClient;
  // Set once the column header has been written; it is instance state, so it is never reset.
  private boolean headerPrinted = false;

  /**
   * @param progressMap vertex name to progress; only the vertex names are used here
   * @param dagClient   client used to fetch the counters of each vertex
   */
  LLAPioSummary(Map<String, Progress> progressMap, DAGClient dagClient) {
    this.progressMap = progressMap;
    this.dagClient = dagClient;
  }

  /**
   * Writes the LLAP IO table to {@code console}. The column header is emitted right before the
   * first row; the closing separator and a blank line are always emitted.
   */
  @Override
  public void print(SessionState.LogHelper console) {
    console.printInfo("");
    console.printInfo(LLAP_IO_SUMMARY_HEADER);

    final SortedSet<String> sortedVertexNames = new TreeSet<>(progressMap.keySet());
    final Set<StatusGetOpts> countersOnly = new HashSet<>(1);
    countersOnly.add(StatusGetOpts.GET_COUNTERS);
    final String llapGroup = LlapIOCounters.class.getName();

    for (String vertex : sortedVertexNames) {
      // Reducers do not benefit from LLAP IO so no point in printing
      final boolean isReducer = vertex.startsWith("Reducer");
      if (!isReducer) {
        final TezCounters counters = vertexCounter(countersOnly, vertex);
        if (counters == null) {
          continue;
        }
        if (!headerPrinted) {
          console.printInfo(SEPARATOR);
          console.printInfo(LLAP_SUMMARY_HEADER);
          console.printInfo(SEPARATOR);
          headerPrinted = true;
        }
        console.printInfo(vertexSummary(vertex, llapGroup, counters));
      }
    }

    console.printInfo(SEPARATOR);
    console.printInfo("");
  }

  /**
   * Formats the table row of one vertex.
   *
   * @param vertexName     name shown in the first column
   * @param counterGroup   name of the counter group the LLAP IO counters are read from
   * @param vertexCounters counters of that vertex
   * @return the row formatted with {@link #LLAP_SUMMARY_HEADER_FORMAT}
   */
  private String vertexSummary(String vertexName, String counterGroup, TezCounters vertexCounters) {
    final long rowGroups = read(vertexCounters, counterGroup, LlapIOCounters.SELECTED_ROWGROUPS);
    final long metaHit = read(vertexCounters, counterGroup, LlapIOCounters.METADATA_CACHE_HIT);
    final long metaMiss = read(vertexCounters, counterGroup, LlapIOCounters.METADATA_CACHE_MISS);
    final long dataHitBytes = read(vertexCounters, counterGroup, LlapIOCounters.CACHE_HIT_BYTES);
    final long dataMissBytes = read(vertexCounters, counterGroup, LlapIOCounters.CACHE_MISS_BYTES);
    final long allocated = read(vertexCounters, counterGroup, LlapIOCounters.ALLOCATED_BYTES);
    final long allocatedUsed = read(vertexCounters, counterGroup, LlapIOCounters.ALLOCATED_USED_BYTES);
    final long ioTimeNs = read(vertexCounters, counterGroup, LlapIOCounters.TOTAL_IO_TIME_NS);

    // the IO time counter is in nanoseconds; show it in seconds
    final String ioTime = secondsFormatter.format(ioTimeNs / 1000_000_000.0) + "s";

    return String.format(LLAP_SUMMARY_HEADER_FORMAT,
        vertexName,
        rowGroups,
        metaHit,
        metaMiss,
        Utilities.humanReadableByteCount(dataHitBytes),
        Utilities.humanReadableByteCount(dataMissBytes),
        Utilities.humanReadableByteCount(allocated),
        Utilities.humanReadableByteCount(allocatedUsed),
        ioTime);
  }

  /** Looks up a single LLAP IO counter by its enum name in the given group. */
  private static long read(TezCounters counters, String group, LlapIOCounters counter) {
    return getCounterValueByGroupName(counters, group, counter.name());
  }

  /**
   * Asks the DAG client for the counters of one vertex.
   *
   * @return the counters, or {@code null} when the request failed
   */
  private TezCounters vertexCounter(Set<StatusGetOpts> statusOptions, String vertexName) {
    TezCounters fetched = null;
    try {
      fetched = dagClient.getVertexStatus(vertexName, statusOptions).getVertexCounters();
    } catch (IOException | TezException e) {
      // best attempt, shouldn't really kill DAG for this
    }
    return fetched;
  }

}

// http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/PrintSummary.java
// ----------------------------------------------------------------------
// diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/PrintSummary.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/PrintSummary.java
// new file mode 100644
// index 0000000..6311335
// --- /dev/null
// +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/PrintSummary.java
// @@ -0,0 +1,7 @@
// package org.apache.hadoop.hive.ql.exec.tez.monitoring;   (same package as declared above)
// import org.apache.hadoop.hive.ql.session.SessionState;   (already imported above)

/** A report section that writes itself to the session console. */
interface PrintSummary {
  /** Writes this summary to {@code console}. */
  void print(SessionState.LogHelper console);
}

// Remainder of the original patch line (start of the next file's diff header), kept verbatim:
// http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/QueryExecutionBreakdownSummary.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/QueryExecutionBreakdownSummary.java
// b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/QueryExecutionBreakdownSummary.java
// new file mode 100644
// index 0000000..1625ce1
// --- /dev/null
// +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/QueryExecutionBreakdownSummary.java
// @@ -0,0 +1,75 @@
package org.apache.hadoop.hive.ql.exec.tez.monitoring;

import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.session.SessionState;

import java.text.DecimalFormat;

import static org.apache.hadoop.hive.ql.exec.tez.monitoring.Constants.SEPARATOR;

/**
 * Prints how long each phase of a query took (compile, plan preparation, plan submission,
 * DAG start and DAG run), based on the timestamps recorded in a {@link PerfLogger}.
 */
class QueryExecutionBreakdownSummary implements PrintSummary {
  // Methods summary
  private static final String OPERATION_SUMMARY = "%-35s %9s";
  private static final String OPERATION = "OPERATION";
  private static final String DURATION = "DURATION";

  private DecimalFormat decimalFormat = new DecimalFormat("#0.00");
  private PerfLogger perfLogger;

  private final Long compileEndTime;
  private final Long dagSubmitStartTime;
  private final Long submitToRunningDuration;

  /**
   * Captures the compile end time, the DAG submit start time and the submit-to-running duration
   * at construction; the remaining values are read from {@code perfLogger} when printing.
   */
  QueryExecutionBreakdownSummary(PerfLogger perfLogger) {
    this.perfLogger = perfLogger;
    this.compileEndTime = perfLogger.getEndTime(PerfLogger.COMPILE);
    this.dagSubmitStartTime = perfLogger.getStartTime(PerfLogger.TEZ_SUBMIT_DAG);
    this.submitToRunningDuration = perfLogger.getDuration(PerfLogger.TEZ_SUBMIT_TO_RUNNING);
  }

  /** Renders a value divided by 1000 with two decimals and an "s" suffix. */
  private String asSeconds(long millis) {
    final double seconds = millis / 1000.0;
    return decimalFormat.format(seconds) + "s";
  }

  /** Builds one "operation / duration" table row. */
  private String row(String operation, long millis) {
    final String duration = asSeconds(millis);
    return String.format(OPERATION_SUMMARY, operation, duration);
  }

  /** Writes the breakdown table, followed by a separator and a blank line, to {@code console}. */
  public void print(SessionState.LogHelper console) {
    console.printInfo("Query Execution Summary");

    console.printInfo(SEPARATOR);
    console.printInfo(String.format(OPERATION_SUMMARY, OPERATION, DURATION));
    console.printInfo(SEPARATOR);

    // parse, analyze, optimize and compile
    final long compileStart = perfLogger.getStartTime(PerfLogger.COMPILE);
    console.printInfo(row("Compile Query", compileEndTime - compileStart));

    // prepare plan for submission (building DAG, adding resources, creating scratch dirs etc.)
    console.printInfo(row("Prepare Plan", dagSubmitStartTime - compileEndTime));

    // submit to accept dag (if session is closed, this will include re-opening of session time,
    // localizing files for AM, submitting DAG)
    final long runDagStart = perfLogger.getStartTime(PerfLogger.TEZ_RUN_DAG);
    console.printInfo(row("Submit Plan", runDagStart - dagSubmitStartTime));

    // accept to start dag (schedule wait time, resource wait time etc.)
    console.printInfo(row("Start DAG", submitToRunningDuration));

    // time to actually run the dag (actual dag runtime)
    final long dagRuntime;
    if (submitToRunningDuration != 0) {
      final long runDagEnd = perfLogger.getEndTime(PerfLogger.TEZ_RUN_DAG);
      final long submitToRunningEnd = perfLogger.getEndTime(PerfLogger.TEZ_SUBMIT_TO_RUNNING);
      dagRuntime = runDagEnd - submitToRunningEnd;
    } else {
      // no submit-to-running time was recorded: fall back to the whole TEZ_RUN_DAG duration
      dagRuntime = perfLogger.getDuration(PerfLogger.TEZ_RUN_DAG);
    }
    console.printInfo(row("Run DAG", dagRuntime));

    console.printInfo(SEPARATOR);
    console.printInfo("");
  }
}

// Remainder of the original patch line (start of the next file's diff), kept verbatim:
// http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java new file mode 100644 index 0000000..1e54f6e --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java @@ -0,0 +1,397 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership.
// NOTE: the first lines of this license header ("Licensed to the Apache Software Foundation ...")
// are on the preceding line of the patch.
/*
 The ASF licenses this file
 to you under the Apache License, Version 2.0 (the
 "License"); you may not use this file except in compliance
 with the License. You may obtain a copy of the License at
 <p>
 http://www.apache.org/licenses/LICENSE-2.0
 <p>
 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 */

package org.apache.hadoop.hive.ql.exec.tez.monitoring;

import com.google.common.base.Preconditions;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
import org.apache.hadoop.hive.common.log.InPlaceUpdate;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.common.log.ProgressMonitor;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
import org.apache.hive.common.util.ShutdownHookManager;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.Progress;
import org.apache.tez.dag.api.client.StatusGetOpts;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.StringWriter;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

import static org.apache.tez.dag.api.client.DAGStatus.State.RUNNING;

/**
 * TezJobMonitor keeps track of a tez job while it's being executed. It will
 * print status to the console and retrieve final status of the job after
 * completion.
 */
public class TezJobMonitor {

  private static final String CLASS_NAME = TezJobMonitor.class.getName();
  private static final int CHECK_INTERVAL = 200;
  private static final int MAX_RETRY_INTERVAL = 2500;
  private static final int PRINT_INTERVAL = 3000;

  private final PerfLogger perfLogger = SessionState.getPerfLogger();
  // DAG clients currently being monitored; guarded by synchronized (shutdownList)
  private static final List<DAGClient> shutdownList;
  private final Map<String, BaseWork> workMap;

  private transient LogHelper console;

  private long lastPrintTime;
  private StringWriter diagnostics = new StringWriter();

  /** Strategy for publishing one progress update (in-place rendering or plain logging). */
  interface UpdateFunction {
    void update(DAGStatus status, Map<String, Progress> vertexProgressMap, String report);
  }

  static {
    shutdownList = new LinkedList<>();
    // On JVM shutdown: try to kill every DAG still being monitored, then close the
    // non-default Tez sessions. Both steps are best effort.
    ShutdownHookManager.addShutdownHook(new Runnable() {
      @Override
      public void run() {
        TezJobMonitor.killRunningJobs();
        try {
          TezSessionPoolManager.getInstance().closeNonDefaultSessions(false);
        } catch (Exception e) {
          // ignore
        }
      }
    });
  }

  /**
   * Verifies that the shutdown list created by the static initializer exists. Calling this
   * method also forces the class, and therefore the shutdown hook registration, to be initialized.
   */
  public static void initShutdownHook() {
    Preconditions.checkNotNull(shutdownList,
        "Shutdown hook was not properly initialized");
  }

  private final DAGClient dagClient;
  private final HiveConf hiveConf;
  private final DAG dag;
  private final Context context;
  private long executionStartTime = 0;
  private final UpdateFunction updateFunction;
  /**
   * Have to use the same instance to render else the number lines printed earlier is lost and the
   * screen will print the table again and again.
   */
  private final InPlaceUpdate inPlaceUpdate;

  /**
   * @param workMap   vertex name to work; handed to the progress monitor
   * @param dagClient client of the DAG to monitor
   * @param conf      configuration used to pick the update style and to enable the summary
   * @param dag       the DAG being run; handed to the DAG summary
   * @param ctx       query context, may be {@code null}; when present its heartbeater lock
   *                  exception check runs on every poll
   */
  public TezJobMonitor(Map<String, BaseWork> workMap, final DAGClient dagClient, HiveConf conf, DAG dag,
      Context ctx) {
    this.workMap = workMap;
    this.dagClient = dagClient;
    this.hiveConf = conf;
    this.dag = dag;
    this.context = ctx;
    console = SessionState.getConsole();
    inPlaceUpdate = new InPlaceUpdate(LogHelper.getInfoStream());
    updateFunction = updateFunction();
  }

  /**
   * Chooses how progress is published: rendered in place when the configuration allows it, the
   * console is not silent and the query is not a HiveServer query; otherwise printed as plain
   * info lines.
   */
  private UpdateFunction updateFunction() {
    UpdateFunction logToFileFunction = new UpdateFunction() {
      @Override
      public void update(DAGStatus status, Map<String, Progress> vertexProgressMap, String report) {
        SessionState.get().updateProgressMonitor(progressMonitor(status, vertexProgressMap));
        console.printInfo(report);
      }
    };
    UpdateFunction inPlaceUpdateFunction = new UpdateFunction() {
      @Override
      public void update(DAGStatus status, Map<String, Progress> vertexProgressMap, String report) {
        inPlaceUpdate.render(progressMonitor(status, vertexProgressMap));
        console.logInfo(report);
      }
    };
    return InPlaceUpdate.canRenderInPlace(hiveConf)
        && !SessionState.getConsole().getIsSilent()
        && !SessionState.get().isHiveServerQuery()
        ? inPlaceUpdateFunction : logToFileFunction;
  }

  /** @return true when the execution summary is enabled or the log level is PERF or above */
  private boolean isProfilingEnabled() {
    return HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.TEZ_EXEC_SUMMARY) ||
        Utilities.isPerfOrAboveLogging(hiveConf);
  }

  /**
   * Polls the DAG status until the DAG reaches a terminal state or monitoring gives up, printing
   * progress along the way and, on success, the execution summary.
   *
   * @return 0 when the DAG succeeded, 1 when it was killed or monitoring was aborted,
   *         2 when it failed or ended in error
   */
  public int monitorExecution() {
    boolean done = false;
    boolean success = false;
    int failedCounter = 0;
    int rc = 0;
    DAGStatus status = null;
    Map<String, Progress> vertexProgressMap = null;


    long monitorStartTime = System.currentTimeMillis();
    synchronized (shutdownList) {
      shutdownList.add(dagClient);
    }
    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_DAG);
    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_SUBMIT_TO_RUNNING);
    DAGStatus.State lastState = null;
    String lastReport = null;
    boolean running = false;

    while (true) {

      try {
        if (context != null) {
          context.checkHeartbeaterLockException();
        }

        // NOTE(review): CHECK_INTERVAL is presumably the wait timeout of this call - confirm
        // against the DAGClient API.
        status = dagClient.getDAGStatus(new HashSet<StatusGetOpts>(), CHECK_INTERVAL);
        vertexProgressMap = status.getVertexProgress();
        DAGStatus.State state = status.getState();

        // Act on every state change, and on every poll while the DAG is running.
        if (state != lastState || state == RUNNING) {
          lastState = state;

          switch (state) {
          case SUBMITTED:
            console.printInfo("Status: Submitted");
            break;
          case INITING:
            console.printInfo("Status: Initializing");
            this.executionStartTime = System.currentTimeMillis();
            break;
          case RUNNING:
            if (!running) {
              perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_SUBMIT_TO_RUNNING);
              console.printInfo("Status: Running (" + dagClient.getExecutionContext() + ")\n");
              this.executionStartTime = System.currentTimeMillis();
              running = true;
            }
            lastReport = updateStatus(status, vertexProgressMap, lastReport);
            break;
          case SUCCEEDED:
            if (!running) {
              // RUNNING was never observed; use the monitoring start as the execution start
              this.executionStartTime = monitorStartTime;
            }
            lastReport = updateStatus(status, vertexProgressMap, lastReport);
            success = true;
            running = false;
            done = true;
            break;
          case KILLED:
            if (!running) {
              this.executionStartTime = monitorStartTime;
            }
            lastReport = updateStatus(status, vertexProgressMap, lastReport);
            console.printInfo("Status: Killed");
            running = false;
            done = true;
            rc = 1;
            break;
          case FAILED:
          case ERROR:
            if (!running) {
              this.executionStartTime = monitorStartTime;
            }
            lastReport = updateStatus(status, vertexProgressMap, lastReport);
            console.printError("Status: Failed");
            running = false;
            done = true;
            rc = 2;
            break;
          }
        }
      } catch (Exception e) {
        console.printInfo("Exception: " + e.getMessage());
        boolean isInterrupted = hasInterruptedException(e);
        // Give up on interruption, or once the number of failed polls reaches a multiple of
        // MAX_RETRY_INTERVAL / CHECK_INTERVAL. The parentheses are required: '%' and '/' have
        // equal precedence and associate left to right, so without them the expression is
        // ((++failedCounter % MAX_RETRY_INTERVAL) / CHECK_INTERVAL), which is already 0 for the
        // first failure and would kill the DAG without a single retry.
        if (isInterrupted || (++failedCounter % (MAX_RETRY_INTERVAL / CHECK_INTERVAL) == 0)) {
          try {
            console.printInfo("Killing DAG...");
            dagClient.tryKillDAG();
          } catch (IOException | TezException tezException) {
            // best effort
          }
          console
              .printError("Execution has failed. stack trace: " + ExceptionUtils.getStackTrace(e));
          rc = 1;
          done = true;
        } else {
          console.printInfo("Retrying...");
        }
      } finally {
        if (done) {
          if (rc != 0 && status != null) {
            for (String diag : status.getDiagnostics()) {
              console.printError(diag);
              diagnostics.append(diag);
            }
          }
          synchronized (shutdownList) {
            shutdownList.remove(dagClient);
          }
          break;
        }
      }
    }

    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_DAG);
    printSummary(success, vertexProgressMap);
    return rc;
  }

  /**
   * Prints the post-execution summaries. Nothing is printed unless profiling is enabled, the DAG
   * succeeded and a progress map is available.
   */
  private void printSummary(boolean success, Map<String, Progress> progressMap) {
    if (isProfilingEnabled() && success && progressMap != null) {

      double duration = (System.currentTimeMillis() - this.executionStartTime) / 1000.0;
      console.printInfo("Status: DAG finished successfully in " + String.format("%.2f seconds", duration));
      console.printInfo("");

      new QueryExecutionBreakdownSummary(perfLogger).print(console);
      new DAGSummary(progressMap, hiveConf, dagClient, dag, perfLogger).print(console);

      //llap IO summary
      if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.LLAP_IO_ENABLED, false)) {
        new LLAPioSummary(progressMap, dagClient).print(console);
        new FSCountersSummary(progressMap, dagClient).print(console);
      }
      console.printInfo("");
    }
  }

  /** @return true when {@code e} or any of its causes is an interruption exception */
  private static boolean hasInterruptedException(Throwable e) {
    // Hadoop IPC wraps InterruptedException. GRRR.
    while (e != null) {
      if (e instanceof InterruptedException || e instanceof InterruptedIOException) {
        return true;
      }
      e = e.getCause();
    }
    return false;
  }

  /**
   * killRunningJobs tries to terminate execution of all
   * currently running tez queries. No guarantees, best effort only.
   */
  private static void killRunningJobs() {
    synchronized (shutdownList) {
      for (DAGClient c : shutdownList) {
        try {
          System.err.println("Trying to shutdown DAG");
          c.tryKillDAG();
        } catch (Exception e) {
          // ignore
        }
      }
    }
  }

  /**
   * Reads one counter value.
   *
   * @param vertexCounters   counters to search
   * @param groupNamePattern name of the counter group
   * @param counterName      name of the counter inside that group
   * @return the counter value, or 0 when the group has no such counter
   */
  static long getCounterValueByGroupName(TezCounters vertexCounters, String groupNamePattern,
      String counterName) {
    TezCounter tezCounter = vertexCounters.getGroup(groupNamePattern).findCounter(counterName);
    return (tezCounter == null) ? 0 : tezCounter.getValue();
  }

  /**
   * Builds the current progress report and publishes it when it differs from the last one or
   * when PRINT_INTERVAL milliseconds have passed since the last publication.
   *
   * @return the current report, to be passed back as {@code lastReport} on the next call
   */
  private String updateStatus(DAGStatus status, Map<String, Progress> vertexProgressMap,
      String lastReport) {
    String report = getReport(vertexProgressMap);
    if (!report.equals(lastReport) || System.currentTimeMillis() >= lastPrintTime + PRINT_INTERVAL) {
      updateFunction.update(status, vertexProgressMap, report);
      lastPrintTime = System.currentTimeMillis();
    }
    return report;
  }

  /**
   * Builds a one-line, tab-separated progress report with one entry per vertex (sorted by
   * name). As a side effect it opens and closes the per-vertex TEZ_RUN_VERTEX perf log entries.
   */
  private String getReport(Map<String, Progress> progressMap) {
    StringBuilder reportBuffer = new StringBuilder();

    SortedSet<String> keys = new TreeSet<String>(progressMap.keySet());
    for (String s : keys) {
      Progress progress = progressMap.get(s);
      final int complete = progress.getSucceededTaskCount();
      final int total = progress.getTotalTaskCount();
      final int running = progress.getRunningTaskCount();
      final int failed = progress.getFailedTaskAttemptCount();
      if (total <= 0) {
        reportBuffer.append(String.format("%s: -/-\t", s));
      } else {
        if (complete == total) {
          /*
           * We may have missed the start of the vertex due to the 3 seconds interval
           */
          if (!perfLogger.startTimeHasMethod(PerfLogger.TEZ_RUN_VERTEX + s)) {
            perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
          }

          perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
        }
        if (complete < total && (complete > 0 || running > 0 || failed > 0)) {

          if (!perfLogger.startTimeHasMethod(PerfLogger.TEZ_RUN_VERTEX + s)) {
            perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
          }

          /* vertex is started, but not complete */
          if (failed > 0) {
            reportBuffer.append(String.format("%s: %d(+%d,-%d)/%d\t", s, complete, running, failed, total));
          } else {
            reportBuffer.append(String.format("%s: %d(+%d)/%d\t", s, complete, running, total));
          }
        } else {
          /* vertex is waiting for input/slots or complete */
          if (failed > 0) {
            /* tasks finished but some failed */
            reportBuffer.append(String.format("%s: %d(-%d)/%d\t", s, complete, failed, total));
          } else {
            reportBuffer.append(String.format("%s: %d/%d\t", s, complete, total));
          }
        }
      }
    }

    return reportBuffer.toString();
  }

  /** @return the DAG diagnostics collected when monitoring ended with a non-zero return code */
  public String getDiagnostics() {
    return diagnostics.toString();
  }

  /**
   * Creates the progress monitor for the given status snapshot.
   *
   * @return the monitor, or {@code TezProgressMonitor.NULL} when it could not be built
   */
  private ProgressMonitor progressMonitor(DAGStatus status, Map<String, Progress> progressMap) {
    try {
      return new TezProgressMonitor(dagClient, status, workMap, progressMap, console,
          executionStartTime);
    } catch (IOException | TezException e) {
      console.printInfo("Getting Progress Information: " + e.getMessage() + " stack trace: " +
          ExceptionUtils.getStackTrace(e));
    }
    return TezProgressMonitor.NULL;
  }
}

// Remainder of the original patch line (start of the next file's diff), kept verbatim:
// http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezProgressMonitor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezProgressMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezProgressMonitor.java new file mode 100644 index 0000000..3475fc2 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezProgressMonitor.java @@ -0,0 +1,313 @@ +package org.apache.hadoop.hive.ql.exec.tez.monitoring; + +import org.apache.hadoop.hive.common.log.ProgressMonitor; +import org.apache.hadoop.hive.ql.plan.BaseWork; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.api.client.DAGClient; +import org.apache.tez.dag.api.client.DAGStatus; +import org.apache.tez.dag.api.client.Progress; +import org.apache.tez.dag.api.client.VertexStatus; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; + +import static
org.apache.tez.dag.api.client.DAGStatus.State.KILLED; + +class TezProgressMonitor implements ProgressMonitor { + private static final int COLUMN_1_WIDTH = 16; + private final Map<String, BaseWork> workMap; + private final SessionState.LogHelper console; + private final long executionStartTime; + private final DAGStatus status; + Map<String, VertexStatus> vertexStatusMap = new HashMap<>(); + Map<String, VertexProgress> progressCountsMap = new HashMap<>(); + + /** + * Try to get most the data required from dagClient in the constructor itself so that even after + * the tez job has finished this object can be used for later use.s + */ + TezProgressMonitor(DAGClient dagClient, DAGStatus status, Map<String, BaseWork> workMap, + Map<String, Progress> progressMap, SessionState.LogHelper console, long executionStartTime) + throws IOException, TezException { + this.status = status; + this.workMap = workMap; + this.console = console; + this.executionStartTime = executionStartTime; + for (Map.Entry<String, Progress> entry : progressMap.entrySet()) { + String vertexName = entry.getKey(); + progressCountsMap.put(vertexName, new VertexProgress(entry.getValue(), status.getState())); + try { + vertexStatusMap.put(vertexName, dagClient.getVertexStatus(vertexName, null)); + } catch (IOException e) { + // best attempt, shouldn't really kill DAG for this + } + } + } + + public List<String> headers() { + return Arrays.asList( + "VERTICES", + "MODE", + "STATUS", + "TOTAL", + "COMPLETED", + "RUNNING", + "PENDING", + "FAILED", + "KILLED" + ); + } + + public List<List<String>> rows() { + try { + List<List<String>> results = new ArrayList<>(); + SortedSet<String> keys = new TreeSet<>(progressCountsMap.keySet()); + for (String s : keys) { + VertexProgress progress = progressCountsMap.get(s); + + // Map 1 .......... 
container SUCCEEDED 7 7 0 0 0 0 + + results.add( + Arrays.asList( + getNameWithProgress(s, progress.succeededTaskCount, progress.totalTaskCount), + getMode(s, workMap), + progress.vertexStatus(vertexStatusMap.get(s)), + progress.total(), + progress.completed(), + progress.running(), + progress.pending(), + progress.failed(), + progress.killed() + ) + ); + } + return results; + } catch (Exception e) { + console.printInfo( + "Getting Progress Bar table rows failed: " + e.getMessage() + " stack trace: " + Arrays + .toString(e.getStackTrace()) + ); + } + return Collections.emptyList(); + } + + // ------------------------------------------------------------------------------- + // VERTICES: 03/04 [=================>>-----] 86% ELAPSED TIME: 1.71 s + // ------------------------------------------------------------------------------- + // contains footerSummary , progressedPercentage, starTime + + @Override + public String footerSummary() { + return String.format("VERTICES: %02d/%02d", completed(), progressCountsMap.keySet().size()); + } + + @Override + public long startTime() { + return executionStartTime; + } + + @Override + public double progressedPercentage() { + int sumTotal = 0, sumComplete = 0; + for (String s : progressCountsMap.keySet()) { + VertexProgress progress = progressCountsMap.get(s); + final int complete = progress.succeededTaskCount; + final int total = progress.totalTaskCount; + if (total > 0) { + sumTotal += total; + sumComplete += complete; + } + } + return (sumTotal == 0) ? 
0.0f : (float) sumComplete / (float) sumTotal; + } + + @Override + public String executionStatus() { + return this.status.getState().name(); + } + + private int completed() { + Set<String> completed = new HashSet<>(); + for (String s : progressCountsMap.keySet()) { + VertexProgress progress = progressCountsMap.get(s); + final int complete = progress.succeededTaskCount; + final int total = progress.totalTaskCount; + if (total > 0) { + if (complete == total) { + completed.add(s); + } + } + } + return completed.size(); + } + + // Map 1 .......... + + private String getNameWithProgress(String s, int complete, int total) { + String result = ""; + if (s != null) { + float percent = total == 0 ? 0.0f : (float) complete / (float) total; + // lets use the remaining space in column 1 as progress bar + int spaceRemaining = COLUMN_1_WIDTH - s.length() - 1; + String trimmedVName = s; + + // if the vertex name is longer than column 1 width, trim it down + // "Tez Merge File Work" will become "Tez Merge File.." 
+ if (s.length() > COLUMN_1_WIDTH) { + trimmedVName = s.substring(0, COLUMN_1_WIDTH - 1); + trimmedVName = trimmedVName + ".."; + } + + result = trimmedVName + " "; + int toFill = (int) (spaceRemaining * percent); + for (int i = 0; i < toFill; i++) { + result += "."; + } + } + return result; + } + + private String getMode(String name, Map<String, BaseWork> workMap) { + String mode = "container"; + BaseWork work = workMap.get(name); + if (work != null) { + // uber > llap > container + if (work.getUberMode()) { + mode = "uber"; + } else if (work.getLlapMode()) { + mode = "llap"; + } else { + mode = "container"; + } + } + return mode; + } + + static class VertexProgress { + private final int totalTaskCount; + private final int succeededTaskCount; + private final int failedTaskAttemptCount; + private final long killedTaskAttemptCount; + private final int runningTaskCount; + private final DAGStatus.State dagState; + + VertexProgress(Progress progress, DAGStatus.State dagState) { + this(progress.getTotalTaskCount(), progress.getSucceededTaskCount(), + progress.getFailedTaskAttemptCount(), progress.getKilledTaskAttemptCount(), + progress.getRunningTaskCount(), dagState); + } + + VertexProgress(int totalTaskCount, int succeededTaskCount, int failedTaskAttemptCount, + int killedTaskAttemptCount, int runningTaskCount, DAGStatus.State dagState) { + this.totalTaskCount = totalTaskCount; + this.succeededTaskCount = succeededTaskCount; + this.failedTaskAttemptCount = failedTaskAttemptCount; + this.killedTaskAttemptCount = killedTaskAttemptCount; + this.runningTaskCount = runningTaskCount; + this.dagState = dagState; + } + + boolean isRunning() { + return succeededTaskCount < totalTaskCount && (succeededTaskCount > 0 || runningTaskCount > 0 + || failedTaskAttemptCount > 0); + } + + String vertexStatus(VertexStatus vertexStatus) { + // To get vertex status we can use DAGClient.getVertexStatus(), but it will be expensive to + // get status from AM for every refresh of the UI. 
Lets infer the state from task counts. + // Only if DAG is FAILED or KILLED the vertex status is fetched from AM. + VertexStatus.State vertexState = VertexStatus.State.INITIALIZING; + if (totalTaskCount > 0) { + vertexState = VertexStatus.State.INITED; + } + + // RUNNING state + if (isRunning()) { + vertexState = VertexStatus.State.RUNNING; + } + + // SUCCEEDED state + if (succeededTaskCount == totalTaskCount) { + vertexState = VertexStatus.State.SUCCEEDED; + } + + // DAG might have been killed, lets try to get vertex state from AM before dying + // KILLED or FAILED state + if (dagState == KILLED) { + if (vertexStatus != null) { + vertexState = vertexStatus.getState(); + } + } + return vertexState.toString(); + } + + // "TOTAL", "COMPLETED", "RUNNING", "PENDING", "FAILED", "KILLED" + + String total() { + return String.valueOf(totalTaskCount); + } + + String completed() { + return String.valueOf(succeededTaskCount); + } + + String running() { + return String.valueOf(runningTaskCount); + } + + String pending() { + return String.valueOf(totalTaskCount - succeededTaskCount - runningTaskCount); + } + + String failed() { + return String.valueOf(failedTaskAttemptCount); + } + + String killed() { + return String.valueOf(killedTaskAttemptCount); + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + VertexProgress that = (VertexProgress) o; + + if (totalTaskCount != that.totalTaskCount) + return false; + if (succeededTaskCount != that.succeededTaskCount) + return false; + if (failedTaskAttemptCount != that.failedTaskAttemptCount) + return false; + if (killedTaskAttemptCount != that.killedTaskAttemptCount) + return false; + if (runningTaskCount != that.runningTaskCount) + return false; + return dagState == that.dagState; + } + + @Override + public int hashCode() { + int result = totalTaskCount; + result = 31 * result + succeededTaskCount; + result = 31 * result + 
failedTaskAttemptCount; + result = 31 * result + (int) (killedTaskAttemptCount ^ (killedTaskAttemptCount >>> 32)); + result = 31 * result + runningTaskCount; + result = 31 * result + dagState.hashCode(); + return result; + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index c5b3517..ed854bf 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -26,8 +26,7 @@ import static org.apache.hadoop.hive.serde.serdeConstants.LINE_DELIM; import static org.apache.hadoop.hive.serde.serdeConstants.MAPKEY_DELIM; import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT; import static org.apache.hadoop.hive.serde.serdeConstants.STRING_TYPE_NAME; -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; + import java.io.FileNotFoundException; import java.io.IOException; import java.io.PrintStream; @@ -127,11 +126,11 @@ import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.FunctionRegistry; import org.apache.hadoop.hive.ql.exec.FunctionTask; import org.apache.hadoop.hive.ql.exec.FunctionUtils; -import org.apache.hadoop.hive.ql.exec.InPlaceUpdates; import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.index.HiveIndexHandler; import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.common.log.InPlaceUpdate; import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.optimizer.listbucketingpruner.ListBucketingPrunerUtils; import org.apache.hadoop.hive.ql.plan.AddPartitionDesc; @@ -1898,7 +1897,7 @@ private void 
constructOneLBLocationMap(FileStatus fSta, final AtomicInteger partitionsLoaded = new AtomicInteger(0); final boolean inPlaceEligible = conf.getLong("fs.trash.interval", 0) <= 0 - && InPlaceUpdates.inPlaceEligible(conf); + && InPlaceUpdate.canRenderInPlace(conf) && !SessionState.getConsole().getIsSilent(); final PrintStream ps = (inPlaceEligible) ? SessionState.getConsole().getInfoStream() : null; final SessionState parentSession = SessionState.get(); @@ -1926,9 +1925,9 @@ private void constructOneLBLocationMap(FileStatus fSta, if (inPlaceEligible) { synchronized (ps) { - InPlaceUpdates.rePositionCursor(ps); + InPlaceUpdate.rePositionCursor(ps); partitionsLoaded.incrementAndGet(); - InPlaceUpdates.reprintLine(ps, "Loaded : " + partitionsLoaded.get() + "/" + InPlaceUpdate.reprintLine(ps, "Loaded : " + partitionsLoaded.get() + "/" + partsToLoad + " partitions."); } } http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java index d607f61..3e01e92 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java @@ -69,6 +69,7 @@ import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; import org.apache.hadoop.hive.ql.lockmgr.LockException; import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory; import org.apache.hadoop.hive.ql.log.PerfLogger; +import org.apache.hadoop.hive.common.log.ProgressMonitor; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.HiveUtils; @@ -185,6 +186,7 @@ public class SessionState { private HiveAuthorizationProvider authorizer; private HiveAuthorizer authorizerV2; + private volatile 
ProgressMonitor progressMonitor; public enum AuthorizationMode{V1, V2}; @@ -1564,6 +1566,7 @@ public class SessionState { // removes the threadlocal variables, closes underlying HMS connection Hive.closeCurrent(); } + progressMonitor = null; } private void unCacheDataNucleusClassLoaders() { @@ -1744,6 +1747,15 @@ public class SessionState { public String getReloadableAuxJars() { return StringUtils.join(preReloadableAuxJars, ','); } + + public void updateProgressMonitor(ProgressMonitor progressMonitor) { + this.progressMonitor = progressMonitor; + } + + public ProgressMonitor getProgressMonitor() { + return progressMonitor; + } + } class ResourceMaps { http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/monitoring/TestTezProgressMonitor.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/monitoring/TestTezProgressMonitor.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/monitoring/TestTezProgressMonitor.java new file mode 100644 index 0000000..648d625 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/monitoring/TestTezProgressMonitor.java @@ -0,0 +1,101 @@ +package org.apache.hadoop.hive.ql.exec.tez.monitoring; + +import org.apache.hadoop.hive.ql.plan.BaseWork; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.api.client.DAGClient; +import org.apache.tez.dag.api.client.DAGStatus; +import org.apache.tez.dag.api.client.Progress; +import org.apache.tez.dag.api.client.StatusGetOpts; +import org.apache.tez.dag.api.client.VertexStatus; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Matchers; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import static 
org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.hasItems; +import static org.hamcrest.CoreMatchers.sameInstance; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.anySet; +import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.isNull; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class TestTezProgressMonitor { + + private static final String REDUCER = "Reducer"; + private static final String MAPPER = "Mapper"; + @Mock + private DAGClient dagClient; + @Mock + private SessionState.LogHelper console; + @Mock + private DAGStatus dagStatus; + @Mock + private Progress mapperProgress; + @Mock + private Progress reducerProgress; + @Mock + private VertexStatus succeeded; + @Mock + private VertexStatus running; + + private Map<String, Progress> progressMap() { + return new HashMap<String, Progress>() {{ + put(MAPPER, setup(mapperProgress, 2, 1, 3, 4, 5)); + put(REDUCER, setup(reducerProgress, 3, 2, 1, 0, 1)); + }}; + } + + private Progress setup(Progress progressMock, int total, int succeeded, int failedAttempt, + int killedAttempt, int running) { + when(progressMock.getTotalTaskCount()).thenReturn(total); + when(progressMock.getSucceededTaskCount()).thenReturn(succeeded); + when(progressMock.getFailedTaskAttemptCount()).thenReturn(failedAttempt); + when(progressMock.getKilledTaskAttemptCount()).thenReturn(killedAttempt); + when(progressMock.getRunningTaskCount()).thenReturn(running); + return progressMock; + } + + @Test + public void setupInternalStateOnObjectCreation() throws IOException, TezException { + when(dagStatus.getState()).thenReturn(DAGStatus.State.RUNNING); + when(dagClient.getVertexStatus(eq(MAPPER), anySet())).thenReturn(succeeded); + when(dagClient.getVertexStatus(eq(REDUCER), 
anySet())).thenReturn(running); + + TezProgressMonitor monitor = + new TezProgressMonitor(dagClient, dagStatus, new HashMap<String, BaseWork>(), progressMap(), console, + Long.MAX_VALUE); + + verify(dagClient).getVertexStatus(eq(MAPPER), isNull(Set.class)); + verify(dagClient).getVertexStatus(eq(REDUCER), isNull(Set.class)); + verifyNoMoreInteractions(dagClient); + + assertThat(monitor.vertexStatusMap.keySet(), hasItems(MAPPER, REDUCER)); + assertThat(monitor.vertexStatusMap.get(MAPPER), is(sameInstance(succeeded))); + assertThat(monitor.vertexStatusMap.get(REDUCER), is(sameInstance(running))); + + assertThat(monitor.progressCountsMap.keySet(), hasItems(MAPPER, REDUCER)); + TezProgressMonitor.VertexProgress expectedMapperState = + new TezProgressMonitor.VertexProgress(2, 1, 3, 4, 5, DAGStatus.State.RUNNING); + assertThat(monitor.progressCountsMap.get(MAPPER), is(equalTo(expectedMapperState))); + + TezProgressMonitor.VertexProgress expectedReducerState = + new TezProgressMonitor.VertexProgress(3, 2, 1, 0, 1, DAGStatus.State.RUNNING); + assertThat(monitor.progressCountsMap.get(REDUCER), is(equalTo(expectedReducerState))); + + + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service-rpc/if/TCLIService.thrift ---------------------------------------------------------------------- diff --git a/service-rpc/if/TCLIService.thrift b/service-rpc/if/TCLIService.thrift index a4fa7b0..824b049 100644 --- a/service-rpc/if/TCLIService.thrift +++ b/service-rpc/if/TCLIService.thrift @@ -63,6 +63,9 @@ enum TProtocolVersion { // V9 adds support for serializing ResultSets in SerDe HIVE_CLI_SERVICE_PROTOCOL_V9 + + // V10 adds support for in place updates via GetOperationStatus + HIVE_CLI_SERVICE_PROTOCOL_V10 } enum TTypeId { @@ -559,7 +562,7 @@ struct TOperationHandle { // which operations may be executed. struct TOpenSessionReq { // The version of the HiveServer2 protocol that the client is using. 
- 1: required TProtocolVersion client_protocol = TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V9 + 1: required TProtocolVersion client_protocol = TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10 // Username and password for authentication. // Depending on the authentication scheme being used, @@ -578,7 +581,7 @@ struct TOpenSessionResp { 1: required TStatus status // The protocol version that the server is using. - 2: required TProtocolVersion serverProtocolVersion = TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V9 + 2: required TProtocolVersion serverProtocolVersion = TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10 // Session Handle 3: optional TSessionHandle sessionHandle @@ -1019,6 +1022,8 @@ struct TGetCrossReferenceResp { struct TGetOperationStatusReq { // Session to run this request against 1: required TOperationHandle operationHandle + // optional arguments to get progress information + 2: optional bool getProgressUpdate } struct TGetOperationStatusResp { @@ -1047,6 +1052,8 @@ struct TGetOperationStatusResp { // If the operation has the result 9: optional bool hasResultSet + 10: optional TProgressUpdateResp progressUpdateResponse + } @@ -1202,6 +1209,21 @@ struct TRenewDelegationTokenResp { 1: required TStatus status } +enum TJobExecutionStatus { + IN_PROGRESS, + COMPLETE, + NOT_AVAILABLE +} + +struct TProgressUpdateResp { + 1: required list<string> headerNames + 2: required list<list<string>> rows + 3: required double progressedPercentage + 4: required TJobExecutionStatus status + 5: required string footerSummary + 6: required i64 startTime +} + service TCLIService { TOpenSessionResp OpenSession(1:TOpenSessionReq req); http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.cpp ---------------------------------------------------------------------- diff --git a/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.cpp b/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.cpp index 2f460e8..0a17e0e 100644 --- 
a/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.cpp +++ b/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.cpp @@ -269,6 +269,18 @@ const char* _kTFetchOrientationNames[] = { }; const std::map<int, const char*> _TFetchOrientation_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(6, _kTFetchOrientationValues, _kTFetchOrientationNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL)); +int _kTJobExecutionStatusValues[] = { + TJobExecutionStatus::IN_PROGRESS, + TJobExecutionStatus::COMPLETE, + TJobExecutionStatus::NOT_AVAILABLE +}; +const char* _kTJobExecutionStatusNames[] = { + "IN_PROGRESS", + "COMPLETE", + "NOT_AVAILABLE" +}; +const std::map<int, const char*> _TJobExecutionStatus_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(3, _kTJobExecutionStatusValues, _kTJobExecutionStatusNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL)); + TTypeQualifierValue::~TTypeQualifierValue() throw() { } @@ -8174,6 +8186,11 @@ void TGetOperationStatusReq::__set_operationHandle(const TOperationHandle& val) this->operationHandle = val; } +void TGetOperationStatusReq::__set_getProgressUpdate(const bool val) { + this->getProgressUpdate = val; +__isset.getProgressUpdate = true; +} + uint32_t TGetOperationStatusReq::read(::apache::thrift::protocol::TProtocol* iprot) { apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); @@ -8204,6 +8221,14 @@ uint32_t TGetOperationStatusReq::read(::apache::thrift::protocol::TProtocol* ipr xfer += iprot->skip(ftype); } break; + case 2: + if (ftype == ::apache::thrift::protocol::T_BOOL) { + xfer += iprot->readBool(this->getProgressUpdate); + this->__isset.getProgressUpdate = true; + } else { + xfer += iprot->skip(ftype); + } + break; default: xfer += iprot->skip(ftype); break; @@ -8227,6 +8252,11 @@ uint32_t TGetOperationStatusReq::write(::apache::thrift::protocol::TProtocol* op xfer += this->operationHandle.write(oprot); xfer += oprot->writeFieldEnd(); + if (this->__isset.getProgressUpdate) { + xfer += 
oprot->writeFieldBegin("getProgressUpdate", ::apache::thrift::protocol::T_BOOL, 2); + xfer += oprot->writeBool(this->getProgressUpdate); + xfer += oprot->writeFieldEnd(); + } xfer += oprot->writeFieldStop(); xfer += oprot->writeStructEnd(); return xfer; @@ -8235,19 +8265,26 @@ uint32_t TGetOperationStatusReq::write(::apache::thrift::protocol::TProtocol* op void swap(TGetOperationStatusReq &a, TGetOperationStatusReq &b) { using ::std::swap; swap(a.operationHandle, b.operationHandle); + swap(a.getProgressUpdate, b.getProgressUpdate); + swap(a.__isset, b.__isset); } TGetOperationStatusReq::TGetOperationStatusReq(const TGetOperationStatusReq& other268) { operationHandle = other268.operationHandle; + getProgressUpdate = other268.getProgressUpdate; + __isset = other268.__isset; } TGetOperationStatusReq& TGetOperationStatusReq::operator=(const TGetOperationStatusReq& other269) { operationHandle = other269.operationHandle; + getProgressUpdate = other269.getProgressUpdate; + __isset = other269.__isset; return *this; } void TGetOperationStatusReq::printTo(std::ostream& out) const { using ::apache::thrift::to_string; out << "TGetOperationStatusReq("; out << "operationHandle=" << to_string(operationHandle); + out << ", " << "getProgressUpdate="; (__isset.getProgressUpdate ? 
(out << to_string(getProgressUpdate)) : (out << "<null>")); out << ")"; } @@ -8300,6 +8337,11 @@ void TGetOperationStatusResp::__set_hasResultSet(const bool val) { __isset.hasResultSet = true; } +void TGetOperationStatusResp::__set_progressUpdateResponse(const TProgressUpdateResp& val) { + this->progressUpdateResponse = val; +__isset.progressUpdateResponse = true; +} + uint32_t TGetOperationStatusResp::read(::apache::thrift::protocol::TProtocol* iprot) { apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); @@ -8396,6 +8438,14 @@ uint32_t TGetOperationStatusResp::read(::apache::thrift::protocol::TProtocol* ip xfer += iprot->skip(ftype); } break; + case 10: + if (ftype == ::apache::thrift::protocol::T_STRUCT) { + xfer += this->progressUpdateResponse.read(iprot); + this->__isset.progressUpdateResponse = true; + } else { + xfer += iprot->skip(ftype); + } + break; default: xfer += iprot->skip(ftype); break; @@ -8459,6 +8509,11 @@ uint32_t TGetOperationStatusResp::write(::apache::thrift::protocol::TProtocol* o xfer += oprot->writeBool(this->hasResultSet); xfer += oprot->writeFieldEnd(); } + if (this->__isset.progressUpdateResponse) { + xfer += oprot->writeFieldBegin("progressUpdateResponse", ::apache::thrift::protocol::T_STRUCT, 10); + xfer += this->progressUpdateResponse.write(oprot); + xfer += oprot->writeFieldEnd(); + } xfer += oprot->writeFieldStop(); xfer += oprot->writeStructEnd(); return xfer; @@ -8475,6 +8530,7 @@ void swap(TGetOperationStatusResp &a, TGetOperationStatusResp &b) { swap(a.operationStarted, b.operationStarted); swap(a.operationCompleted, b.operationCompleted); swap(a.hasResultSet, b.hasResultSet); + swap(a.progressUpdateResponse, b.progressUpdateResponse); swap(a.__isset, b.__isset); } @@ -8488,6 +8544,7 @@ TGetOperationStatusResp::TGetOperationStatusResp(const TGetOperationStatusResp& operationStarted = other271.operationStarted; operationCompleted = other271.operationCompleted; hasResultSet = other271.hasResultSet; + 
progressUpdateResponse = other271.progressUpdateResponse; __isset = other271.__isset; } TGetOperationStatusResp& TGetOperationStatusResp::operator=(const TGetOperationStatusResp& other272) { @@ -8500,6 +8557,7 @@ TGetOperationStatusResp& TGetOperationStatusResp::operator=(const TGetOperationS operationStarted = other272.operationStarted; operationCompleted = other272.operationCompleted; hasResultSet = other272.hasResultSet; + progressUpdateResponse = other272.progressUpdateResponse; __isset = other272.__isset; return *this; } @@ -8515,6 +8573,7 @@ void TGetOperationStatusResp::printTo(std::ostream& out) const { out << ", " << "operationStarted="; (__isset.operationStarted ? (out << to_string(operationStarted)) : (out << "<null>")); out << ", " << "operationCompleted="; (__isset.operationCompleted ? (out << to_string(operationCompleted)) : (out << "<null>")); out << ", " << "hasResultSet="; (__isset.hasResultSet ? (out << to_string(hasResultSet)) : (out << "<null>")); + out << ", " << "progressUpdateResponse="; (__isset.progressUpdateResponse ? 
(out << to_string(progressUpdateResponse)) : (out << "<null>")); out << ")"; } @@ -9984,4 +10043,267 @@ void TRenewDelegationTokenResp::printTo(std::ostream& out) const { out << ")"; } + +TProgressUpdateResp::~TProgressUpdateResp() throw() { +} + + +void TProgressUpdateResp::__set_headerNames(const std::vector<std::string> & val) { + this->headerNames = val; +} + +void TProgressUpdateResp::__set_rows(const std::vector<std::vector<std::string> > & val) { + this->rows = val; +} + +void TProgressUpdateResp::__set_progressedPercentage(const double val) { + this->progressedPercentage = val; +} + +void TProgressUpdateResp::__set_status(const TJobExecutionStatus::type val) { + this->status = val; +} + +void TProgressUpdateResp::__set_footerSummary(const std::string& val) { + this->footerSummary = val; +} + +void TProgressUpdateResp::__set_startTime(const int64_t val) { + this->startTime = val; +} + +uint32_t TProgressUpdateResp::read(::apache::thrift::protocol::TProtocol* iprot) { + + apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); + uint32_t xfer = 0; + std::string fname; + ::apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using ::apache::thrift::protocol::TProtocolException; + + bool isset_headerNames = false; + bool isset_rows = false; + bool isset_progressedPercentage = false; + bool isset_status = false; + bool isset_footerSummary = false; + bool isset_startTime = false; + + while (true) + { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == ::apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) + { + case 1: + if (ftype == ::apache::thrift::protocol::T_LIST) { + { + this->headerNames.clear(); + uint32_t _size302; + ::apache::thrift::protocol::TType _etype305; + xfer += iprot->readListBegin(_etype305, _size302); + this->headerNames.resize(_size302); + uint32_t _i306; + for (_i306 = 0; _i306 < _size302; ++_i306) + { + xfer += iprot->readString(this->headerNames[_i306]); + 
} + xfer += iprot->readListEnd(); + } + isset_headerNames = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 2: + if (ftype == ::apache::thrift::protocol::T_LIST) { + { + this->rows.clear(); + uint32_t _size307; + ::apache::thrift::protocol::TType _etype310; + xfer += iprot->readListBegin(_etype310, _size307); + this->rows.resize(_size307); + uint32_t _i311; + for (_i311 = 0; _i311 < _size307; ++_i311) + { + { + this->rows[_i311].clear(); + uint32_t _size312; + ::apache::thrift::protocol::TType _etype315; + xfer += iprot->readListBegin(_etype315, _size312); + this->rows[_i311].resize(_size312); + uint32_t _i316; + for (_i316 = 0; _i316 < _size312; ++_i316) + { + xfer += iprot->readString(this->rows[_i311][_i316]); + } + xfer += iprot->readListEnd(); + } + } + xfer += iprot->readListEnd(); + } + isset_rows = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 3: + if (ftype == ::apache::thrift::protocol::T_DOUBLE) { + xfer += iprot->readDouble(this->progressedPercentage); + isset_progressedPercentage = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 4: + if (ftype == ::apache::thrift::protocol::T_I32) { + int32_t ecast317; + xfer += iprot->readI32(ecast317); + this->status = (TJobExecutionStatus::type)ecast317; + isset_status = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 5: + if (ftype == ::apache::thrift::protocol::T_STRING) { + xfer += iprot->readString(this->footerSummary); + isset_footerSummary = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 6: + if (ftype == ::apache::thrift::protocol::T_I64) { + xfer += iprot->readI64(this->startTime); + isset_startTime = true; + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + if (!isset_headerNames) + throw TProtocolException(TProtocolException::INVALID_DATA); + if (!isset_rows) + throw 
TProtocolException(TProtocolException::INVALID_DATA); + if (!isset_progressedPercentage) + throw TProtocolException(TProtocolException::INVALID_DATA); + if (!isset_status) + throw TProtocolException(TProtocolException::INVALID_DATA); + if (!isset_footerSummary) + throw TProtocolException(TProtocolException::INVALID_DATA); + if (!isset_startTime) + throw TProtocolException(TProtocolException::INVALID_DATA); + return xfer; +} + +uint32_t TProgressUpdateResp::write(::apache::thrift::protocol::TProtocol* oprot) const { + uint32_t xfer = 0; + apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot); + xfer += oprot->writeStructBegin("TProgressUpdateResp"); + + xfer += oprot->writeFieldBegin("headerNames", ::apache::thrift::protocol::T_LIST, 1); + { + xfer += oprot->writeListBegin(::apache::thrift::protocol::T_STRING, static_cast<uint32_t>(this->headerNames.size())); + std::vector<std::string> ::const_iterator _iter318; + for (_iter318 = this->headerNames.begin(); _iter318 != this->headerNames.end(); ++_iter318) + { + xfer += oprot->writeString((*_iter318)); + } + xfer += oprot->writeListEnd(); + } + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldBegin("rows", ::apache::thrift::protocol::T_LIST, 2); + { + xfer += oprot->writeListBegin(::apache::thrift::protocol::T_LIST, static_cast<uint32_t>(this->rows.size())); + std::vector<std::vector<std::string> > ::const_iterator _iter319; + for (_iter319 = this->rows.begin(); _iter319 != this->rows.end(); ++_iter319) + { + { + xfer += oprot->writeListBegin(::apache::thrift::protocol::T_STRING, static_cast<uint32_t>((*_iter319).size())); + std::vector<std::string> ::const_iterator _iter320; + for (_iter320 = (*_iter319).begin(); _iter320 != (*_iter319).end(); ++_iter320) + { + xfer += oprot->writeString((*_iter320)); + } + xfer += oprot->writeListEnd(); + } + } + xfer += oprot->writeListEnd(); + } + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldBegin("progressedPercentage", 
::apache::thrift::protocol::T_DOUBLE, 3); + xfer += oprot->writeDouble(this->progressedPercentage); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldBegin("status", ::apache::thrift::protocol::T_I32, 4); + xfer += oprot->writeI32((int32_t)this->status); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldBegin("footerSummary", ::apache::thrift::protocol::T_STRING, 5); + xfer += oprot->writeString(this->footerSummary); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldBegin("startTime", ::apache::thrift::protocol::T_I64, 6); + xfer += oprot->writeI64(this->startTime); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + +void swap(TProgressUpdateResp &a, TProgressUpdateResp &b) { + using ::std::swap; + swap(a.headerNames, b.headerNames); + swap(a.rows, b.rows); + swap(a.progressedPercentage, b.progressedPercentage); + swap(a.status, b.status); + swap(a.footerSummary, b.footerSummary); + swap(a.startTime, b.startTime); +} + +TProgressUpdateResp::TProgressUpdateResp(const TProgressUpdateResp& other321) { + headerNames = other321.headerNames; + rows = other321.rows; + progressedPercentage = other321.progressedPercentage; + status = other321.status; + footerSummary = other321.footerSummary; + startTime = other321.startTime; +} +TProgressUpdateResp& TProgressUpdateResp::operator=(const TProgressUpdateResp& other322) { + headerNames = other322.headerNames; + rows = other322.rows; + progressedPercentage = other322.progressedPercentage; + status = other322.status; + footerSummary = other322.footerSummary; + startTime = other322.startTime; + return *this; +} +void TProgressUpdateResp::printTo(std::ostream& out) const { + using ::apache::thrift::to_string; + out << "TProgressUpdateResp("; + out << "headerNames=" << to_string(headerNames); + out << ", " << "rows=" << to_string(rows); + out << ", " << "progressedPercentage=" << to_string(progressedPercentage); + out << 
", " << "status=" << to_string(status); + out << ", " << "footerSummary=" << to_string(footerSummary); + out << ", " << "startTime=" << to_string(startTime); + out << ")"; +} + }}}}} // namespace http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.h ---------------------------------------------------------------------- diff --git a/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.h b/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.h index b249544..6c2bb34 100644 --- a/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.h +++ b/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.h @@ -175,6 +175,16 @@ struct TFetchOrientation { extern const std::map<int, const char*> _TFetchOrientation_VALUES_TO_NAMES; +struct TJobExecutionStatus { + enum type { + IN_PROGRESS = 0, + COMPLETE = 1, + NOT_AVAILABLE = 2 + }; +}; + +extern const std::map<int, const char*> _TJobExecutionStatus_VALUES_TO_NAMES; + typedef int32_t TTypeEntryPtr; typedef std::string TIdentifier; @@ -339,6 +349,8 @@ class TRenewDelegationTokenReq; class TRenewDelegationTokenResp; +class TProgressUpdateResp; + typedef struct _TTypeQualifierValue__isset { _TTypeQualifierValue__isset() : i32Value(false), stringValue(false) {} bool i32Value :1; @@ -3669,24 +3681,37 @@ inline std::ostream& operator<<(std::ostream& out, const TGetCrossReferenceResp& return out; } +typedef struct _TGetOperationStatusReq__isset { + _TGetOperationStatusReq__isset() : getProgressUpdate(false) {} + bool getProgressUpdate :1; +} _TGetOperationStatusReq__isset; class TGetOperationStatusReq { public: TGetOperationStatusReq(const TGetOperationStatusReq&); TGetOperationStatusReq& operator=(const TGetOperationStatusReq&); - TGetOperationStatusReq() { + TGetOperationStatusReq() : getProgressUpdate(0) { } virtual ~TGetOperationStatusReq() throw(); TOperationHandle operationHandle; + bool getProgressUpdate; + + _TGetOperationStatusReq__isset __isset; void 
__set_operationHandle(const TOperationHandle& val); + void __set_getProgressUpdate(const bool val); + bool operator == (const TGetOperationStatusReq & rhs) const { if (!(operationHandle == rhs.operationHandle)) return false; + if (__isset.getProgressUpdate != rhs.__isset.getProgressUpdate) + return false; + else if (__isset.getProgressUpdate && !(getProgressUpdate == rhs.getProgressUpdate)) + return false; return true; } bool operator != (const TGetOperationStatusReq &rhs) const { @@ -3710,7 +3735,7 @@ inline std::ostream& operator<<(std::ostream& out, const TGetOperationStatusReq& } typedef struct _TGetOperationStatusResp__isset { - _TGetOperationStatusResp__isset() : operationState(false), sqlState(false), errorCode(false), errorMessage(false), taskStatus(false), operationStarted(false), operationCompleted(false), hasResultSet(false) {} + _TGetOperationStatusResp__isset() : operationState(false), sqlState(false), errorCode(false), errorMessage(false), taskStatus(false), operationStarted(false), operationCompleted(false), hasResultSet(false), progressUpdateResponse(false) {} bool operationState :1; bool sqlState :1; bool errorCode :1; @@ -3719,6 +3744,7 @@ typedef struct _TGetOperationStatusResp__isset { bool operationStarted :1; bool operationCompleted :1; bool hasResultSet :1; + bool progressUpdateResponse :1; } _TGetOperationStatusResp__isset; class TGetOperationStatusResp { @@ -3739,6 +3765,7 @@ class TGetOperationStatusResp { int64_t operationStarted; int64_t operationCompleted; bool hasResultSet; + TProgressUpdateResp progressUpdateResponse; _TGetOperationStatusResp__isset __isset; @@ -3760,6 +3787,8 @@ class TGetOperationStatusResp { void __set_hasResultSet(const bool val); + void __set_progressUpdateResponse(const TProgressUpdateResp& val); + bool operator == (const TGetOperationStatusResp & rhs) const { if (!(status == rhs.status)) @@ -3796,6 +3825,10 @@ class TGetOperationStatusResp { return false; else if (__isset.hasResultSet && !(hasResultSet == 
rhs.hasResultSet)) return false; + if (__isset.progressUpdateResponse != rhs.__isset.progressUpdateResponse) + return false; + else if (__isset.progressUpdateResponse && !(progressUpdateResponse == rhs.progressUpdateResponse)) + return false; return true; } bool operator != (const TGetOperationStatusResp &rhs) const { @@ -4470,6 +4503,71 @@ inline std::ostream& operator<<(std::ostream& out, const TRenewDelegationTokenRe return out; } + +class TProgressUpdateResp { + public: + + TProgressUpdateResp(const TProgressUpdateResp&); + TProgressUpdateResp& operator=(const TProgressUpdateResp&); + TProgressUpdateResp() : progressedPercentage(0), status((TJobExecutionStatus::type)0), footerSummary(), startTime(0) { + } + + virtual ~TProgressUpdateResp() throw(); + std::vector<std::string> headerNames; + std::vector<std::vector<std::string> > rows; + double progressedPercentage; + TJobExecutionStatus::type status; + std::string footerSummary; + int64_t startTime; + + void __set_headerNames(const std::vector<std::string> & val); + + void __set_rows(const std::vector<std::vector<std::string> > & val); + + void __set_progressedPercentage(const double val); + + void __set_status(const TJobExecutionStatus::type val); + + void __set_footerSummary(const std::string& val); + + void __set_startTime(const int64_t val); + + bool operator == (const TProgressUpdateResp & rhs) const + { + if (!(headerNames == rhs.headerNames)) + return false; + if (!(rows == rhs.rows)) + return false; + if (!(progressedPercentage == rhs.progressedPercentage)) + return false; + if (!(status == rhs.status)) + return false; + if (!(footerSummary == rhs.footerSummary)) + return false; + if (!(startTime == rhs.startTime)) + return false; + return true; + } + bool operator != (const TProgressUpdateResp &rhs) const { + return !(*this == rhs); + } + + bool operator < (const TProgressUpdateResp & ) const; + + uint32_t read(::apache::thrift::protocol::TProtocol* iprot); + uint32_t 
write(::apache::thrift::protocol::TProtocol* oprot) const; + + virtual void printTo(std::ostream& out) const; +}; + +void swap(TProgressUpdateResp &a, TProgressUpdateResp &b); + +inline std::ostream& operator<<(std::ostream& out, const TProgressUpdateResp& obj) +{ + obj.printTo(out); + return out; +} + }}}}} // namespace #endif http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TGetOperationStatusReq.java ---------------------------------------------------------------------- diff --git a/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TGetOperationStatusReq.java b/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TGetOperationStatusReq.java index 84c64cd..af31ce2 100644 --- a/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TGetOperationStatusReq.java +++ b/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TGetOperationStatusReq.java @@ -39,6 +39,7 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("TGetOperationStatusReq"); private static final org.apache.thrift.protocol.TField OPERATION_HANDLE_FIELD_DESC = new org.apache.thrift.protocol.TField("operationHandle", org.apache.thrift.protocol.TType.STRUCT, (short)1); + private static final org.apache.thrift.protocol.TField GET_PROGRESS_UPDATE_FIELD_DESC = new org.apache.thrift.protocol.TField("getProgressUpdate", org.apache.thrift.protocol.TType.BOOL, (short)2); private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? 
extends IScheme>, SchemeFactory>(); static { @@ -47,10 +48,12 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera } private TOperationHandle operationHandle; // required + private boolean getProgressUpdate; // optional /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { - OPERATION_HANDLE((short)1, "operationHandle"); + OPERATION_HANDLE((short)1, "operationHandle"), + GET_PROGRESS_UPDATE((short)2, "getProgressUpdate"); private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); @@ -67,6 +70,8 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera switch(fieldId) { case 1: // OPERATION_HANDLE return OPERATION_HANDLE; + case 2: // GET_PROGRESS_UPDATE + return GET_PROGRESS_UPDATE; default: return null; } @@ -107,11 +112,16 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera } // isset id assignments + private static final int __GETPROGRESSUPDATE_ISSET_ID = 0; + private byte __isset_bitfield = 0; + private static final _Fields optionals[] = {_Fields.GET_PROGRESS_UPDATE}; public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); tmpMap.put(_Fields.OPERATION_HANDLE, new org.apache.thrift.meta_data.FieldMetaData("operationHandle", org.apache.thrift.TFieldRequirementType.REQUIRED, new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, TOperationHandle.class))); + tmpMap.put(_Fields.GET_PROGRESS_UPDATE, new org.apache.thrift.meta_data.FieldMetaData("getProgressUpdate", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL))); metaDataMap 
= Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(TGetOperationStatusReq.class, metaDataMap); } @@ -130,9 +140,11 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera * Performs a deep copy on <i>other</i>. */ public TGetOperationStatusReq(TGetOperationStatusReq other) { + __isset_bitfield = other.__isset_bitfield; if (other.isSetOperationHandle()) { this.operationHandle = new TOperationHandle(other.operationHandle); } + this.getProgressUpdate = other.getProgressUpdate; } public TGetOperationStatusReq deepCopy() { @@ -142,6 +154,8 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera @Override public void clear() { this.operationHandle = null; + setGetProgressUpdateIsSet(false); + this.getProgressUpdate = false; } public TOperationHandle getOperationHandle() { @@ -167,6 +181,28 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera } } + public boolean isGetProgressUpdate() { + return this.getProgressUpdate; + } + + public void setGetProgressUpdate(boolean getProgressUpdate) { + this.getProgressUpdate = getProgressUpdate; + setGetProgressUpdateIsSet(true); + } + + public void unsetGetProgressUpdate() { + __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __GETPROGRESSUPDATE_ISSET_ID); + } + + /** Returns true if field getProgressUpdate is set (has been assigned a value) and false otherwise */ + public boolean isSetGetProgressUpdate() { + return EncodingUtils.testBit(__isset_bitfield, __GETPROGRESSUPDATE_ISSET_ID); + } + + public void setGetProgressUpdateIsSet(boolean value) { + __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __GETPROGRESSUPDATE_ISSET_ID, value); + } + public void setFieldValue(_Fields field, Object value) { switch (field) { case OPERATION_HANDLE: @@ -177,6 +213,14 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera } break; + case GET_PROGRESS_UPDATE: + if 
(value == null) { + unsetGetProgressUpdate(); + } else { + setGetProgressUpdate((Boolean)value); + } + break; + } } @@ -185,6 +229,9 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera case OPERATION_HANDLE: return getOperationHandle(); + case GET_PROGRESS_UPDATE: + return isGetProgressUpdate(); + } throw new IllegalStateException(); } @@ -198,6 +245,8 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera switch (field) { case OPERATION_HANDLE: return isSetOperationHandle(); + case GET_PROGRESS_UPDATE: + return isSetGetProgressUpdate(); } throw new IllegalStateException(); } @@ -224,6 +273,15 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera return false; } + boolean this_present_getProgressUpdate = true && this.isSetGetProgressUpdate(); + boolean that_present_getProgressUpdate = true && that.isSetGetProgressUpdate(); + if (this_present_getProgressUpdate || that_present_getProgressUpdate) { + if (!(this_present_getProgressUpdate && that_present_getProgressUpdate)) + return false; + if (this.getProgressUpdate != that.getProgressUpdate) + return false; + } + return true; } @@ -236,6 +294,11 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera if (present_operationHandle) list.add(operationHandle); + boolean present_getProgressUpdate = true && (isSetGetProgressUpdate()); + list.add(present_getProgressUpdate); + if (present_getProgressUpdate) + list.add(getProgressUpdate); + return list.hashCode(); } @@ -257,6 +320,16 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera return lastComparison; } } + lastComparison = Boolean.valueOf(isSetGetProgressUpdate()).compareTo(other.isSetGetProgressUpdate()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetGetProgressUpdate()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.getProgressUpdate, other.getProgressUpdate); + if 
(lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -284,6 +357,12 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera sb.append(this.operationHandle); } first = false; + if (isSetGetProgressUpdate()) { + if (!first) sb.append(", "); + sb.append("getProgressUpdate:"); + sb.append(this.getProgressUpdate); + first = false; + } sb.append(")"); return sb.toString(); } @@ -310,6 +389,8 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { try { + // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor. + __isset_bitfield = 0; read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); } catch (org.apache.thrift.TException te) { throw new java.io.IOException(te); @@ -343,6 +424,14 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 2: // GET_PROGRESS_UPDATE + if (schemeField.type == org.apache.thrift.protocol.TType.BOOL) { + struct.getProgressUpdate = iprot.readBool(); + struct.setGetProgressUpdateIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -361,6 +450,11 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera struct.operationHandle.write(oprot); oprot.writeFieldEnd(); } + if (struct.isSetGetProgressUpdate()) { + oprot.writeFieldBegin(GET_PROGRESS_UPDATE_FIELD_DESC); + oprot.writeBool(struct.getProgressUpdate); + oprot.writeFieldEnd(); + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -379,6 +473,14 @@ public class TGetOperationStatusReq implements 
org.apache.thrift.TBase<TGetOpera public void write(org.apache.thrift.protocol.TProtocol prot, TGetOperationStatusReq struct) throws org.apache.thrift.TException { TTupleProtocol oprot = (TTupleProtocol) prot; struct.operationHandle.write(oprot); + BitSet optionals = new BitSet(); + if (struct.isSetGetProgressUpdate()) { + optionals.set(0); + } + oprot.writeBitSet(optionals, 1); + if (struct.isSetGetProgressUpdate()) { + oprot.writeBool(struct.getProgressUpdate); + } } @Override @@ -387,6 +489,11 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera struct.operationHandle = new TOperationHandle(); struct.operationHandle.read(iprot); struct.setOperationHandleIsSet(true); + BitSet incoming = iprot.readBitSet(1); + if (incoming.get(0)) { + struct.getProgressUpdate = iprot.readBool(); + struct.setGetProgressUpdateIsSet(true); + } } }
