HIVE-13258: LLAP: Add hdfs bytes read and spilled bytes to tez print summary (Prasanth Jayachandran reviewed by Siddharth Seth)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/5b2c36a4 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/5b2c36a4 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/5b2c36a4 Branch: refs/heads/master Commit: 5b2c36a4a88d279f134f3a8e5c928bb1473b9673 Parents: f9adb4a Author: Prasanth Jayachandran <prasan...@apache.org> Authored: Wed Jul 13 09:42:22 2016 -0700 Committer: Prasanth Jayachandran <prasan...@apache.org> Committed: Wed Jul 13 09:42:22 2016 -0700 ---------------------------------------------------------------------- itests/qtest/pom.xml | 2 +- .../test/resources/testconfiguration.properties | 3 + .../org/apache/hadoop/hive/llap/LlapUtil.java | 108 ++ .../hive/llap/counters/LlapIOCounters.java | 50 +- .../llap/counters/QueryFragmentCounters.java | 4 + .../hive/llap/daemon/impl/LlapTaskReporter.java | 16 +- .../daemon/impl/StatsRecordingThreadPool.java | 189 +++ .../llap/daemon/impl/TaskRunnerCallable.java | 22 +- .../hive/llap/io/api/impl/LlapInputFormat.java | 48 +- .../hive/llap/io/api/impl/LlapIoImpl.java | 13 +- .../llap/io/encoded/OrcEncodedDataReader.java | 5 + .../hive/llap/tezplugins/LlapTezUtils.java | 28 + .../hadoop/hive/ql/exec/tez/TezJobMonitor.java | 66 + .../ql/hooks/PostExecTezSummaryPrinter.java | 22 +- .../queries/clientpositive/orc_llap_counters.q | 182 +++ .../queries/clientpositive/orc_llap_counters1.q | 83 ++ .../results/clientpositive/llap/orc_llap.q.out | 252 ++-- .../clientpositive/llap/orc_llap_counters.q.out | 1301 ++++++++++++++++++ .../llap/orc_llap_counters1.q.out | 331 +++++ .../clientpositive/llap/orc_ppd_basic.q.out | 551 ++++++++ .../clientpositive/tez/orc_ppd_basic.q.out | 252 ++++ 21 files changed, 3374 insertions(+), 154 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/5b2c36a4/itests/qtest/pom.xml ---------------------------------------------------------------------- diff --git a/itests/qtest/pom.xml b/itests/qtest/pom.xml index 306a0ed..17968e6 100644 --- a/itests/qtest/pom.xml +++ b/itests/qtest/pom.xml @@ -545,7 +545,7 @@ logFile="${project.build.directory}/testminitezclidrivergen.log" logDirectory="${project.build.directory}/qfile-results/clientpositive/" hadoopVersion="${hadoop.version}" - initScript="q_test_init.sql" + initScript="${initScript}" cleanupScript="q_test_cleanup.sql"/> <qtestgen hiveRootDirectory="${basedir}/${hive.path.to.root}/" http://git-wip-us.apache.org/repos/asf/hive/blob/5b2c36a4/itests/src/test/resources/testconfiguration.properties ---------------------------------------------------------------------- diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index eb0d1d7..73fcb03 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ b/itests/src/test/resources/testconfiguration.properties @@ -526,6 +526,9 @@ minillap.shared.query.files=bucket_map_join_tez1.q,\ minillap.query.files=llap_udf.q,\ + orc_llap.q,\ + orc_llap_counters.q,\ + orc_llap_counters1.q,\ orc_llap_nonvector.q http://git-wip-us.apache.org/repos/asf/hive/blob/5b2c36a4/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java ---------------------------------------------------------------------- diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java index 505ddb1..298be76 100644 --- a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java +++ b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java @@ -14,9 +14,14 @@ package org.apache.hadoop.hive.llap; import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.regex.Pattern; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.security.SecurityUtil; @@ -56,4 +61,107 @@ public class LlapUtil { String[] components = principal.split("[/@]"); return (components == null || components.length != 3) ? principal : components[0]; } + + public static List<StatisticsData> getStatisticsForScheme(final String scheme, + final List<StatisticsData> stats) { + List<StatisticsData> result = new ArrayList<>(); + if (stats != null && scheme != null) { + for (StatisticsData s : stats) { + if (s.getScheme().equalsIgnoreCase(scheme)) { + result.add(s); + } + } + } + return result; + } + + public static Map<String, FileSystem.Statistics> getCombinedFileSystemStatistics() { + final List<FileSystem.Statistics> allStats = FileSystem.getAllStatistics(); + final Map<String, FileSystem.Statistics> result = new HashMap<>(); + for (FileSystem.Statistics statistics : allStats) { + final String scheme = statistics.getScheme(); + if (result.containsKey(scheme)) { + FileSystem.Statistics existing = result.get(scheme); + FileSystem.Statistics combined = combineFileSystemStatistics(existing, statistics); + result.put(scheme, combined); + } else { + result.put(scheme, statistics); + } + } + return result; + } + + private static FileSystem.Statistics combineFileSystemStatistics(final FileSystem.Statistics s1, + final FileSystem.Statistics s2) { + FileSystem.Statistics result = new FileSystem.Statistics(s1); + result.incrementReadOps(s2.getReadOps()); + result.incrementLargeReadOps(s2.getLargeReadOps()); + result.incrementWriteOps(s2.getWriteOps()); + result.incrementBytesRead(s2.getBytesRead()); + result.incrementBytesWritten(s2.getBytesWritten()); + return result; + } + + public static List<StatisticsData> cloneThreadLocalFileSystemStatistics() { + List<StatisticsData> result = new ArrayList<>(); + // thread local filesystem stats is private and cannot be cloned. So make a copy to new class + for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) { + result.add(new StatisticsData(statistics.getScheme(), statistics.getThreadStatistics())); + } + return result; + } + + public static class StatisticsData { + long bytesRead; + long bytesWritten; + int readOps; + int largeReadOps; + int writeOps; + String scheme; + + public StatisticsData(String scheme, FileSystem.Statistics.StatisticsData fsStats) { + this.scheme = scheme; + this.bytesRead = fsStats.getBytesRead(); + this.bytesWritten = fsStats.getBytesWritten(); + this.readOps = fsStats.getReadOps(); + this.largeReadOps = fsStats.getLargeReadOps(); + this.writeOps = fsStats.getWriteOps(); + } + + public long getBytesRead() { + return bytesRead; + } + + public long getBytesWritten() { + return bytesWritten; + } + + public int getReadOps() { + return readOps; + } + + public int getLargeReadOps() { + return largeReadOps; + } + + public int getWriteOps() { + return writeOps; + } + + public String getScheme() { + return scheme; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(" scheme: ").append(scheme); + sb.append(" bytesRead: ").append(bytesRead); + sb.append(" bytesWritten: ").append(bytesWritten); + sb.append(" readOps: ").append(readOps); + sb.append(" largeReadOps: ").append(largeReadOps); + sb.append(" writeOps: ").append(writeOps); + return sb.toString(); + } + } } http://git-wip-us.apache.org/repos/asf/hive/blob/5b2c36a4/llap-common/src/java/org/apache/hadoop/hive/llap/counters/LlapIOCounters.java ---------------------------------------------------------------------- diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/counters/LlapIOCounters.java b/llap-common/src/java/org/apache/hadoop/hive/llap/counters/LlapIOCounters.java index 365ddab..1ed23ba 100644 --- a/llap-common/src/java/org/apache/hadoop/hive/llap/counters/LlapIOCounters.java +++ b/llap-common/src/java/org/apache/hadoop/hive/llap/counters/LlapIOCounters.java @@ -15,23 +15,43 @@ */ package org.apache.hadoop.hive.llap.counters; +import java.util.ArrayList; +import java.util.List; + /** * LLAP IO related counters. */ public enum LlapIOCounters { - NUM_VECTOR_BATCHES, - NUM_DECODED_BATCHES, - SELECTED_ROWGROUPS, - NUM_ERRORS, - ROWS_EMITTED, - METADATA_CACHE_HIT, - METADATA_CACHE_MISS, - CACHE_HIT_BYTES, - CACHE_MISS_BYTES, - ALLOCATED_BYTES, - ALLOCATED_USED_BYTES, - TOTAL_IO_TIME_NS, - DECODE_TIME_NS, - HDFS_TIME_NS, - CONSUMER_TIME_NS + NUM_VECTOR_BATCHES(true), + NUM_DECODED_BATCHES(true), + SELECTED_ROWGROUPS(true), + NUM_ERRORS(true), + ROWS_EMITTED(true), + METADATA_CACHE_HIT(true), + METADATA_CACHE_MISS(true), + CACHE_HIT_BYTES(true), + CACHE_MISS_BYTES(true), + ALLOCATED_BYTES(true), + ALLOCATED_USED_BYTES(true), + TOTAL_IO_TIME_NS(false), + DECODE_TIME_NS(false), + HDFS_TIME_NS(false), + CONSUMER_TIME_NS(false); + + // flag to indicate if these counters are subject to change across different test runs + private boolean testSafe; + + LlapIOCounters(final boolean testSafe) { + this.testSafe = testSafe; + } + + public static List<String> testSafeCounterNames() { + List<String> testSafeCounters = new ArrayList<>(); + for (LlapIOCounters counter : values()) { + if (counter.testSafe) { + testSafeCounters.add(counter.name()); + } + } + return testSafeCounters; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/5b2c36a4/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java b/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java index a53ac61..0c858eb 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java @@ -135,4 +135,8 @@ public class QueryFragmentCounters implements LowLevelCacheCounters { sb.append(" ]"); return sb.toString(); } + + public TezCounters getTezCounters() { + return tezCounters; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/5b2c36a4/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java index 9c34ada..39b4b0e 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java @@ -80,7 +80,7 @@ public class LlapTaskReporter implements TaskReporterInterface { private final int maxEventsToGet; private final AtomicLong requestCounter; private final String containerIdStr; - private final String fragmentFullId; + private final String fragmentId; private final TezEvent initialEvent; private final ListeningExecutorService heartbeatExecutor; @@ -90,14 +90,14 @@ public class LlapTaskReporter implements TaskReporterInterface { public LlapTaskReporter(LlapTaskUmbilicalProtocol umbilical, long amPollInterval, long sendCounterInterval, int maxEventsToGet, AtomicLong requestCounter, - String containerIdStr, final String fragFullId, TezEvent initialEvent) { + String containerIdStr, final String fragmentId, TezEvent initialEvent) { this.umbilical = umbilical; this.pollInterval = amPollInterval; this.sendCounterInterval = sendCounterInterval; this.maxEventsToGet = maxEventsToGet; this.requestCounter = requestCounter; this.containerIdStr = containerIdStr; - this.fragmentFullId = fragFullId; + this.fragmentId = fragmentId; this.initialEvent = initialEvent; ExecutorService executor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder() .setDaemon(true).setNameFormat("TaskHeartbeatThread").build()); @@ -110,9 +110,9 @@ public class LlapTaskReporter implements TaskReporterInterface { @Override public synchronized void registerTask(RuntimeTask task, ErrorReporter errorReporter) { - TezCounters tezCounters = task.addAndGetTezCounter(fragmentFullId); - FragmentCountersMap.registerCountersForFragment(fragmentFullId, tezCounters); - LOG.info("Registered counters for fragment: {} vertexName: {}", fragmentFullId, task.getVertexName()); + TezCounters tezCounters = task.addAndGetTezCounter(fragmentId); + FragmentCountersMap.registerCountersForFragment(fragmentId, tezCounters); + LOG.info("Registered counters for fragment: {} vertexName: {}", fragmentId, task.getVertexName()); currentCallable = new HeartbeatCallable(task, umbilical, pollInterval, sendCounterInterval, maxEventsToGet, requestCounter, containerIdStr, initialEvent); ListenableFuture<Boolean> future = heartbeatExecutor.submit(currentCallable); @@ -125,8 +125,8 @@ public class LlapTaskReporter implements TaskReporterInterface { */ @Override public synchronized void unregisterTask(TezTaskAttemptID taskAttemptID) { - LOG.info("Unregistered counters for fragment: {}", fragmentFullId); - FragmentCountersMap.unregisterCountersForFragment(fragmentFullId); + LOG.info("Unregistered counters for fragment: {}", fragmentId); + FragmentCountersMap.unregisterCountersForFragment(fragmentId); currentCallable.markComplete(); currentCallable = null; } http://git-wip-us.apache.org/repos/asf/hive/blob/5b2c36a4/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java new file mode 100644 index 0000000..9b3ce7e --- /dev/null +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java @@ -0,0 +1,189 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.daemon.impl; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.FutureTask; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hive.llap.LlapUtil; +import org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader; +import org.apache.tez.common.counters.FileSystemCounter; +import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.runtime.task.TaskRunner2Callable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Custom thread pool implementation that records per thread file system statistics in TezCounters. + * The way it works is we capture before and after snapshots of file system thread statistics, + * compute the delta difference in statistics and update them in tez task counters. + */ +public class StatsRecordingThreadPool extends ThreadPoolExecutor { + private static final Logger LOG = LoggerFactory.getLogger(StatsRecordingThreadPool.class); + // uncaught exception handler that will be set for all threads before execution + private Thread.UncaughtExceptionHandler uncaughtExceptionHandler; + + public StatsRecordingThreadPool(final int corePoolSize, final int maximumPoolSize, + final long keepAliveTime, + final TimeUnit unit, + final BlockingQueue<Runnable> workQueue, + final ThreadFactory threadFactory) { + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, null); + } + + public StatsRecordingThreadPool(final int corePoolSize, final int maximumPoolSize, + final long keepAliveTime, + final TimeUnit unit, + final BlockingQueue<Runnable> workQueue, + final ThreadFactory threadFactory, Thread.UncaughtExceptionHandler handler) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); + this.uncaughtExceptionHandler = handler; + } + + @Override + protected <T> RunnableFuture<T> newTaskFor(final Callable<T> callable) { + return new FutureTask(new WrappedCallable(callable, uncaughtExceptionHandler)); + } + + public void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler handler) { + this.uncaughtExceptionHandler = handler; + } + + /** + * Callable that wraps the actual callable submitted to the thread pool and invokes completion + * listener in finally block. + * + * @param <V> - actual callable + */ + private static class WrappedCallable<V> implements Callable<V> { + private Callable<V> actualCallable; + private Thread.UncaughtExceptionHandler uncaughtExceptionHandler; + + WrappedCallable(final Callable<V> callable, + final Thread.UncaughtExceptionHandler uncaughtExceptionHandler) { + this.actualCallable = callable; + this.uncaughtExceptionHandler = uncaughtExceptionHandler; + } + + @Override + public V call() throws Exception { + Thread thread = Thread.currentThread(); + + // setup uncaught exception handler for the current thread + if (uncaughtExceptionHandler != null) { + thread.setUncaughtExceptionHandler(uncaughtExceptionHandler); + } + + // clone thread local file system statistics + List<LlapUtil.StatisticsData> statsBefore = LlapUtil.cloneThreadLocalFileSystemStatistics(); + + try { + return actualCallable.call(); + } finally { + updateFileSystemCounters(statsBefore, actualCallable); + } + } + + private void updateFileSystemCounters(final List<LlapUtil.StatisticsData> statsBefore, + final Callable<V> actualCallable) { + Thread thread = Thread.currentThread(); + TezCounters tezCounters = null; + // add tez counters for task execution and llap io + if (actualCallable instanceof TaskRunner2Callable) { + TaskRunner2Callable taskRunner2Callable = (TaskRunner2Callable) actualCallable; + // counters for task execution side + tezCounters = taskRunner2Callable.addAndGetTezCounter(FileSystemCounter.class.getName()); + } else if (actualCallable instanceof OrcEncodedDataReader) { + // counters for llap io side + tezCounters = ((OrcEncodedDataReader) actualCallable).getTezCounters(); + } + + if (tezCounters != null) { + if (statsBefore != null) { + // if there are multiple stats for the same scheme (from different NameNode), this + // method will squash them together + Map<String, FileSystem.Statistics> schemeToStats = LlapUtil + .getCombinedFileSystemStatistics(); + for (Map.Entry<String, FileSystem.Statistics> entry : schemeToStats.entrySet()) { + final String scheme = entry.getKey(); + FileSystem.Statistics statistics = entry.getValue(); + FileSystem.Statistics.StatisticsData threadFSStats = statistics + .getThreadStatistics(); + List<LlapUtil.StatisticsData> allStatsBefore = LlapUtil + .getStatisticsForScheme(scheme, statsBefore); + long bytesReadDelta = 0; + long bytesWrittenDelta = 0; + long readOpsDelta = 0; + long largeReadOpsDelta = 0; + long writeOpsDelta = 0; + // there could be more scheme after execution as execution might be accessing a + // different filesystem. So if we don't find a matching scheme before execution we + // just use the after execution values directly without computing delta difference + if (allStatsBefore != null && !allStatsBefore.isEmpty()) { + for (LlapUtil.StatisticsData sb : allStatsBefore) { + bytesReadDelta += threadFSStats.getBytesRead() - sb.getBytesRead(); + bytesWrittenDelta += threadFSStats.getBytesWritten() - sb.getBytesWritten(); + readOpsDelta += threadFSStats.getReadOps() - sb.getReadOps(); + largeReadOpsDelta += threadFSStats.getLargeReadOps() - sb.getLargeReadOps(); + writeOpsDelta += threadFSStats.getWriteOps() - sb.getWriteOps(); + } + } else { + bytesReadDelta = threadFSStats.getBytesRead(); + bytesWrittenDelta = threadFSStats.getBytesWritten(); + readOpsDelta = threadFSStats.getReadOps(); + largeReadOpsDelta = threadFSStats.getLargeReadOps(); + writeOpsDelta = threadFSStats.getWriteOps(); + } + tezCounters.findCounter(scheme, FileSystemCounter.BYTES_READ) + .increment(bytesReadDelta); + tezCounters.findCounter(scheme, FileSystemCounter.BYTES_WRITTEN) + .increment(bytesWrittenDelta); + tezCounters.findCounter(scheme, FileSystemCounter.READ_OPS).increment(readOpsDelta); + tezCounters.findCounter(scheme, FileSystemCounter.LARGE_READ_OPS) + .increment(largeReadOpsDelta); + tezCounters.findCounter(scheme, FileSystemCounter.WRITE_OPS) + .increment(writeOpsDelta); + + if (LOG.isDebugEnabled()) { + LOG.debug("Updated stats: instance: {} thread name: {} thread id: {} scheme: {} " + + "bytesRead: {} bytesWritten: {} readOps: {} largeReadOps: {} writeOps: {}", + actualCallable.getClass().getSimpleName(), thread.getName(), thread.getId(), + scheme, bytesReadDelta, bytesWrittenDelta, readOpsDelta, largeReadOpsDelta, + writeOpsDelta); + } + } + } else { + LOG.warn("File system statistics snapshot before execution of thread is null." + + "Thread name: {} id: {} allStats: {}", thread.getName(), thread.getId(), + statsBefore); + } + } else { + LOG.warn("TezCounters is null for callable type: {}", + actualCallable.getClass().getSimpleName()); + } + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/5b2c36a4/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index 143e755..fb64f0b 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -23,7 +23,9 @@ import java.security.PrivilegedExceptionAction; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -39,6 +41,7 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWor import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics; import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol; import org.apache.hadoop.hive.llap.tez.Converters; +import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils; import org.apache.hadoop.hive.ql.io.IOContextMap; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.net.NetUtils; @@ -46,6 +49,7 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.tez.common.CallableWithNdc; import org.apache.tez.common.TezCommonUtils; import org.apache.tez.common.security.JobTokenIdentifier; @@ -100,7 +104,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { private final FragmentCompletionHandler fragmentCompletionHanler; private volatile TezTaskRunner2 taskRunner; private volatile TaskReporterInterface taskReporter; - private volatile ListeningExecutorService executor; + private volatile ExecutorService executor; private LlapTaskUmbilicalProtocol umbilical; private volatile long startTime; private volatile String threadName; @@ -181,12 +185,13 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { } // TODO This executor seems unnecessary. Here and TezChild - ExecutorService executorReal = Executors.newFixedThreadPool(1, + executor = new StatsRecordingThreadPool(1, 1, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<Runnable>(), new ThreadFactoryBuilder() .setDaemon(true) .setNameFormat("TezTaskRunner") .build()); - executor = MoreExecutors.listeningDecorator(executorReal); // TODO Consolidate this code with TezChild. runtimeWatch.start(); @@ -214,12 +219,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { } }); - TezTaskAttemptID taskAttemptID = taskSpec.getTaskAttemptID(); - TezTaskID taskId = taskAttemptID.getTaskID(); - TezVertexID tezVertexID = taskId.getVertexID(); - TezDAGID tezDAGID = tezVertexID.getDAGId(); - String fragFullId = Joiner.on('_').join(tezDAGID.getId(), tezVertexID.getId(), taskId.getId(), - taskAttemptID.getId()); + String fragmentId = LlapTezUtils.stripAttemptPrefix(taskSpec.getTaskAttemptID().toString()); taskReporter = new LlapTaskReporter( umbilical, confParams.amHeartbeatIntervalMsMax, @@ -227,7 +227,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { confParams.amMaxEventsPerHeartbeat, new AtomicLong(0), request.getContainerIdString(), - fragFullId, + fragmentId, initialEvent); String attemptId = fragmentInfo.getFragmentIdentifierString(); http://git-wip-us.apache.org/repos/asf/hive/blob/5b2c36a4/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java index ff9604e..c4ffb9f 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java @@ -34,14 +34,18 @@ import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.ExecutorService; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.llap.ConsumerFeedback; import org.apache.hadoop.hive.llap.counters.FragmentCountersMap; import org.apache.hadoop.hive.llap.counters.LlapIOCounters; import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters; +import org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool; import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer; import org.apache.hadoop.hive.llap.io.decode.ReadPipeline; +import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils; import org.apache.hadoop.hive.ql.exec.ColumnInfo; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.RowSchema; @@ -87,12 +91,12 @@ public class LlapInputFormat implements InputFormat<NullWritable, VectorizedRowB private final InputFormat sourceInputFormat; private final AvoidSplitCombination sourceASC; private final ColumnVectorProducer cvp; - private final ListeningExecutorService executor; + private final ExecutorService executor; private final String hostName; @SuppressWarnings("rawtypes") LlapInputFormat(InputFormat sourceInputFormat, ColumnVectorProducer cvp, - ListeningExecutorService executor) { + ExecutorService executor) { // TODO: right now, we do nothing with source input format, ORC-only in the first cut. // We'd need to plumb it thru and use it to get data to cache/etc. assert sourceInputFormat instanceof OrcInputFormat; @@ -173,19 +177,13 @@ public class LlapInputFormat implements InputFormat<NullWritable, VectorizedRowB this.columnIds = includedCols; this.sarg = ConvertAstToSearchArg.createFromConf(job); this.columnNames = ColumnProjectionUtils.getReadColumnNames(job); - String dagId = job.get("tez.mapreduce.dag.index"); - String vertexId = job.get("tez.mapreduce.vertex.index"); - String taskId = job.get("tez.mapreduce.task.index"); - String taskAttemptId = job.get("tez.mapreduce.task.attempt.index"); + String fragmentId = LlapTezUtils.getFragmentId(job); TezCounters taskCounters = null; - if (dagId != null && vertexId != null && taskId != null && taskAttemptId != null) { - String fullId = Joiner.on('_').join(dagId, vertexId, taskId, taskAttemptId); - taskCounters = FragmentCountersMap.getCountersForFragment(fullId); - LOG.info("Received dagid_vertexid_taskid_attempid: {}", fullId); + if (fragmentId != null) { + taskCounters = FragmentCountersMap.getCountersForFragment(fragmentId); + LOG.info("Received fragment id: {}", fragmentId); } else { - LOG.warn("Not using tez counters as some identifier is null." + - " dagId: {} vertexId: {} taskId: {} taskAttempId: {}", - dagId, vertexId, taskId, taskAttemptId); + LOG.warn("Not using tez counters as fragment id string is null"); } this.counters = new QueryFragmentCounters(job, taskCounters); this.counters.setDesc(QueryFragmentCounters.Desc.MACHINE, hostName); @@ -255,17 +253,12 @@ public class LlapInputFormat implements InputFormat<NullWritable, VectorizedRowB return rbCtx; } - private final class UncaughtErrorHandler implements FutureCallback<Void> { + private final class IOUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler { @Override - public void onSuccess(Void result) { - // Successful execution of reader is supposed to call setDone. - } - - @Override - public void onFailure(Throwable t) { - // Reader is not supposed to throw AFTER calling setError. - LlapIoImpl.LOG.error("Unhandled error from reader thread " + t.getMessage()); - setError(t); + public void uncaughtException(final Thread t, final Throwable e) { + LlapIoImpl.LOG.error("Unhandled error from reader thread. threadName: {} threadId: {}" + + " Message: {}", t.getName(), t.getId(), e.getMessage()); + setError(e); } } @@ -274,9 +267,12 @@ public class LlapInputFormat implements InputFormat<NullWritable, VectorizedRowB ReadPipeline rp = cvp.createReadPipeline( this, split, columnIds, sarg, columnNames, counters); feedback = rp; - ListenableFuture<Void> future = executor.submit(rp.getReadCallable()); - // TODO: we should NOT do this thing with handler. Reader needs to do cleanup in most cases. - Futures.addCallback(future, new UncaughtErrorHandler()); + if (executor instanceof StatsRecordingThreadPool) { + // Every thread created by this thread pool will use the same handler + ((StatsRecordingThreadPool) executor) + .setUncaughtExceptionHandler(new IOUncaughtExceptionHandler()); + } + executor.submit(rp.getReadCallable()); } ColumnVectorBatch nextCvb() throws InterruptedException, IOException { http://git-wip-us.apache.org/repos/asf/hive/blob/5b2c36a4/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java index 9316dff..9deef0c 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java @@ -22,10 +22,15 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import javax.management.ObjectName; +import org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -69,7 +74,7 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> { private static final String MODE_CACHE = "cache", MODE_ALLOCATOR = "allocator"; private final ColumnVectorProducer cvp; - private final ListeningExecutorService executor; + private final ExecutorService executor; private final LlapDaemonCacheMetrics cacheMetrics; private final LlapDaemonIOMetrics ioMetrics; private ObjectName buddyAllocatorMXBean; @@ -137,8 +142,10 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> { } // IO thread pool. Listening is used for unhandled errors for now (TODO: remove?) int numThreads = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_IO_THREADPOOL_SIZE); - executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numThreads, - new ThreadFactoryBuilder().setNameFormat("IO-Elevator-Thread-%d").setDaemon(true).build())); + executor = new StatsRecordingThreadPool(numThreads, numThreads, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<Runnable>(), + new ThreadFactoryBuilder().setNameFormat("IO-Elevator-Thread-%d").setDaemon(true).build()); // TODO: this should depends on input format and be in a map, or something. this.cvp = new OrcColumnVectorProducer( metadataCache, orcCache, bufferManager, conf, cacheMetrics, ioMetrics); http://git-wip-us.apache.org/repos/asf/hive/blob/5b2c36a4/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java index 1befba7..1dcd2cd 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hive.llap.counters.LlapIOCounters; import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics; import org.apache.orc.impl.DataReaderProperties; import org.apache.orc.impl.OrcIndex; +import org.apache.tez.common.counters.TezCounters; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -921,4 +922,8 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> return orcDataReader.readStripeFooter(stripe); } } + + public TezCounters getTezCounters() { + return counters.getTezCounters(); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/5b2c36a4/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTezUtils.java ---------------------------------------------------------------------- diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTezUtils.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTezUtils.java index 2c3e53c..eda8862 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTezUtils.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTezUtils.java @@ -14,11 +14,24 @@ package org.apache.hadoop.hive.llap.tezplugins; +import java.text.NumberFormat; + import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.mapreduce.hadoop.MRHelpers; +import org.apache.tez.mapreduce.hadoop.MRInputHelpers; +import org.apache.tez.mapreduce.hadoop.MRJobConfig; import org.apache.tez.mapreduce.input.MRInput; import org.apache.tez.mapreduce.input.MRInputLegacy; import org.apache.tez.mapreduce.input.MultiMRInput; +import com.google.common.base.Joiner; + @InterfaceAudience.Private public class LlapTezUtils { public static boolean isSourceOfInterest(String inputClassName) { @@ -26,4 +39,19 @@ public class LlapTezUtils { return !(inputClassName.equals(MRInputLegacy.class.getName()) || inputClassName.equals( MultiMRInput.class.getName()) || inputClassName.equals(MRInput.class.getName())); } + + public static String getFragmentId(final JobConf job) { + String taskAttemptId = job.get(MRInput.TEZ_MAPREDUCE_TASK_ATTEMPT_ID); + if (taskAttemptId != null) { + return stripAttemptPrefix(taskAttemptId); + } + return null; + } + + public static String stripAttemptPrefix(final String s) { + if (s.startsWith(TezTaskAttemptID.ATTEMPT)) { + return s.substring(TezTaskAttemptID.ATTEMPT.length() + 1); + } + return s; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/5b2c36a4/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java index 5aab0e5..67cd38d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java @@ -36,6 +36,7 @@ import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.llap.counters.LlapIOCounters; import org.apache.hadoop.hive.ql.Context; @@ -47,6 +48,7 @@ import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; +import org.apache.tez.common.counters.FileSystemCounter; import org.apache.tez.common.counters.TaskCounter; import org.apache.tez.common.counters.TezCounter; import org.apache.tez.common.counters.TezCounters; @@ -80,6 +82,7 @@ public class TezJobMonitor { private static final String QUERY_EXEC_SUMMARY_HEADER = "Query Execution Summary"; private static final String TASK_SUMMARY_HEADER = "Task Execution Summary"; private static final String LLAP_IO_SUMMARY_HEADER = "LLAP IO Summary"; + private static final String FS_COUNTERS_SUMMARY_HEADER = "FileSystem Counters Summary"; // keep this within 80 chars width. If more columns needs to be added then update min terminal // width requirement and SEPARATOR width accordingly @@ -106,6 +109,9 @@ public class TezJobMonitor { "VERTICES", "ROWGROUPS", "META_HIT", "META_MISS", "DATA_HIT", "DATA_MISS", "ALLOCATION", "USED", "TOTAL_IO"); + // FileSystem counters + private static final String FS_COUNTERS_HEADER_FORMAT = "%10s %15s %13s %18s %18s %13s"; + // Methods summary private static final String OPERATION_SUMMARY = "%-35s %9s"; private static final String OPERATION = "OPERATION"; @@ -391,6 +397,10 @@ public class TezJobMonitor { console.printInfo(LLAP_IO_SUMMARY_HEADER); printLlapIOSummary(progressMap, console, dagClient); console.printInfo(SEPARATOR); + console.printInfo(""); + + console.printInfo(FS_COUNTERS_SUMMARY_HEADER); + printFSCountersSummary(progressMap, console, dagClient); } console.printInfo(""); @@ -697,6 +707,62 @@ public class TezJobMonitor { } } + private void printFSCountersSummary(Map<String, Progress> progressMap, LogHelper console, + DAGClient dagClient) { + SortedSet<String> keys = new TreeSet<>(progressMap.keySet()); + Set<StatusGetOpts> statusOptions = new HashSet<>(1); + statusOptions.add(StatusGetOpts.GET_COUNTERS); + // Assuming FileSystem.getAllStatistics() returns all schemes that are accessed on task side + // as well. If not, we need a way to get all the schemes that are accessed by the tez task/llap. + for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) { + final String scheme = statistics.getScheme().toUpperCase(); + final String fsCountersHeader = String.format(FS_COUNTERS_HEADER_FORMAT, + "VERTICES", "BYTES_READ", "READ_OPS", "LARGE_READ_OPS", "BYTES_WRITTEN", "WRITE_OPS"); + + console.printInfo(""); + reprintLineWithColorAsBold("Scheme: " + scheme, Ansi.Color.RED); + console.printInfo(SEPARATOR); + reprintLineWithColorAsBold(fsCountersHeader, Ansi.Color.CYAN); + console.printInfo(SEPARATOR); + + for (String vertexName : keys) { + TezCounters vertexCounters = null; + try { + vertexCounters = dagClient.getVertexStatus(vertexName, statusOptions) + .getVertexCounters(); + } catch (IOException e) { + // best attempt, shouldn't really kill DAG for this + } catch (TezException e) { + // best attempt, shouldn't really kill DAG for this + } + if (vertexCounters != null) { + final String counterGroup = FileSystemCounter.class.getName(); + final long bytesRead = getCounterValueByGroupName(vertexCounters, + counterGroup, scheme + "_" + FileSystemCounter.BYTES_READ.name()); + final long bytesWritten = getCounterValueByGroupName(vertexCounters, + counterGroup, scheme + "_" + FileSystemCounter.BYTES_WRITTEN.name()); + final long readOps = getCounterValueByGroupName(vertexCounters, + counterGroup, scheme + "_" + FileSystemCounter.READ_OPS.name()); + final long largeReadOps = getCounterValueByGroupName(vertexCounters, + counterGroup, scheme + "_" + FileSystemCounter.LARGE_READ_OPS.name()); + final long writeOps = getCounterValueByGroupName(vertexCounters, + counterGroup, scheme + "_" + FileSystemCounter.WRITE_OPS.name()); + + String fsCountersSummary = String.format(FS_COUNTERS_HEADER_FORMAT, + vertexName, + humanReadableByteCount(bytesRead), + readOps, + largeReadOps, + humanReadableByteCount(bytesWritten), + writeOps); + console.printInfo(fsCountersSummary); + } + } + + console.printInfo(SEPARATOR); + } + } + private void printStatusInPlace(Map<String, Progress> progressMap, long startTime, boolean vextexStatusFromAM, DAGClient dagClient) { StringBuilder reportBuffer = new StringBuilder(); http://git-wip-us.apache.org/repos/asf/hive/blob/5b2c36a4/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecTezSummaryPrinter.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecTezSummaryPrinter.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecTezSummaryPrinter.java index 81bda08..412f45c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecTezSummaryPrinter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecTezSummaryPrinter.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hive.ql.hooks; import java.util.List; +import org.apache.hadoop.hive.llap.counters.LlapIOCounters; +import org.apache.tez.common.counters.FileSystemCounter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.conf.HiveConf; @@ -57,12 +59,30 @@ public class PostExecTezSummaryPrinter implements ExecuteWithHookContext { LOG.info("Printing summary for tez task: " + tezTask.getName()); TezCounters counters = tezTask.getTezCounters(); if (counters != null) { + String hiveCountersGroup = HiveConf.getVar(conf, HiveConf.ConfVars.HIVECOUNTERGROUP); for (CounterGroup group : counters) { - if ("HIVE".equals(group.getDisplayName())) { + if (hiveCountersGroup.equals(group.getDisplayName())) { console.printError(tezTask.getId() + " HIVE COUNTERS:"); for (TezCounter counter : group) { console.printError(" " + counter.getDisplayName() + ": " + counter.getValue()); } + } else if (group.getName().equals(FileSystemCounter.class.getName())) { + console.printError(tezTask.getId() + " FILE SYSTEM COUNTERS:"); + for (TezCounter counter : group) { + // HDFS counters should be relatively consistent across test runs when compared to + // local file system counters + if (counter.getName().contains("HDFS")) { + console.printError(" " + counter.getDisplayName() + ": " + counter.getValue()); + } + } + } else if (group.getName().equals(LlapIOCounters.class.getName())) { + console.printError(tezTask.getId() + " LLAP IO COUNTERS:"); + List<String> testSafeCounters = LlapIOCounters.testSafeCounterNames(); + for (TezCounter counter : group) { + if (testSafeCounters.contains(counter.getDisplayName())) { + console.printError(" " + counter.getDisplayName() + ": " + counter.getValue()); + } + } } } } http://git-wip-us.apache.org/repos/asf/hive/blob/5b2c36a4/ql/src/test/queries/clientpositive/orc_llap_counters.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/orc_llap_counters.q b/ql/src/test/queries/clientpositive/orc_llap_counters.q new file mode 100644 index 0000000..1bd55d3 --- /dev/null +++ b/ql/src/test/queries/clientpositive/orc_llap_counters.q @@ -0,0 +1,182 @@ +set hive.mapred.mode=nonstrict; +SET hive.optimize.index.filter=true; +SET hive.cbo.enable=false; +SET hive.vectorized.execution.enabled=true; +SET hive.llap.io.enabled=true; + +CREATE TABLE staging(t tinyint, + si smallint, + i int, + b bigint, + f float, + d double, + bo boolean, + s string, + ts timestamp, + dec decimal(4,2), + bin binary) +ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' +STORED AS TEXTFILE; + +LOAD DATA LOCAL INPATH '../../data/files/over1k' OVERWRITE INTO TABLE staging; +LOAD DATA LOCAL INPATH '../../data/files/over1k' INTO TABLE staging; + +CREATE TABLE orc_ppd_staging(t tinyint, + si smallint, + i int, + b bigint, + f float, + d double, + bo boolean, + s string, + c char(50), + v varchar(50), + da date, + ts timestamp, + dec decimal(4,2), + bin binary) +STORED AS ORC tblproperties("orc.row.index.stride" = "1000", "orc.bloom.filter.columns"="*"); + +insert overwrite table orc_ppd_staging select t, si, i, b, f, d, bo, s, cast(s as char(50)), cast(s as varchar(50)), cast(ts as date), ts, dec, bin from staging order by t, s; + +-- just to introduce a gap in min/max range for bloom filters. The dataset has contiguous values +-- which makes it hard to test bloom filters +insert into orc_ppd_staging select -10,-321,-65680,-4294967430,-97.94,-13.07,true,"aaa","aaa","aaa","1990-03-11","1990-03-11 10:11:58.703308",-71.54,"aaa" from staging limit 1; +insert into orc_ppd_staging select 127,331,65690,4294967440,107.94,23.07,true,"zzz","zzz","zzz","2023-03-11","2023-03-11 10:11:58.703308",71.54,"zzz" from staging limit 1; + +CREATE TABLE orc_ppd(t tinyint, + si smallint, + i int, + b bigint, + f float, + d double, + bo boolean, + s string, + c char(50), + v varchar(50), + da date, + ts timestamp, + dec decimal(4,2), + bin binary) +STORED AS ORC tblproperties("orc.row.index.stride" = "1000", "orc.bloom.filter.columns"="*"); + +insert overwrite table orc_ppd select t, si, i, b, f, d, bo, s, cast(s as char(50)), cast(s as varchar(50)), cast(ts as date), ts, dec, bin from orc_ppd_staging order by t, s; + +describe formatted orc_ppd; + +SET hive.fetch.task.conversion=none; +SET hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.PostExecTezSummaryPrinter; + +-- Row group statistics for column t: +-- Entry 0: count: 994 hasNull: true min: -10 max: 54 sum: 26014 positions: 0,0,0,0,0,0,0 +-- Entry 1: count: 1000 hasNull: false min: 54 max: 118 sum: 86812 positions: 0,2,124,0,0,116,11 +-- Entry 2: count: 100 hasNull: false min: 118 max: 127 sum: 12151 positions: 0,4,119,0,0,244,19 + +-- INPUT_RECORDS: 2100 (all row groups) +select count(*) from orc_ppd; + +-- INPUT_RECORDS: 0 (no row groups) +select count(*) from orc_ppd where t > 127; + +-- INPUT_RECORDS: 1000 (1 row group) +select count(*) from orc_ppd where t = 55; +select count(*) from orc_ppd where t <=> 50; +select count(*) from orc_ppd where t <=> 100; + +-- INPUT_RECORDS: 2000 (2 row groups) +select count(*) from orc_ppd where t = "54"; + +-- INPUT_RECORDS: 1000 (1 row group) +select count(*) from orc_ppd where t = -10.0; + +-- INPUT_RECORDS: 1000 (1 row group) +select count(*) from orc_ppd where t = cast(53 as float); +select count(*) from orc_ppd where t = cast(53 as double); + +-- INPUT_RECORDS: 2000 (2 row groups) +select count(*) from orc_ppd where t < 100; + +-- INPUT_RECORDS: 1000 (1 row group) +select count(*) from orc_ppd where t < 100 and t > 98; + +-- INPUT_RECORDS: 2000 (2 row groups) +select count(*) from orc_ppd where t <= 100; + +-- INPUT_RECORDS: 1000 (1 row group) +select count(*) from orc_ppd where t is null; + +-- INPUT_RECORDS: 1100 (2 row groups) +select count(*) from orc_ppd where t in (5, 120); + +-- INPUT_RECORDS: 1000 (1 row group) +select count(*) from orc_ppd where t between 60 and 80; + +-- bloom filter tests +-- INPUT_RECORDS: 0 +select count(*) from orc_ppd where t = -100; +select count(*) from orc_ppd where t <=> -100; +select count(*) from orc_ppd where t = 125; +select count(*) from orc_ppd where t IN (-100, 125, 200); + +-- Row group statistics for column s: +-- Entry 0: count: 1000 hasNull: false min: max: zach young sum: 12907 positions: 0,0,0 +-- Entry 1: count: 1000 hasNull: false min: alice allen max: zach zipper sum: 12704 positions: 0,1611,191 +-- Entry 2: count: 100 hasNull: false min: bob davidson max: zzz sum: 1281 positions: 0,3246,373 + +-- INPUT_RECORDS: 0 (no row groups) +select count(*) from orc_ppd where s > "zzz"; + +-- INPUT_RECORDS: 1000 (1 row group) +select count(*) from orc_ppd where s = "zach young"; +select count(*) from orc_ppd where s <=> "zach zipper"; +select count(*) from orc_ppd where s <=> ""; + +-- INPUT_RECORDS: 0 +select count(*) from orc_ppd where s is null; + +-- INPUT_RECORDS: 2100 +select count(*) from orc_ppd where s is not null; + +-- INPUT_RECORDS: 0 +select count(*) from orc_ppd where s = cast("zach young" as char(50)); + +-- INPUT_RECORDS: 1000 (1 row group) +select count(*) from orc_ppd where s = cast("zach young" as char(10)); +select count(*) from orc_ppd where s = cast("zach young" as varchar(10)); +select count(*) from orc_ppd where s = cast("zach young" as varchar(50)); + +-- INPUT_RECORDS: 2000 (2 row groups) +select count(*) from orc_ppd where s < "b"; + +-- INPUT_RECORDS: 2000 (2 row groups) +select count(*) from orc_ppd where s > "alice" and s < "bob"; + +-- INPUT_RECORDS: 2000 (2 row groups) +select count(*) from orc_ppd where s in ("alice allen", ""); + +-- INPUT_RECORDS: 2000 (2 row groups) +select count(*) from orc_ppd where s between "" and "alice allen"; + +-- INPUT_RECORDS: 100 (1 row group) +select count(*) from orc_ppd where s between "zz" and "zzz"; + +-- INPUT_RECORDS: 1100 (2 row groups) +select count(*) from orc_ppd where s between "zach zipper" and "zzz"; + +-- bloom filter tests +-- INPUT_RECORDS: 0 +select count(*) from orc_ppd where s = "hello world"; +select count(*) from orc_ppd where s <=> "apache hive"; +select count(*) from orc_ppd where s IN ("a", "z"); + +-- INPUT_RECORDS: 100 +select count(*) from orc_ppd where s = "sarah ovid"; + +-- INPUT_RECORDS: 1100 +select count(*) from orc_ppd where s = "wendy king"; + +-- INPUT_RECORDS: 1000 +select count(*) from orc_ppd where s = "wendy king" and t < 0; + +-- INPUT_RECORDS: 100 +select count(*) from orc_ppd where s = "wendy king" and t > 100; http://git-wip-us.apache.org/repos/asf/hive/blob/5b2c36a4/ql/src/test/queries/clientpositive/orc_llap_counters1.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/orc_llap_counters1.q b/ql/src/test/queries/clientpositive/orc_llap_counters1.q new file mode 100644 index 0000000..06d6c4f --- /dev/null +++ b/ql/src/test/queries/clientpositive/orc_llap_counters1.q @@ -0,0 +1,83 @@ +set hive.mapred.mode=nonstrict; +SET hive.optimize.index.filter=true; +SET hive.cbo.enable=false; +SET hive.vectorized.execution.enabled=true; +SET hive.llap.io.enabled=true; + +CREATE TABLE staging(t tinyint, + si smallint, + i int, + b bigint, + f float, + d double, + bo boolean, + s string, + ts timestamp, + dec decimal(4,2), + bin binary) +ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' +STORED AS TEXTFILE; + +LOAD DATA LOCAL INPATH '../../data/files/over1k' OVERWRITE INTO TABLE staging; +LOAD DATA LOCAL INPATH '../../data/files/over1k' INTO TABLE staging; + +CREATE TABLE orc_ppd_staging(t tinyint, + si smallint, + i int, + b bigint, + f float, + d double, + bo boolean, + s string, + c char(50), + v varchar(50), + da date, + ts timestamp, + dec decimal(4,2), + bin binary) +STORED AS ORC tblproperties("orc.row.index.stride" = "1000", "orc.bloom.filter.columns"="*"); + +insert overwrite table orc_ppd_staging select t, si, i, b, f, d, bo, s, cast(s as char(50)), cast(s as varchar(50)), cast(ts as date), ts, dec, bin from staging order by t, s; + +-- just to introduce a gap in min/max range for bloom filters. The dataset has contiguous values +-- which makes it hard to test bloom filters +insert into orc_ppd_staging select -10,-321,-65680,-4294967430,-97.94,-13.07,true,"aaa","aaa","aaa","1990-03-11","1990-03-11 10:11:58.703308",-71.54,"aaa" from staging limit 1; +insert into orc_ppd_staging select 127,331,65690,4294967440,107.94,23.07,true,"zzz","zzz","zzz","2023-03-11","2023-03-11 10:11:58.703308",71.54,"zzz" from staging limit 1; + +CREATE TABLE orc_ppd(t tinyint, + si smallint, + i int, + b bigint, + f float, + d double, + bo boolean, + s string, + c char(50), + v varchar(50), + da date, + ts timestamp, + dec decimal(4,2), + bin binary) +STORED AS ORC tblproperties("orc.row.index.stride" = "1000", "orc.bloom.filter.columns"="*"); + +insert overwrite table orc_ppd select t, si, i, b, f, d, bo, s, cast(s as char(50)), cast(s as varchar(50)), cast(ts as date), ts, dec, bin from orc_ppd_staging order by t, s; + +describe formatted orc_ppd; + +SET hive.fetch.task.conversion=none; +SET hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.PostExecTezSummaryPrinter; + +-- Row group statistics for column t: +-- Entry 0: count: 994 hasNull: true min: -10 max: 54 sum: 26014 positions: 0,0,0,0,0,0,0 +-- Entry 1: count: 1000 hasNull: false min: 54 max: 118 sum: 86812 positions: 0,2,124,0,0,116,11 +-- Entry 2: count: 100 hasNull: false min: 118 max: 127 sum: 12151 positions: 0,4,119,0,0,244,19 + +-- INPUT_RECORDS: 2100 (all row groups) +select count(*) from orc_ppd where t > -100; + +-- 100% LLAP cache hit +select count(*) from orc_ppd where t > -100; + +DROP TABLE staging; +DROP TABLE orc_ppd_staging; +DROP TABLE orc_ppd; http://git-wip-us.apache.org/repos/asf/hive/blob/5b2c36a4/ql/src/test/results/clientpositive/llap/orc_llap.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/llap/orc_llap.q.out b/ql/src/test/results/clientpositive/llap/orc_llap.q.out index e62fd92..8ef0c5c 100644 --- a/ql/src/test/results/clientpositive/llap/orc_llap.q.out +++ b/ql/src/test/results/clientpositive/llap/orc_llap.q.out @@ -81,7 +81,7 @@ POSTHOOK: type: QUERY POSTHOOK: Input: default@alltypesorc POSTHOOK: Output: default@cross_numbers POSTHOOK: Lineage: cross_numbers.i EXPRESSION [(alltypesorc)alltypesorc.FieldSchema(name:csmallint, type:smallint, comment:null), ] -Warning: Shuffle Join MERGEJOIN[12][tables = [$hdt$_0, $hdt$_1]] in Stage 'Reducer 2' is a cross product +Warning: Shuffle Join MERGEJOIN[10][tables = [$hdt$_0, $hdt$_1]] in Stage 'Reducer 2' is a cross product PREHOOK: query: insert into table orc_llap select ctinyint + i, csmallint + i, cint + i, cbigint + i, cfloat + i, cdouble + i, cstring1, cstring2, ctimestamp1, ctimestamp2, cboolean1, cboolean2 from alltypesorc cross join cross_numbers @@ -121,7 +121,7 @@ POSTHOOK: Output: default@orc_llap_small POSTHOOK: Lineage: orc_llap_small.cint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ] POSTHOOK: Lineage: orc_llap_small.csmallint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:csmallint, type:smallint, comment:null), ] POSTHOOK: Lineage: orc_llap_small.ctinyint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:ctinyint, type:tinyint, comment:null), ] -Warning: Map Join MAPJOIN[16][bigTable=?] in task 'Map 1' is a cross product +Warning: Map Join MAPJOIN[14][bigTable=?] in task 'Map 1' is a cross product PREHOOK: query: -- Cross join with no projection - do it on small table explain select count(1) from orc_llap_small y join orc_llap_small x @@ -137,6 +137,7 @@ STAGE DEPENDENCIES: STAGE PLANS: Stage: Stage-1 Tez +#### A masked pattern was here #### Edges: Map 1 <- Map 3 (BROADCAST_EDGE) Reducer 2 <- Map 1 (SIMPLE_EDGE) @@ -157,8 +158,7 @@ STAGE PLANS: 1 input vertices: 1 Map 3 - Statistics: Num rows: 112 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE - HybridGraceHashJoin: true + Statistics: Num rows: 225 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE Group By Operator aggregations: count(1) mode: hash @@ -183,7 +183,7 @@ STAGE PLANS: Execution mode: vectorized, llap LLAP IO: all inputs Reducer 2 - Execution mode: vectorized, uber + Execution mode: vectorized, llap Reduce Operator Tree: Group By Operator aggregations: count(VALUE._col0) @@ -194,8 +194,8 @@ STAGE PLANS: compressed: false Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE table: - input format: org.apache.hadoop.mapred.TextInputFormat - output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe Stage: Stage-0 @@ -204,7 +204,7 @@ STAGE PLANS: Processor Tree: ListSink -Warning: Map Join MAPJOIN[16][bigTable=?] in task 'Map 1' is a cross product +Warning: Map Join MAPJOIN[14][bigTable=?] in task 'Map 1' is a cross product PREHOOK: query: select count(1) from orc_llap_small y join orc_llap_small x PREHOOK: type: QUERY PREHOOK: Input: default@orc_llap_small @@ -278,6 +278,9 @@ POSTHOOK: type: CREATETABLE_AS_SELECT POSTHOOK: Input: default@orc_llap POSTHOOK: Output: database:default POSTHOOK: Output: default@llap_temp_table +POSTHOOK: Lineage: llap_temp_table.cbigint SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cbigint, type:bigint, comment:null), ] +POSTHOOK: Lineage: llap_temp_table.cint SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cint, type:int, comment:null), ] +POSTHOOK: Lineage: llap_temp_table.csmallint SIMPLE [(orc_llap)orc_llap.FieldSchema(name:csmallint, type:smallint, comment:null), ] PREHOOK: query: select sum(hash(*)) from llap_temp_table PREHOOK: type: QUERY PREHOOK: Input: default@llap_temp_table @@ -331,6 +334,18 @@ POSTHOOK: type: CREATETABLE_AS_SELECT POSTHOOK: Input: default@orc_llap POSTHOOK: Output: database:default POSTHOOK: Output: default@llap_temp_table +POSTHOOK: Lineage: llap_temp_table.cbigint SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cbigint, type:bigint, comment:null), ] +POSTHOOK: Lineage: llap_temp_table.cboolean1 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cboolean1, type:boolean, comment:null), ] +POSTHOOK: Lineage: llap_temp_table.cboolean2 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cboolean2, type:boolean, comment:null), ] +POSTHOOK: Lineage: llap_temp_table.cdouble SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cdouble, type:double, comment:null), ] +POSTHOOK: Lineage: llap_temp_table.cfloat SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cfloat, type:float, comment:null), ] +POSTHOOK: Lineage: llap_temp_table.cint SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cint, type:int, comment:null), ] +POSTHOOK: Lineage: llap_temp_table.csmallint SIMPLE [(orc_llap)orc_llap.FieldSchema(name:csmallint, type:smallint, comment:null), ] +POSTHOOK: Lineage: llap_temp_table.cstring1 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cstring1, type:string, comment:null), ] +POSTHOOK: Lineage: llap_temp_table.cstring2 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cstring2, type:string, comment:null), ] +POSTHOOK: Lineage: llap_temp_table.ctimestamp1 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:ctimestamp1, type:timestamp, comment:null), ] +POSTHOOK: Lineage: llap_temp_table.ctimestamp2 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:ctimestamp2, type:timestamp, comment:null), ] +POSTHOOK: Lineage: llap_temp_table.ctinyint SIMPLE [(orc_llap)orc_llap.FieldSchema(name:ctinyint, type:tinyint, comment:null), ] PREHOOK: query: select sum(hash(*)) from llap_temp_table PREHOOK: type: QUERY PREHOOK: Input: default@llap_temp_table @@ -384,6 +399,7 @@ POSTHOOK: type: CREATETABLE_AS_SELECT POSTHOOK: Input: default@orc_llap POSTHOOK: Output: database:default POSTHOOK: Output: default@llap_temp_table +POSTHOOK: Lineage: llap_temp_table.cstring2 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cstring2, type:string, comment:null), ] PREHOOK: query: select sum(hash(*)) from llap_temp_table PREHOOK: type: QUERY PREHOOK: Input: default@llap_temp_table @@ -414,6 +430,7 @@ STAGE DEPENDENCIES: STAGE PLANS: Stage: Stage-1 Tez +#### A masked pattern was here #### Edges: Reducer 2 <- Map 1 (SIMPLE_EDGE) #### A masked pattern was here #### @@ -442,7 +459,7 @@ STAGE PLANS: Execution mode: vectorized, llap LLAP IO: all inputs Reducer 2 - Execution mode: vectorized, uber + Execution mode: vectorized, llap Reduce Operator Tree: Group By Operator aggregations: count(VALUE._col0) @@ -454,8 +471,8 @@ STAGE PLANS: compressed: false Statistics: Num rows: 61440 Data size: 14539970 Basic stats: COMPLETE Column stats: NONE table: - input format: org.apache.hadoop.mapred.TextInputFormat - output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe Stage: Stage-0 @@ -476,6 +493,9 @@ POSTHOOK: type: CREATETABLE_AS_SELECT POSTHOOK: Input: default@orc_llap POSTHOOK: Output: database:default POSTHOOK: Output: default@llap_temp_table +POSTHOOK: Lineage: llap_temp_table.c2 EXPRESSION [(orc_llap)orc_llap.null, ] +POSTHOOK: Lineage: llap_temp_table.cstring1 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cstring1, type:string, comment:null), ] +POSTHOOK: Lineage: llap_temp_table.cstring2 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cstring2, type:string, comment:null), ] PREHOOK: query: select sum(hash(*)) from llap_temp_table PREHOOK: type: QUERY PREHOOK: Input: default@llap_temp_table @@ -506,8 +526,9 @@ STAGE DEPENDENCIES: STAGE PLANS: Stage: Stage-1 Tez +#### A masked pattern was here #### Edges: - Map 1 <- Map 2 (BROADCAST_EDGE) + Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 3 (SIMPLE_EDGE) #### A masked pattern was here #### Vertices: Map 1 @@ -518,48 +539,62 @@ STAGE PLANS: Statistics: Num rows: 122880 Data size: 29079940 Basic stats: COMPLETE Column stats: NONE Filter Operator predicate: (csmallint is not null and cbigint is not null) (type: boolean) - Statistics: Num rows: 30720 Data size: 7269985 Basic stats: COMPLETE Column stats: NONE - Map Join Operator - condition map: - Inner Join 0 to 1 - keys: - 0 csmallint (type: smallint) - 1 csmallint (type: smallint) - outputColumnNames: _col6, _col22 - input vertices: - 1 Map 2 - Statistics: Num rows: 33792 Data size: 7996983 Basic stats: COMPLETE Column stats: NONE - HybridGraceHashJoin: true - Select Operator - expressions: _col6 (type: string), _col22 (type: string) - outputColumnNames: _col0, _col1 - Statistics: Num rows: 33792 Data size: 7996983 Basic stats: COMPLETE Column stats: NONE - File Output Operator - compressed: false - Statistics: Num rows: 33792 Data size: 7996983 Basic stats: COMPLETE Column stats: NONE - table: - input format: org.apache.hadoop.mapred.TextInputFormat - output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat - serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Statistics: Num rows: 122880 Data size: 29079940 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: csmallint (type: smallint), cstring1 (type: string) + outputColumnNames: _col0, _col2 + Statistics: Num rows: 122880 Data size: 29079940 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: smallint) + sort order: + + Map-reduce partition columns: _col0 (type: smallint) + Statistics: Num rows: 122880 Data size: 29079940 Basic stats: COMPLETE Column stats: NONE + value expressions: _col2 (type: string) Execution mode: vectorized, llap LLAP IO: all inputs - Map 2 + Map 3 Map Operator Tree: TableScan - alias: o2 + alias: o1 filterExpr: (csmallint is not null and cbigint is not null) (type: boolean) Statistics: Num rows: 122880 Data size: 29079940 Basic stats: COMPLETE Column stats: NONE Filter Operator predicate: (csmallint is not null and cbigint is not null) (type: boolean) - Statistics: Num rows: 30720 Data size: 7269985 Basic stats: COMPLETE Column stats: NONE - Reduce Output Operator - key expressions: csmallint (type: smallint) - sort order: + - Map-reduce partition columns: csmallint (type: smallint) - Statistics: Num rows: 30720 Data size: 7269985 Basic stats: COMPLETE Column stats: NONE - value expressions: cstring2 (type: string) + Statistics: Num rows: 122880 Data size: 29079940 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: csmallint (type: smallint), cstring2 (type: string) + outputColumnNames: _col0, _col2 + Statistics: Num rows: 122880 Data size: 29079940 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: smallint) + sort order: + + Map-reduce partition columns: _col0 (type: smallint) + Statistics: Num rows: 122880 Data size: 29079940 Basic stats: COMPLETE Column stats: NONE + value expressions: _col2 (type: string) Execution mode: vectorized, llap LLAP IO: all inputs + Reducer 2 + Execution mode: llap + Reduce Operator Tree: + Merge Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: smallint) + 1 _col0 (type: smallint) + outputColumnNames: _col2, _col5 + Statistics: Num rows: 135168 Data size: 31987934 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col2 (type: string), _col5 (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 135168 Data size: 31987934 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 135168 Data size: 31987934 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe Stage: Stage-0 Fetch Operator @@ -579,6 +614,8 @@ POSTHOOK: type: CREATETABLE_AS_SELECT POSTHOOK: Input: default@orc_llap POSTHOOK: Output: database:default POSTHOOK: Output: default@llap_temp_table +POSTHOOK: Lineage: llap_temp_table.cstring1 SIMPLE [(orc_llap)o1.FieldSchema(name:cstring1, type:string, comment:null), ] +POSTHOOK: Lineage: llap_temp_table.cstring2 SIMPLE [(orc_llap)o1.FieldSchema(name:cstring2, type:string, comment:null), ] PREHOOK: query: select sum(hash(*)) from llap_temp_table PREHOOK: type: QUERY PREHOOK: Input: default@llap_temp_table @@ -588,7 +625,7 @@ POSTHOOK: type: QUERY POSTHOOK: Input: default@llap_temp_table #### A masked pattern was here #### -735462183586256 -Warning: Map Join MAPJOIN[12][bigTable=?] in task 'Map 1' is a cross product +Warning: Map Join MAPJOIN[10][bigTable=?] in task 'Map 1' is a cross product PREHOOK: query: -- multi-stripe test insert into table orc_llap select ctinyint + i, csmallint + i, cint + i, cbigint + i, cfloat + i, cdouble + i, cstring1, cstring2, ctimestamp1, ctimestamp2, cboolean1, cboolean2 @@ -669,6 +706,9 @@ POSTHOOK: type: CREATETABLE_AS_SELECT POSTHOOK: Input: default@orc_llap POSTHOOK: Output: database:default POSTHOOK: Output: default@llap_temp_table +POSTHOOK: Lineage: llap_temp_table.cbigint SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cbigint, type:bigint, comment:null), ] +POSTHOOK: Lineage: llap_temp_table.cint SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cint, type:int, comment:null), ] +POSTHOOK: Lineage: llap_temp_table.csmallint SIMPLE [(orc_llap)orc_llap.FieldSchema(name:csmallint, type:smallint, comment:null), ] PREHOOK: query: select sum(hash(*)) from llap_temp_table PREHOOK: type: QUERY PREHOOK: Input: default@llap_temp_table @@ -722,6 +762,18 @@ POSTHOOK: type: CREATETABLE_AS_SELECT POSTHOOK: Input: default@orc_llap POSTHOOK: Output: database:default POSTHOOK: Output: default@llap_temp_table +POSTHOOK: Lineage: llap_temp_table.cbigint SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cbigint, type:bigint, comment:null), ] +POSTHOOK: Lineage: llap_temp_table.cboolean1 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cboolean1, type:boolean, comment:null), ] +POSTHOOK: Lineage: llap_temp_table.cboolean2 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cboolean2, type:boolean, comment:null), ] +POSTHOOK: Lineage: llap_temp_table.cdouble SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cdouble, type:double, comment:null), ] +POSTHOOK: Lineage: llap_temp_table.cfloat SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cfloat, type:float, comment:null), ] +POSTHOOK: Lineage: llap_temp_table.cint SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cint, type:int, comment:null), ] +POSTHOOK: Lineage: llap_temp_table.csmallint SIMPLE [(orc_llap)orc_llap.FieldSchema(name:csmallint, type:smallint, comment:null), ] +POSTHOOK: Lineage: llap_temp_table.cstring1 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cstring1, type:string, comment:null), ] +POSTHOOK: Lineage: llap_temp_table.cstring2 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cstring2, type:string, comment:null), ] +POSTHOOK: Lineage: llap_temp_table.ctimestamp1 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:ctimestamp1, type:timestamp, comment:null), ] +POSTHOOK: Lineage: llap_temp_table.ctimestamp2 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:ctimestamp2, type:timestamp, comment:null), ] +POSTHOOK: Lineage: llap_temp_table.ctinyint SIMPLE [(orc_llap)orc_llap.FieldSchema(name:ctinyint, type:tinyint, comment:null), ] PREHOOK: query: select sum(hash(*)) from llap_temp_table PREHOOK: type: QUERY PREHOOK: Input: default@llap_temp_table @@ -775,6 +827,7 @@ POSTHOOK: type: CREATETABLE_AS_SELECT POSTHOOK: Input: default@orc_llap POSTHOOK: Output: database:default POSTHOOK: Output: default@llap_temp_table +POSTHOOK: Lineage: llap_temp_table.cstring2 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cstring2, type:string, comment:null), ] PREHOOK: query: select sum(hash(*)) from llap_temp_table PREHOOK: type: QUERY PREHOOK: Input: default@llap_temp_table @@ -805,6 +858,7 @@ STAGE DEPENDENCIES: STAGE PLANS: Stage: Stage-1 Tez +#### A masked pattern was here #### Edges: Reducer 2 <- Map 1 (SIMPLE_EDGE) #### A masked pattern was here #### @@ -813,40 +867,40 @@ STAGE PLANS: Map Operator Tree: TableScan alias: orc_llap - Statistics: Num rows: 8014 Data size: 1602939 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 245760 Data size: 58159880 Basic stats: COMPLETE Column stats: NONE Select Operator expressions: cstring1 (type: string), cstring2 (type: string) outputColumnNames: cstring1, cstring2 - Statistics: Num rows: 8014 Data size: 1602939 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 245760 Data size: 58159880 Basic stats: COMPLETE Column stats: NONE Group By Operator aggregations: count() keys: cstring1 (type: string), cstring2 (type: string) mode: hash outputColumnNames: _col0, _col1, _col2 - Statistics: Num rows: 8014 Data size: 1602939 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 245760 Data size: 58159880 Basic stats: COMPLETE Column stats: NONE Reduce Output Operator key expressions: _col0 (type: string), _col1 (type: string) sort order: ++ Map-reduce partition columns: _col0 (type: string), _col1 (type: string) - Statistics: Num rows: 8014 Data size: 1602939 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 245760 Data size: 58159880 Basic stats: COMPLETE Column stats: NONE value expressions: _col2 (type: bigint) Execution mode: vectorized, llap LLAP IO: all inputs Reducer 2 - Execution mode: vectorized, uber + Execution mode: vectorized, llap Reduce Operator Tree: Group By Operator aggregations: count(VALUE._col0) keys: KEY._col0 (type: string), KEY._col1 (type: string) mode: mergepartial outputColumnNames: _col0, _col1, _col2 - Statistics: Num rows: 4007 Data size: 801469 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 122880 Data size: 29079940 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false - Statistics: Num rows: 4007 Data size: 801469 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 122880 Data size: 29079940 Basic stats: COMPLETE Column stats: NONE table: - input format: org.apache.hadoop.mapred.TextInputFormat - output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe Stage: Stage-0 @@ -867,6 +921,9 @@ POSTHOOK: type: CREATETABLE_AS_SELECT POSTHOOK: Input: default@orc_llap POSTHOOK: Output: database:default POSTHOOK: Output: default@llap_temp_table +POSTHOOK: Lineage: llap_temp_table.c2 EXPRESSION [(orc_llap)orc_llap.null, ] +POSTHOOK: Lineage: llap_temp_table.cstring1 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cstring1, type:string, comment:null), ] +POSTHOOK: Lineage: llap_temp_table.cstring2 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cstring2, type:string, comment:null), ] PREHOOK: query: select sum(hash(*)) from llap_temp_table PREHOOK: type: QUERY PREHOOK: Input: default@llap_temp_table @@ -897,8 +954,9 @@ STAGE DEPENDENCIES: STAGE PLANS: Stage: Stage-1 Tez +#### A masked pattern was here #### Edges: - Map 1 <- Map 2 (BROADCAST_EDGE) + Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 3 (SIMPLE_EDGE) #### A masked pattern was here #### Vertices: Map 1 @@ -906,51 +964,65 @@ STAGE PLANS: TableScan alias: o1 filterExpr: (csmallint is not null and cbigint is not null) (type: boolean) - Statistics: Num rows: 14311 Data size: 1602939 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 245760 Data size: 58159880 Basic stats: COMPLETE Column stats: NONE Filter Operator predicate: (csmallint is not null and cbigint is not null) (type: boolean) - Statistics: Num rows: 3578 Data size: 400762 Basic stats: COMPLETE Column stats: NONE - Map Join Operator - condition map: - Inner Join 0 to 1 - keys: - 0 csmallint (type: smallint) - 1 csmallint (type: smallint) - outputColumnNames: _col6, _col22 - input vertices: - 1 Map 2 - Statistics: Num rows: 3935 Data size: 440838 Basic stats: COMPLETE Column stats: NONE - HybridGraceHashJoin: true - Select Operator - expressions: _col6 (type: string), _col22 (type: string) - outputColumnNames: _col0, _col1 - Statistics: Num rows: 3935 Data size: 440838 Basic stats: COMPLETE Column stats: NONE - File Output Operator - compressed: false - Statistics: Num rows: 3935 Data size: 440838 Basic stats: COMPLETE Column stats: NONE - table: - input format: org.apache.hadoop.mapred.TextInputFormat - output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat - serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Statistics: Num rows: 245760 Data size: 58159880 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: csmallint (type: smallint), cstring1 (type: string) + outputColumnNames: _col0, _col2 + Statistics: Num rows: 245760 Data size: 58159880 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: smallint) + sort order: + + Map-reduce partition columns: _col0 (type: smallint) + Statistics: Num rows: 245760 Data size: 58159880 Basic stats: COMPLETE Column stats: NONE + value expressions: _col2 (type: string) Execution mode: vectorized, llap LLAP IO: all inputs - Map 2 + Map 3 Map Operator Tree: TableScan - alias: o2 + alias: o1 filterExpr: (csmallint is not null and cbigint is not null) (type: boolean) - Statistics: Num rows: 14311 Data size: 1602939 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 245760 Data size: 58159880 Basic stats: COMPLETE Column stats: NONE Filter Operator predicate: (csmallint is not null and cbigint is not null) (type: boolean) - Statistics: Num rows: 3578 Data size: 400762 Basic stats: COMPLETE Column stats: NONE - Reduce Output Operator - key expressions: csmallint (type: smallint) - sort order: + - Map-reduce partition columns: csmallint (type: smallint) - Statistics: Num rows: 3578 Data size: 400762 Basic stats: COMPLETE Column stats: NONE - value expressions: cstring2 (type: string) + Statistics: Num rows: 245760 Data size: 58159880 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: csmallint (type: smallint), cstring2 (type: string) + outputColumnNames: _col0, _col2 + Statistics: Num rows: 245760 Data size: 58159880 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: smallint) + sort order: + + Map-reduce partition columns: _col0 (type: smallint) + Statistics: Num rows: 245760 Data size: 58159880 Basic stats: COMPLETE Column stats: NONE + value expressions: _col2 (type: string) Execution mode: vectorized, llap LLAP IO: all inputs + Reducer 2 + Execution mode: llap + Reduce Operator Tree: + Merge Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: smallint) + 1 _col0 (type: smallint) + outputColumnNames: _col2, _col5 + Statistics: Num rows: 270336 Data size: 63975869 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col2 (type: string), _col5 (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 270336 Data size: 63975869 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 270336 Data size: 63975869 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe Stage: Stage-0 Fetch Operator @@ -970,6 +1042,8 @@ POSTHOOK: type: CREATETABLE_AS_SELECT POSTHOOK: Input: default@orc_llap POSTHOOK: Output: database:default POSTHOOK: Output: default@llap_temp_table +POSTHOOK: Lineage: llap_temp_table.cstring1 SIMPLE [(orc_llap)o1.FieldSchema(name:cstring1, type:string, comment:null), ] +POSTHOOK: Lineage: llap_temp_table.cstring2 SIMPLE [(orc_llap)o1.FieldSchema(name:cstring2, type:string, comment:null), ] PREHOOK: query: select sum(hash(*)) from llap_temp_table PREHOOK: type: QUERY PREHOOK: Input: default@llap_temp_table