Repository: hive Updated Branches: refs/heads/master 109ec31dc -> 40eb9a51a
HIVE-20247 : cleanup issues in LLAP IO after cache OOM (Sergey Shelukhin, reviewed by Prasanth Jayachandran) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/40eb9a51 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/40eb9a51 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/40eb9a51 Branch: refs/heads/master Commit: 40eb9a51abcee533e67c980cc3b4a0f1d9c86252 Parents: 042b2ef Author: sergey <[email protected]> Authored: Mon Jul 30 12:46:20 2018 -0700 Committer: sergey <[email protected]> Committed: Mon Jul 30 12:54:41 2018 -0700 ---------------------------------------------------------------------- .../hadoop/hive/llap/cache/BuddyAllocator.java | 11 +- .../ql/io/orc/encoded/EncodedReaderImpl.java | 101 +++++++++++++------ 2 files changed, 79 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/40eb9a51/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java index 013f353..e3ce2e7 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java @@ -320,17 +320,24 @@ public final class BuddyAllocator hasDiscardedAny = hasDiscardedAny || (ctx.resultCount > 0); destAllocIx = allocateFromDiscardResult( dest, destAllocIx, freeListIx, allocationSize, ctx); - if (destAllocIx == dest.length) return; } if (hasDiscardedAny) { discardFailed = 0; } else if (++discardFailed > MAX_DISCARD_ATTEMPTS) { + isFailed = true; + // Ensure all-or-nothing allocation. + for (int i = 0; i < destAllocIx; ++i) { + try { + deallocate(dest[i]); + } catch (Throwable t) { + LlapIoImpl.LOG.info("Failed to deallocate after a partially successful allocate: " + dest[i]); + } + } String msg = "Failed to allocate " + size + "; at " + destAllocIx + " out of " + dest.length + " (entire cache is fragmented and locked, or an internal issue)"; logOomErrorMessage(msg); - isFailed = true; throw new AllocatorOutOfMemoryException(msg); } ++attempt; http://git-wip-us.apache.org/repos/asf/hive/blob/40eb9a51/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java index 759594a..9126bb9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java @@ -431,7 +431,7 @@ class EncodedReaderImpl implements EncodedReader { trace.logStartCol(ctx.colIx); for (int streamIx = 0; streamIx < ctx.streamCount; ++streamIx) { StreamContext sctx = ctx.streams[streamIx]; - ColumnStreamData cb; + ColumnStreamData cb = null; try { if (RecordReaderUtils.isDictionary(sctx.kind, ctx.encoding) || index == null) { // This stream is for entire stripe and needed for every RG; uncompress once and reuse. @@ -443,7 +443,7 @@ class EncodedReaderImpl implements EncodedReader { trace.logStartStripeStream(sctx.kind); sctx.stripeLevelStream = POOLS.csdPool.take(); // We will be using this for each RG while also sending RGs to processing. - // To avoid buffers being unlocked, run refcount one ahead; so each RG + // To avoid buffers being unlocked, run refcount one ahead; so each RG // processing will decref once, and the last one will unlock the buffers. sctx.stripeLevelStream.incRef(); // For stripe-level streams we don't need the extra refcount on the block. @@ -482,13 +482,18 @@ class EncodedReaderImpl implements EncodedReader { sctx.bufferIter = iter = lastCached; } } - ecb.setStreamData(ctx.colIx, sctx.kind.getNumber(), cb); } catch (Exception ex) { DiskRangeList drl = toRead == null ? null : toRead.next; LOG.error("Error getting stream [" + sctx.kind + ", " + ctx.encoding + "] for" + " column " + ctx.colIx + " RG " + rgIx + " at " + sctx.offset + ", " + sctx.length + "; toRead " + RecordReaderUtils.stringifyDiskRanges(drl), ex); throw (ex instanceof IOException) ? (IOException)ex : new IOException(ex); + } finally { + // Always add stream data to ecb; releaseEcbRefCountsOnError relies on it. + // Otherwise, we won't release consumer refcounts for a partially read stream. + if (cb != null) { + ecb.setStreamData(ctx.colIx, sctx.kind.getNumber(), cb); + } } } } @@ -670,6 +675,7 @@ class EncodedReaderImpl implements EncodedReader { if (toFree instanceof ProcCacheChunk) { ProcCacheChunk pcc = (ProcCacheChunk)toFree; if (pcc.originalData != null) { + // TODO: can this still happen? we now clean these up explicitly to avoid other issues. // This can only happen in case of failure - we read some data, but didn't decompress // it. Deallocate the buffer directly, do not decref. if (pcc.getBuffer() != null) { @@ -677,7 +683,6 @@ class EncodedReaderImpl implements EncodedReader { } continue; } - } if (!(toFree instanceof CacheChunk)) continue; CacheChunk cc = (CacheChunk)toFree; @@ -890,35 +895,69 @@ class EncodedReaderImpl implements EncodedReader { targetBuffers[ix] = chunk.getBuffer(); ++ix; } - cacheWrapper.getAllocator().allocateMultiple(targetBuffers, bufferSize, - cacheWrapper.getDataBufferFactory()); + boolean isAllocated = false; + try { + cacheWrapper.getAllocator().allocateMultiple(targetBuffers, bufferSize, + cacheWrapper.getDataBufferFactory()); + isAllocated = true; + } finally { + // toDecompress/targetBuffers contents are actually already added to some structures that + // will be cleaned up on error. Remove the unallocated buffers; keep the cached buffers in. + if (!isAllocated) { + // Inefficient - this only happens during cleanup on errors. + for (MemoryBuffer buf : targetBuffers) { + csd.getCacheBuffers().remove(buf); + } + for (ProcCacheChunk chunk : toDecompress) { + chunk.buffer = null; + } + } + } // 4. Now decompress (or copy) the data into cache buffers. - for (ProcCacheChunk chunk : toDecompress) { - ByteBuffer dest = chunk.getBuffer().getByteBufferRaw(); - if (chunk.isOriginalDataCompressed) { - boolean isOk = false; - try { - decompressChunk(chunk.originalData, codec, dest); - isOk = true; - } finally { - if (!isOk) { - isCodecFailure = true; + int decompressedIx = 0; + try { + while (decompressedIx < toDecompress.size()) { + ProcCacheChunk chunk = toDecompress.get(decompressedIx); + ByteBuffer dest = chunk.getBuffer().getByteBufferRaw(); + if (chunk.isOriginalDataCompressed) { + boolean isOk = false; + try { + decompressChunk(chunk.originalData, codec, dest); + isOk = true; + } finally { + if (!isOk) { + isCodecFailure = true; + } } + } else { + copyUncompressedChunk(chunk.originalData, dest); } - } else { - copyUncompressedChunk(chunk.originalData, dest); - } - if (isTracingEnabled) { - LOG.trace("Locking " + chunk.getBuffer() + " due to reuse (after decompression)"); + if (isTracingEnabled) { + LOG.trace("Locking " + chunk.getBuffer() + " due to reuse (after decompression)"); + } + // After we set originalData to null, we incref the buffer and the cleanup would decref it. + // Note that this assumes the failure during incref means incref didn't occur. + try { + cacheWrapper.reuseBuffer(chunk.getBuffer()); + } finally { + chunk.originalData = null; + } + ++decompressedIx; } - // After we set originalData to null, we incref the buffer and the cleanup would decref it. - // Note that this assumes the failure during incref means incref didn't occur. - try { - cacheWrapper.reuseBuffer(chunk.getBuffer()); - } finally { - chunk.originalData = null; + } finally { + // This will only execute on error. Deallocate the remaining allocated buffers explicitly. + // The ones that were already incref-ed will be cleaned up with the regular cache buffers. + while (decompressedIx < toDecompress.size()) { + ProcCacheChunk chunk = toDecompress.get(decompressedIx); + csd.getCacheBuffers().remove(chunk.getBuffer()); + try { + cacheWrapper.getAllocator().deallocate(chunk.getBuffer()); + } catch (Throwable t) { + LOG.error("Ignoring the cleanup error after another error", t); + } + chunk.setBuffer(null); } } @@ -959,7 +998,7 @@ class EncodedReaderImpl implements EncodedReader { if (current instanceof CacheChunk) { // 2a. This is a decoded compression buffer, add as is. CacheChunk cc = (CacheChunk)current; - if (isTracingEnabled) { + if (isTracingEnabled) { // TODO# HERE unaccompanied lock LOG.trace("Locking " + cc.getBuffer() + " due to reuse"); } cacheWrapper.reuseBuffer(cc.getBuffer()); @@ -1052,7 +1091,7 @@ class EncodedReaderImpl implements EncodedReader { * to handle just for this case. * We could avoid copy in non-zcr case and manage the buffer that was not allocated by our * allocator. Uncompressed case is not mainline though so let's not complicate it. - * @param kind + * @param kind */ private DiskRangeList preReadUncompressedStream(long baseOffset, DiskRangeList start, long streamOffset, long streamEnd, Kind kind) throws IOException { @@ -1564,7 +1603,7 @@ class EncodedReaderImpl implements EncodedReader { ProcCacheChunk cc = addOneCompressionBlockByteBuffer(copy, isUncompressed, cbStartOffset, cbEndOffset, remaining, (BufferChunk)next, toDecompress, cacheBuffers, true); if (compressed.remaining() <= 0 && toRelease.remove(compressed)) { - releaseBuffer(compressed, true); // We copied the entire buffer. + releaseBuffer(compressed, true); // We copied the entire buffer. } // else there's more data to process; will be handled in next call. return cc; } @@ -2019,7 +2058,7 @@ class EncodedReaderImpl implements EncodedReader { hasError = false; } finally { // At this point, everything in the list is going to have a refcount of one. Unless it - // failed between the allocation and the incref for a single item, we should be ok. + // failed between the allocation and the incref for a single item, we should be ok. if (hasError) { try { releaseInitialRefcounts(toRead.next);
