HIVE-16078 : improve abort checking in Tez/LLAP (Sergey Shelukhin, reviewed by Rajesh Balamohan, Siddharth Seth)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0cc1afa5
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0cc1afa5
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0cc1afa5

Branch: refs/heads/hive-14535
Commit: 0cc1afa56afc882c4b7a756112f749acebc3c58a
Parents: b478a22
Author: Sergey Shelukhin <[email protected]>
Authored: Wed Mar 8 12:22:17 2017 -0800
Committer: Sergey Shelukhin <[email protected]>
Committed: Wed Mar 8 12:22:17 2017 -0800

----------------------------------------------------------------------
 .../hive/ql/exec/CommonMergeJoinOperator.java   | 11 +++
 .../ql/exec/tez/InterruptibleProcessing.java    | 79 ++++++++++++++++++++
 .../hive/ql/exec/tez/MapRecordProcessor.java    | 18 ++---
 .../ql/exec/tez/MergeFileRecordProcessor.java   | 16 ++--
 .../hive/ql/exec/tez/RecordProcessor.java       | 18 +----
 .../hive/ql/exec/tez/ReduceRecordProcessor.java | 27 +++----
 .../VectorMapJoinGenerateResultOperator.java    | 29 +++++++
 7 files changed, 147 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/0cc1afa5/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
index 0b8eae8..8495d73 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.exec.persistence.RowContainer;
+import org.apache.hadoop.hive.ql.exec.tez.InterruptibleProcessing;
 import org.apache.hadoop.hive.ql.exec.tez.RecordSource;
 import org.apache.hadoop.hive.ql.exec.tez.TezContext;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -86,6 +87,9 @@ public class CommonMergeJoinOperator extends AbstractMapJoinOperator<CommonMerge
       new ArrayList<Operator<? extends OperatorDesc>>();
   transient Set<Integer> fetchInputAtClose;
 
+  // A field because we cannot multi-inherit.
+  transient InterruptibleProcessing interruptChecker;
+
   /** Kryo ctor. */
   protected CommonMergeJoinOperator() {
     super();
@@ -156,6 +160,7 @@ public class CommonMergeJoinOperator extends AbstractMapJoinOperator<CommonMerge
     }
 
     sources = ((TezContext) MapredContext.get()).getRecordSources();
+    interruptChecker = new InterruptibleProcessing();
   }
 
   /*
@@ -374,11 +379,17 @@ public class CommonMergeJoinOperator extends AbstractMapJoinOperator<CommonMerge
 
     // for tables other than the big table, we need to fetch more data until reach a new group or
     // done.
+    interruptChecker.startAbortChecks(); // Reset the time, we only want to count it in the loop.
     while (!foundNextKeyGroup[t]) {
       if (fetchDone[t]) {
         break;
       }
       fetchOneRow(t);
+      try {
+        interruptChecker.addRowAndMaybeCheckAbort();
+      } catch (InterruptedException e) {
+        throw new HiveException(e);
+      }
     }
     if (!foundNextKeyGroup[t] && fetchDone[t]) {
       this.nextKeyWritables[t] = null;
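Note on the change above: CommonMergeJoinOperator already extends AbstractMapJoinOperator, so it cannot also inherit InterruptibleProcessing; the checker is composed as a transient field instead, and the InterruptedException it throws is wrapped in HiveException to fit the operator API. A minimal sketch of the resulting pattern, where hasMoreRows() and readRow() are hypothetical stand-ins for the operator's fetch logic:

    InterruptibleProcessing interruptChecker = new InterruptibleProcessing();
    interruptChecker.startAbortChecks(); // reset the timer and row counter before the loop
    while (hasMoreRows()) {
      readRow();
      try {
        interruptChecker.addRowAndMaybeCheckAbort();
      } catch (InterruptedException e) {
        throw new HiveException(e); // operator methods may only throw HiveException
      }
    }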

http://git-wip-us.apache.org/repos/asf/hive/blob/0cc1afa5/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/InterruptibleProcessing.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/InterruptibleProcessing.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/InterruptibleProcessing.java
new file mode 100644
index 0000000..260886b
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/InterruptibleProcessing.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import org.slf4j.LoggerFactory;
+import org.slf4j.Logger;
+
+public class InterruptibleProcessing {
+  private final static Logger LOG = LoggerFactory.getLogger(InterruptibleProcessing.class);
+  private static final int CHECK_INTERRUPTION_AFTER_ROWS_DEFAULT = 1000,
+      CHECK_INTERRUPTION_AFTER_ROWS_MAX = 100000, CHECK_INTERRUPTION_AFTER_ROWS_MIN = 1;
+  private static final double TARGET_INTERRUPT_CHECK_TIME_NS = 3 * 1000000000.0;
+
+  private int checkInterruptionAfterRows = CHECK_INTERRUPTION_AFTER_ROWS_DEFAULT;
+  private long lastInterruptCheckNs = 0L;
+  private int nRows;
+  private volatile boolean isAborted;
+
+  // Methods should really be protected, but some places have to use this as a field.
+
+  public final void startAbortChecks() {
+    lastInterruptCheckNs = System.nanoTime();
+    nRows = 0;
+  }
+
+  public final void addRowAndMaybeCheckAbort() throws InterruptedException {
+    if (nRows++ < checkInterruptionAfterRows) return;
+    long time = System.nanoTime();
+    checkAbortCondition();
+    long elapsedNs = (time - lastInterruptCheckNs);
+    if (elapsedNs >= 0) {
+      // Make sure we don't get stuck at 0 time, however unlikely that is.
+      double diff = elapsedNs == 0 ? 10 : TARGET_INTERRUPT_CHECK_TIME_NS / elapsedNs;
+      int newRows = Math.min(CHECK_INTERRUPTION_AFTER_ROWS_MAX,
+          Math.max(CHECK_INTERRUPTION_AFTER_ROWS_MIN, (int) (diff * checkInterruptionAfterRows)));
+      if (checkInterruptionAfterRows != newRows && LOG.isDebugEnabled()) {
+        LOG.debug("Adjusting abort check rows to " + newRows
+            + " from " + checkInterruptionAfterRows);
+      }
+      checkInterruptionAfterRows = newRows;
+    }
+    nRows = 0;
+    lastInterruptCheckNs = time;
+  }
+
+  public final void checkAbortCondition() throws InterruptedException {
+    boolean isInterrupted = Thread.currentThread().isInterrupted();
+    if (!isAborted && !isInterrupted) return;
+    // Not cleaning the interrupt status.
+    throw new InterruptedException("Processing thread aborted. Interrupt state: " + isInterrupted);
+  }
+
+  public final void setAborted(boolean b) {
+    this.isAborted = b;
+  }
+
+  public void abort() {
+    setAborted(true);
+  }
+
+  public final boolean isAborted() {
+    return this.isAborted;
+  }
+}
\ No newline at end of file
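The check interval in addRowAndMaybeCheckAbort() above is adaptive: each time a check fires, the row threshold is rescaled by TARGET_INTERRUPT_CHECK_TIME_NS / elapsed, so that successive checks land roughly three seconds apart regardless of row width, clamped to the [1, 100000] range. A worked example of the arithmetic, with illustrative values only:

    // The last 1000-row window took 1.5s, i.e. checks fired twice as often as
    // the 3s target, so the threshold doubles:
    long elapsedNs = 1_500_000_000L;
    double diff = 3_000_000_000.0 / elapsedNs; // 2.0
    int newRows = Math.min(100000, Math.max(1, (int) (diff * 1000))); // 2000 rows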

http://git-wip-us.apache.org/repos/asf/hive/blob/0cc1afa5/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
index 790c9d8..24d3526 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
@@ -92,7 +92,6 @@ public class MapRecordProcessor extends RecordProcessor {
   List<MapWork> mergeWorkList;
   List<String> cacheKeys, dynamicValueCacheKeys;
   ObjectCache cache, dynamicValueCache;
-  private int nRows;
 
   public MapRecordProcessor(final JobConf jconf, final ProcessorContext context) throws Exception {
     super(jconf, context);
@@ -106,7 +105,6 @@
     execContext.setJc(jconf);
     cacheKeys = new ArrayList<String>();
     dynamicValueCacheKeys = new ArrayList<String>();
-    nRows = 0;
   }
 
   private void setLlapOfFragmentId(final ProcessorContext context) {
@@ -343,7 +341,7 @@
       MapredContext.get().setReporter(reporter);
 
     } catch (Throwable e) {
-      abort = true;
+      setAborted(true);
       if (e instanceof OutOfMemoryError) {
         // will this be true here?
         // Don't create a new object if we are already out of memory
@@ -417,15 +415,12 @@
 
       @Override
      void run() throws Exception {
+        startAbortChecks();
        while (sources[position].pushRecord()) {
-          if (nRows++ == CHECK_INTERRUPTION_AFTER_ROWS) {
-            checkAbortCondition();
-            nRows = 0;
-          }
+          addRowAndMaybeCheckAbort();
        }
      }
 
-
   @Override
   public void abort() {
     // this will stop run() from pushing records, along with potentially
@@ -444,8 +439,8 @@
   @Override
   void close(){
     // check if there are IOExceptions
-    if (!abort) {
-      abort = execContext.getIoCxt().getIOExceptions();
+    if (!isAborted()) {
+      setAborted(execContext.getIoCxt().getIOExceptions());
     }
 
     if (cache != null && cacheKeys != null) {
@@ -465,6 +460,7 @@
       if (mapOp == null || mapWork == null) {
         return;
       }
+      boolean abort = isAborted();
       mapOp.close(abort);
       if (mergeMapOpList.isEmpty() == false) {
         for (AbstractMapOperator mergeMapOp : mergeMapOpList) {
@@ -486,7 +482,7 @@
       mapOp.preorderMap(rps);
       return;
     } catch (Exception e) {
-      if (!abort) {
+      if (!isAborted()) {
         // signal new failure to map-reduce
         l4j.error("Hit error while closing operators - failing tree");
         throw new RuntimeException("Hive Runtime Error while closing operators", e);

http://git-wip-us.apache.org/repos/asf/hive/blob/0cc1afa5/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
index b7f1011..69de3a0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
@@ -153,18 +153,13 @@ public class MergeFileRecordProcessor extends RecordProcessor {
       while (reader.next()) {
         boolean needMore = processRow(reader.getCurrentKey(),
             reader.getCurrentValue());
-        if (!needMore || abort) {
+        if (!needMore || isAborted()) {
           break;
         }
       }
   }
 
   @Override
-  void abort() {
-    super.abort();
-  }
-
-  @Override
   void close() {
     if (cache != null && cacheKey != null) {
@@ -172,8 +167,8 @@
     }
 
     // check if there are IOExceptions
-    if (!abort) {
-      abort = execContext.getIoCxt().getIOExceptions();
+    if (!isAborted()) {
+      setAborted(execContext.getIoCxt().getIOExceptions());
     }
 
     // detecting failed executions by exceptions thrown by the operator tree
@@ -181,12 +176,13 @@
       if (mergeOp == null || mfWork == null) {
         return;
       }
+      boolean abort = isAborted();
       mergeOp.close(abort);
 
       ExecMapper.ReportStats rps = new ExecMapper.ReportStats(reporter, jconf);
       mergeOp.preorderMap(rps);
     } catch (Exception e) {
-      if (!abort) {
+      if (!isAborted()) {
        // signal new failure to map-reduce
        l4j.error("Hit error while closing operators - failing tree");
        throw new RuntimeException("Hive Runtime Error while closing operators",
@@ -216,7 +212,7 @@
         mergeOp.process(row, 0);
       }
     } catch (Throwable e) {
-      abort = true;
+      setAborted(true);
       if (e instanceof OutOfMemoryError) {
         // Don't create a new object if we are already out of memory
         throw (OutOfMemoryError) e;

http://git-wip-us.apache.org/repos/asf/hive/blob/0cc1afa5/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
index 77c7fa3..106a534 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
@@ -47,9 +47,7 @@ import com.google.common.collect.Maps;
  * Process input from tez LogicalInput and write output
  * It has different subclasses for map and reduce processing
  */
-public abstract class RecordProcessor {
-  protected static final int CHECK_INTERRUPTION_AFTER_ROWS = 1000;
-
+public abstract class RecordProcessor extends InterruptibleProcessing {
   protected final JobConf jconf;
   protected Map<String, LogicalInput> inputs;
   protected Map<String, LogicalOutput> outputs;
@@ -58,8 +56,6 @@ public abstract class RecordProcessor {
 
   public static final Logger l4j = LoggerFactory.getLogger(RecordProcessor.class);
 
-  protected volatile boolean abort = false;
-
   // used to log memory usage periodically
   protected boolean isLogInfoEnabled = false;
   protected boolean isLogTraceEnabled = false;
@@ -149,16 +145,4 @@ public abstract class RecordProcessor {
       return null;
     }
   }
-
-  void abort() {
-    this.abort = true;
-  }
-
-  protected void checkAbortCondition() throws InterruptedException {
-    if (abort || Thread.currentThread().isInterrupted()) {
-      // Not cleaning the interrupt status.
-      boolean interruptState = Thread.currentThread().isInterrupted();
-      throw new InterruptedException("Processing thread aborted. Interrupt state: " + interruptState);
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/0cc1afa5/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
index 3fb9fb1..f6f2dd0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
@@ -62,7 +62,7 @@ import com.google.common.collect.Lists;
  * Process input from tez LogicalInput and write output - for a map plan
  * Just pump the records through the query plan.
  */
-public class ReduceRecordProcessor extends RecordProcessor{
+public class ReduceRecordProcessor extends RecordProcessor {
 
   private static final String REDUCE_PLAN_KEY = "__REDUCE_PLAN__";
 
@@ -85,9 +85,6 @@ public class ReduceRecordProcessor extends RecordProcessor{
 
   private byte bigTablePosition = 0;
 
-
-  private int nRows = 0;
-
   public ReduceRecordProcessor(final JobConf jconf, final ProcessorContext context) throws Exception {
     super(jconf, context);
@@ -256,7 +253,7 @@
       MapredContext.get().setReporter(reporter);
 
     } catch (Throwable e) {
-      abort = true;
+      super.setAborted(true);
       if (e instanceof OutOfMemoryError) {
         // Don't create a new object if we are already out of memory
         throw (OutOfMemoryError) e;
@@ -309,18 +306,16 @@
 
     for (Entry<String, LogicalOutput> outputEntry : outputs.entrySet()) {
       l4j.info("Starting Output: " + outputEntry.getKey());
-      if (!abort) {
+      if (!isAborted()) {
         outputEntry.getValue().start();
         ((TezKVOutputCollector) outMap.get(outputEntry.getKey())).initialize();
       }
     }
 
     // run the operator pipeline
+    startAbortChecks();
     while (sources[bigTablePosition].pushRecord()) {
-      if (nRows++ == CHECK_INTERRUPTION_AFTER_ROWS) {
-        checkAbortCondition();
-        nRows = 0;
-      }
+      addRowAndMaybeCheckAbort();
     }
   }
@@ -374,10 +369,16 @@
     }
 
     try {
-      for (ReduceRecordSource rs: sources) {
-        abort = abort && rs.close();
+      if (isAborted()) {
+        for (ReduceRecordSource rs: sources) {
+          if (!rs.close()) {
+            setAborted(false); // Preserving the old logic. Hmm...
+            break;
+          }
+        }
       }
+      boolean abort = isAborted();
       reducer.close(abort);
 
       if (mergeWorkList != null) {
         for (BaseWork redWork : mergeWorkList) {
@@ -398,7 +399,7 @@
       reducer.preorderMap(rps);
 
     } catch (Exception e) {
-      if (!abort) {
+      if (!isAborted()) {
         // signal new failure to map-reduce
         l4j.error("Hit error while closing operators - failing tree");
         throw new RuntimeException(
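The rewritten close() loop above is meant to be behavior-preserving: the old form, abort = abort && rs.close(), left abort true only if it was already true and every source closed cleanly, and the new form reaches the same end state by walking the sources only when already aborted and clearing the flag at the first close() that returns false. A small self-contained check of that equivalence, with hypothetical close() results:

    boolean[] closeResults = { true, false, true }; // what each rs.close() returns
    boolean oldAbort = true; // abort was set before close()
    for (boolean c : closeResults) oldAbort = oldAbort && c;
    boolean newAbort = true;
    for (boolean c : closeResults) { if (!c) { newAbort = false; break; } }
    assert oldAbort == newAbort; // both end up false here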

http://git-wip-us.apache.org/repos/asf/hive/blob/0cc1afa5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
index 3e8d3e8..cb30413 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
@@ -25,7 +25,9 @@ import java.util.List;
 import org.apache.commons.lang.ArrayUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.persistence.HybridHashTableContainer;
 import org.apache.hadoop.hive.ql.exec.persistence.HybridHashTableContainer.HashPartition;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinBytesTableContainer;
@@ -76,6 +78,7 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC
 
   private static final long serialVersionUID = 1L;
   private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinGenerateResultOperator.class.getName());
   private static final String CLASS_NAME = VectorMapJoinGenerateResultOperator.class.getName();
+  private static final int CHECK_INTERRUPT_PER_OVERFLOW_BATCHES = 10;
 
   //------------------------------------------------------------------------------------------------
 
@@ -85,6 +88,9 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC
 
   private transient VectorDeserializeRow bigTableVectorDeserializeRow;
 
+  private transient Thread ownThread;
+  private transient int interruptCheckCounter = CHECK_INTERRUPT_PER_OVERFLOW_BATCHES;
+
   // Debug display.
   protected transient long batchCounter;
 
@@ -102,6 +108,20 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC
     super(ctx, vContext, conf);
   }
 
+  @Override
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    super.initializeOp(hconf);
+    setUpInterruptChecking();
+  }
+
+  private void setUpInterruptChecking() {
+    for (Operator<? extends OperatorDesc> child : childOperatorsArray) {
+      // We will only do interrupt checking in the lowest-level operator for multiple joins.
+ if (child instanceof VectorMapJoinGenerateResultOperator) return; + } + ownThread = Thread.currentThread(); + } + protected void commonSetup(VectorizedRowBatch batch) throws HiveException { super.commonSetup(batch); @@ -627,6 +647,15 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC protected void forwardOverflow() throws HiveException { forward(overflowBatch, null); overflowBatch.reset(); + maybeCheckInterrupt(); + } + + private void maybeCheckInterrupt() throws HiveException { + if (ownThread == null || --interruptCheckCounter > 0) return; + if (ownThread.isInterrupted()) { + throw new HiveException("Thread interrupted"); + } + interruptCheckCounter = CHECK_INTERRUPT_PER_OVERFLOW_BATCHES; } /**
