Repository: hive Updated Branches: refs/heads/master 0d4d03fd1 -> 72349bb33
HIVE-20772 : record per-task CPU counters in LLAP (Sergey Shelukhin, reviewed by Prasanth Jayachandran) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/72349bb3 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/72349bb3 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/72349bb3 Branch: refs/heads/master Commit: 72349bb33988656ad43afdc0c7556532ee0dadbc Parents: 0d4d03f Author: sergey <[email protected]> Authored: Tue Oct 23 15:16:17 2018 -0700 Committer: sergey <[email protected]> Committed: Tue Oct 23 15:16:17 2018 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/llap/LlapUtil.java | 17 ++++++++ .../hive/llap/counters/LlapIOCounters.java | 4 +- .../hive/llap/cache/LowLevelCacheCounters.java | 3 +- .../llap/counters/QueryFragmentCounters.java | 10 ++++- .../daemon/impl/StatsRecordingThreadPool.java | 42 +++++++++++++++---- .../hive/llap/io/api/impl/LlapRecordReader.java | 2 +- .../llap/io/decode/EncodedDataConsumer.java | 43 +++++++++++++++++++- .../llap/io/decode/OrcEncodedDataConsumer.java | 6 +-- .../llap/io/encoded/OrcEncodedDataReader.java | 10 ++--- .../llap/io/encoded/SerDeEncodedDataReader.java | 2 +- 10 files changed, 115 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/72349bb3/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java ---------------------------------------------------------------------- diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java index 50c0e22..82776ab 100644 --- a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java +++ b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java @@ -14,6 +14,8 @@ package org.apache.hadoop.hive.llap; import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadMXBean; import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.ArrayList; @@ -366,4 +368,19 @@ public class LlapUtil { return p.startsWith(BASE_PREFIX) || p.startsWith(DELTA_PREFIX) || p.startsWith(BUCKET_PREFIX) || p.startsWith(UNION_SUDBIR_PREFIX) || p.startsWith(DELETE_DELTA_PREFIX); } + + + public static ThreadMXBean initThreadMxBean() { + ThreadMXBean mxBean = ManagementFactory.getThreadMXBean(); + if (mxBean != null) { + if (!mxBean.isCurrentThreadCpuTimeSupported()) { + LOG.warn("Thread CPU monitoring is not supported"); + return null; + } else if (!mxBean.isThreadCpuTimeEnabled()) { + LOG.warn("Thread CPU monitoring is not enabled"); + return null; + } + } + return mxBean; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/72349bb3/llap-common/src/java/org/apache/hadoop/hive/llap/counters/LlapIOCounters.java ---------------------------------------------------------------------- diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/counters/LlapIOCounters.java b/llap-common/src/java/org/apache/hadoop/hive/llap/counters/LlapIOCounters.java index 059d5b9..d27193f 100644 --- a/llap-common/src/java/org/apache/hadoop/hive/llap/counters/LlapIOCounters.java +++ b/llap-common/src/java/org/apache/hadoop/hive/llap/counters/LlapIOCounters.java @@ -36,7 +36,9 @@ public enum LlapIOCounters { TOTAL_IO_TIME_NS(false), DECODE_TIME_NS(false), HDFS_TIME_NS(false), - CONSUMER_TIME_NS(false); + CONSUMER_TIME_NS(false), + IO_CPU_NS(false), + IO_USER_NS(false); // flag to indicate if these counters are subject to change across different test runs private boolean testSafe; http://git-wip-us.apache.org/repos/asf/hive/blob/72349bb3/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheCounters.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheCounters.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheCounters.java index 91df036..c2aca5a 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheCounters.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheCounters.java @@ -21,6 +21,7 @@ public interface LowLevelCacheCounters { void recordCacheHit(long bytesHit); void recordCacheMiss(long bytesMissed); void recordAllocBytes(long bytesWasted, long bytesAllocated); - void recordHdfsTime(long timeUs); + void recordHdfsTime(long timeNs); + void recordThreadTimes(long cpuNs, long userNs); long startTimeCounter(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/72349bb3/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java b/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java index be4dfad..f5f2982 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java @@ -71,7 +71,7 @@ public class QueryFragmentCounters implements LowLevelCacheCounters { return (doUseTimeCounters ? System.nanoTime() : 0); } - public void incrTimeCounter(LlapIOCounters counter, long startTime) { + public void incrWallClockCounter(LlapIOCounters counter, long startTime) { if (!doUseTimeCounters) return; long delta = System.nanoTime() - startTime; fixedCounters.addAndGet(counter.ordinal(), delta); @@ -109,7 +109,13 @@ public class QueryFragmentCounters implements LowLevelCacheCounters { @Override public void recordHdfsTime(long startTime) { - incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTime); + incrWallClockCounter(LlapIOCounters.HDFS_TIME_NS, startTime); + } + + @Override + public void recordThreadTimes(long cpuNs, long userNs) { + incrCounter(LlapIOCounters.IO_CPU_NS, cpuNs); + incrCounter(LlapIOCounters.IO_USER_NS, userNs); } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/72349bb3/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java index 27462e1..873bdc7 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java @@ -17,7 +17,10 @@ */ package org.apache.hadoop.hive.llap.daemon.impl; +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadMXBean; import java.lang.reflect.Field; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Stack; @@ -31,8 +34,8 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hive.llap.LlapUtil; +import org.apache.hadoop.hive.llap.counters.LlapIOCounters; import org.apache.hadoop.hive.llap.io.encoded.TezCounterSource; -import org.apache.log4j.NDC; import org.apache.tez.common.CallableWithNdc; import org.apache.tez.common.counters.FileSystemCounter; import org.apache.tez.common.counters.TezCounters; @@ -50,6 +53,7 @@ public class StatsRecordingThreadPool extends ThreadPoolExecutor { private static final Logger LOG = LoggerFactory.getLogger(StatsRecordingThreadPool.class); // uncaught exception handler that will be set for all threads before execution private Thread.UncaughtExceptionHandler uncaughtExceptionHandler; + private final ThreadMXBean mxBean; public StatsRecordingThreadPool(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, @@ -66,11 +70,12 @@ public class StatsRecordingThreadPool extends ThreadPoolExecutor { final ThreadFactory threadFactory, Thread.UncaughtExceptionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); this.uncaughtExceptionHandler = handler; + this.mxBean = LlapUtil.initThreadMxBean(); } @Override protected <T> RunnableFuture<T> newTaskFor(final Callable<T> callable) { - return new FutureTask(new WrappedCallable(callable, uncaughtExceptionHandler)); + return new FutureTask(new WrappedCallable(callable, uncaughtExceptionHandler, mxBean)); } public void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler handler) { @@ -86,11 +91,13 @@ public class StatsRecordingThreadPool extends ThreadPoolExecutor { private static class WrappedCallable<V> implements Callable<V> { private Callable<V> actualCallable; private Thread.UncaughtExceptionHandler uncaughtExceptionHandler; + private ThreadMXBean mxBean; WrappedCallable(final Callable<V> callable, - final Thread.UncaughtExceptionHandler uncaughtExceptionHandler) { + final Thread.UncaughtExceptionHandler uncaughtExceptionHandler, ThreadMXBean mxBean) { this.actualCallable = callable; this.uncaughtExceptionHandler = uncaughtExceptionHandler; + this.mxBean = mxBean; } @Override @@ -104,12 +111,18 @@ public class StatsRecordingThreadPool extends ThreadPoolExecutor { // clone thread local file system statistics List<LlapUtil.StatisticsData> statsBefore = LlapUtil.cloneThreadLocalFileSystemStatistics(); - + long cpuTime = mxBean == null ? -1 : mxBean.getCurrentThreadCpuTime(), + userTime = mxBean == null ? -1 : mxBean.getCurrentThreadUserTime(); setupMDCFromNDC(actualCallable); try { return actualCallable.call(); } finally { - updateFileSystemCounters(statsBefore, actualCallable); + if (mxBean != null) { + cpuTime = mxBean.getCurrentThreadCpuTime() - cpuTime; + userTime = mxBean.getCurrentThreadUserTime() - userTime; + } + updateCounters(statsBefore, actualCallable, cpuTime, userTime); + MDC.clear(); } } @@ -148,8 +161,17 @@ public class StatsRecordingThreadPool extends ThreadPoolExecutor { } } - private void updateFileSystemCounters(final List<LlapUtil.StatisticsData> statsBefore, - final Callable<V> actualCallable) { + /** + * LLAP IO related counters. + */ + public enum LlapExecutorCounters { + EXECUTOR_CPU_NS, + EXECUTOR_USER_NS; + + } + + private void updateCounters(final List<LlapUtil.StatisticsData> statsBefore, + final Callable<V> actualCallable, long cpuTime, long userTime) { Thread thread = Thread.currentThread(); TezCounters tezCounters = null; // add tez counters for task execution and llap io @@ -160,9 +182,15 @@ public class StatsRecordingThreadPool extends ThreadPoolExecutor { } else if (actualCallable instanceof TezCounterSource) { // Other counter sources (currently used in LLAP IO). tezCounters = ((TezCounterSource) actualCallable).getTezCounters(); + } else { + LOG.warn("Unexpected callable {}; cannot get counters", actualCallable); } if (tezCounters != null) { + if (cpuTime >= 0 && userTime >= 0) { + tezCounters.findCounter(LlapExecutorCounters.EXECUTOR_CPU_NS).increment(cpuTime); + tezCounters.findCounter(LlapExecutorCounters.EXECUTOR_USER_NS).increment(userTime); + } if (statsBefore != null) { // if there are multiple stats for the same scheme (from different NameNode), this // method will squash them together http://git-wip-us.apache.org/repos/asf/hive/blob/72349bb3/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java index 27a5b0f..9ef7af4 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java @@ -338,7 +338,7 @@ class LlapRecordReader if (wasFirst) { firstReturnTime = counters.startTimeCounter(); } - counters.incrTimeCounter(LlapIOCounters.CONSUMER_TIME_NS, firstReturnTime); + counters.incrWallClockCounter(LlapIOCounters.CONSUMER_TIME_NS, firstReturnTime); return false; } if (isAcidScan) { http://git-wip-us.apache.org/repos/asf/hive/blob/72349bb3/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java index d5c2d48..f2d2832 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java @@ -17,19 +17,26 @@ */ package org.apache.hadoop.hive.llap.io.decode; +import java.lang.management.ThreadMXBean; import java.util.concurrent.Callable; import org.apache.hadoop.hive.common.Pool; import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch; import org.apache.hadoop.hive.llap.ConsumerFeedback; import org.apache.hadoop.hive.llap.DebugUtils; +import org.apache.hadoop.hive.llap.LlapUtil; +import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters; import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch; import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl; +import org.apache.hadoop.hive.llap.io.encoded.TezCounterSource; import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics; import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer; import org.apache.hadoop.hive.ql.io.orc.encoded.IoTrace; import org.apache.hive.common.util.FixedSizedObjectPool; import org.apache.orc.TypeDescription; +import org.apache.tez.common.counters.FileSystemCounter; +import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.runtime.task.TaskRunner2Callable; public abstract class EncodedDataConsumer<BatchKey, BatchType extends EncodedColumnBatch<BatchKey>> implements Consumer<BatchType>, ReadPipeline { @@ -41,11 +48,14 @@ public abstract class EncodedDataConsumer<BatchKey, BatchType extends EncodedCol // Note that the pool is per EDC - within EDC, CVBs are expected to have the same schema. private static final int CVB_POOL_SIZE = 128; protected final FixedSizedObjectPool<ColumnVectorBatch> cvbPool; + protected final QueryFragmentCounters counters; + private final ThreadMXBean mxBean; public EncodedDataConsumer(Consumer<ColumnVectorBatch> consumer, final int colCount, - LlapDaemonIOMetrics ioMetrics) { + LlapDaemonIOMetrics ioMetrics, QueryFragmentCounters counters) { this.downstreamConsumer = consumer; this.ioMetrics = ioMetrics; + this.mxBean = LlapUtil.initThreadMxBean(); cvbPool = new FixedSizedObjectPool<ColumnVectorBatch>(CVB_POOL_SIZE, new Pool.PoolObjectHelper<ColumnVectorBatch>() { @Override @@ -57,12 +67,41 @@ public abstract class EncodedDataConsumer<BatchKey, BatchType extends EncodedCol // Don't reset anything, we are reusing column vectors. } }); + this.counters = counters; + } + + // Implementing TCS is needed for StatsRecordingThreadPool. + private class CpuRecordingCallable implements Callable<Void>, TezCounterSource { + private final Callable<Void> readCallable; + + public CpuRecordingCallable(Callable<Void> readCallable) { + this.readCallable = readCallable; + } + + @Override + public Void call() throws Exception { + long cpuTime = mxBean.getCurrentThreadCpuTime(), + userTime = mxBean.getCurrentThreadUserTime(); + try { + return readCallable.call(); + } finally { + counters.recordThreadTimes(mxBean.getCurrentThreadCpuTime() - cpuTime, + mxBean.getCurrentThreadUserTime() - userTime); + } + } + + @Override + public TezCounters getTezCounters() { + return (readCallable instanceof TezCounterSource) + ? ((TezCounterSource) readCallable).getTezCounters() : null; + } + } public void init(ConsumerFeedback<BatchType> upstreamFeedback, Callable<Void> readCallable) { this.upstreamFeedback = upstreamFeedback; - this.readCallable = readCallable; + this.readCallable = mxBean == null ? readCallable : new CpuRecordingCallable(readCallable); } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/72349bb3/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java index 40248a3..83931c2 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java @@ -69,7 +69,6 @@ public class OrcEncodedDataConsumer private CompressionCodec codec; private List<ConsumerStripeMetadata> stripes; private final boolean skipCorrupt; // TODO: get rid of this - private final QueryFragmentCounters counters; private SchemaEvolution evolution; private IoTrace trace; private final Includes includes; @@ -79,11 +78,10 @@ public class OrcEncodedDataConsumer public OrcEncodedDataConsumer( Consumer<ColumnVectorBatch> consumer, Includes includes, boolean skipCorrupt, QueryFragmentCounters counters, LlapDaemonIOMetrics ioMetrics) { - super(consumer, includes.getPhysicalColumnIds().size(), ioMetrics); + super(consumer, includes.getPhysicalColumnIds().size(), ioMetrics, counters); this.includes = includes; // TODO: get rid of this this.skipCorrupt = skipCorrupt; - this.counters = counters; } public void setUseDecimal64ColumnVectors(final boolean useDecimal64ColumnVectors) { @@ -209,7 +207,7 @@ public class OrcEncodedDataConsumer counters.incrCounter(LlapIOCounters.ROWS_EMITTED, batchSize); } LlapIoImpl.ORC_LOGGER.debug("Done with decode"); - counters.incrTimeCounter(LlapIOCounters.DECODE_TIME_NS, startTime); + counters.incrWallClockCounter(LlapIOCounters.DECODE_TIME_NS, startTime); counters.incrCounter(LlapIOCounters.NUM_VECTOR_BATCHES, maxBatchesRG); counters.incrCounter(LlapIOCounters.NUM_DECODED_BATCHES); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/hive/blob/72349bb3/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java index 4f5b0a9..74cee64 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java @@ -443,7 +443,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> } private void recordReaderTime(long startTime) { - counters.incrTimeCounter(LlapIOCounters.TOTAL_IO_TIME_NS, startTime); + counters.incrWallClockCounter(LlapIOCounters.TOTAL_IO_TIME_NS, startTime); } private void validateFileMetadata() throws IOException { @@ -519,7 +519,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> } } orcReader = EncodedOrcFile.createReader(path, opts); - counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTime); + counters.incrWallClockCounter(LlapIOCounters.HDFS_TIME_NS, startTime); } /** @@ -677,7 +677,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> DiskRangeList footerRange = rawDataReader.readFileData( new DiskRangeList(offset, offset + si.getFooterLength()), 0, false); // LOG.error("Got " + RecordReaderUtils.stringifyDiskRanges(footerRange)); - counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTime); + counters.incrWallClockCounter(LlapIOCounters.HDFS_TIME_NS, startTime); assert footerRange.next == null; // Can only happens w/zcr for a single input buffer. if (hasCache) { LlapBufferOrBuffers cacheBuf = metadataCache.putStripeTail( @@ -716,7 +716,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> if (!isRawDataReaderOpen && isOpen) { long startTime = counters.startTimeCounter(); rawDataReader.open(); - counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTime); + counters.incrWallClockCounter(LlapIOCounters.HDFS_TIME_NS, startTime); } return; } @@ -734,7 +734,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> rawDataReader.open(); isRawDataReaderOpen = true; } - counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTime); + counters.incrWallClockCounter(LlapIOCounters.HDFS_TIME_NS, startTime); } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/72349bb3/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java index 658bc7d..a5671e9 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java @@ -1688,7 +1688,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void> } private void recordReaderTime(long startTime) { - counters.incrTimeCounter(LlapIOCounters.TOTAL_IO_TIME_NS, startTime); + counters.incrWallClockCounter(LlapIOCounters.TOTAL_IO_TIME_NS, startTime); } private boolean processStop() {
