Repository: hive
Updated Branches:
  refs/heads/master 109ec31dc -> 40eb9a51a


HIVE-20247 : cleanup issues in LLAP IO after cache OOM (Sergey Shelukhin, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/40eb9a51
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/40eb9a51
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/40eb9a51

Branch: refs/heads/master
Commit: 40eb9a51abcee533e67c980cc3b4a0f1d9c86252
Parents: 042b2ef
Author: sergey <[email protected]>
Authored: Mon Jul 30 12:46:20 2018 -0700
Committer: sergey <[email protected]>
Committed: Mon Jul 30 12:54:41 2018 -0700

----------------------------------------------------------------------
 .../hadoop/hive/llap/cache/BuddyAllocator.java  |  11 +-
 .../ql/io/orc/encoded/EncodedReaderImpl.java    | 101 +++++++++++++------
 2 files changed, 79 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/40eb9a51/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
index 013f353..e3ce2e7 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
@@ -320,17 +320,24 @@ public final class BuddyAllocator
           hasDiscardedAny = hasDiscardedAny || (ctx.resultCount > 0);
           destAllocIx = allocateFromDiscardResult(
               dest, destAllocIx, freeListIx, allocationSize, ctx);
-
           if (destAllocIx == dest.length) return;
         }
 
         if (hasDiscardedAny) {
           discardFailed = 0;
         } else if (++discardFailed > MAX_DISCARD_ATTEMPTS) {
+          isFailed = true;
+          // Ensure all-or-nothing allocation.
+          for (int i = 0; i < destAllocIx; ++i) {
+            try {
+              deallocate(dest[i]);
+            } catch (Throwable t) {
+              LlapIoImpl.LOG.info("Failed to deallocate after a partially successful allocate: " + dest[i]);
+            }
+          }
           String msg = "Failed to allocate " + size + "; at " + destAllocIx + " out of "
               + dest.length + " (entire cache is fragmented and locked, or an internal issue)";
           logOomErrorMessage(msg);
-          isFailed = true;
           throw new AllocatorOutOfMemoryException(msg);
         }
         ++attempt;

http://git-wip-us.apache.org/repos/asf/hive/blob/40eb9a51/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
index 759594a..9126bb9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
@@ -431,7 +431,7 @@ class EncodedReaderImpl implements EncodedReader {
             trace.logStartCol(ctx.colIx);
             for (int streamIx = 0; streamIx < ctx.streamCount; ++streamIx) {
               StreamContext sctx = ctx.streams[streamIx];
-              ColumnStreamData cb;
+              ColumnStreamData cb = null;
               try {
                 if (RecordReaderUtils.isDictionary(sctx.kind, ctx.encoding) || 
index == null) {
                   // This stream is for entire stripe and needed for every RG; 
uncompress once and reuse.
@@ -443,7 +443,7 @@ class EncodedReaderImpl implements EncodedReader {
                     trace.logStartStripeStream(sctx.kind);
                     sctx.stripeLevelStream = POOLS.csdPool.take();
                     // We will be using this for each RG while also sending 
RGs to processing.
-                    // To avoid buffers being unlocked, run refcount one 
ahead; so each RG 
+                    // To avoid buffers being unlocked, run refcount one 
ahead; so each RG
                     // processing will decref once, and the last one will 
unlock the buffers.
                     sctx.stripeLevelStream.incRef();
                     // For stripe-level streams we don't need the extra 
refcount on the block.
@@ -482,13 +482,18 @@ class EncodedReaderImpl implements EncodedReader {
                     sctx.bufferIter = iter = lastCached;
                   }
                 }
-                ecb.setStreamData(ctx.colIx, sctx.kind.getNumber(), cb);
               } catch (Exception ex) {
                 DiskRangeList drl = toRead == null ? null : toRead.next;
                 LOG.error("Error getting stream [" + sctx.kind + ", " + 
ctx.encoding + "] for"
                     + " column " + ctx.colIx + " RG " + rgIx + " at " + 
sctx.offset + ", "
                     + sctx.length + "; toRead " + 
RecordReaderUtils.stringifyDiskRanges(drl), ex);
                 throw (ex instanceof IOException) ? (IOException)ex : new 
IOException(ex);
+              } finally {
+                // Always add stream data to ecb; releaseEcbRefCountsOnError 
relies on it.
+                // Otherwise, we won't release consumer refcounts for a 
partially read stream.
+                if (cb != null) {
+                  ecb.setStreamData(ctx.colIx, sctx.kind.getNumber(), cb);
+                }
               }
             }
           }
@@ -670,6 +675,7 @@ class EncodedReaderImpl implements EncodedReader {
       if (toFree instanceof ProcCacheChunk) {
         ProcCacheChunk pcc = (ProcCacheChunk)toFree;
         if (pcc.originalData != null) {
+          // TODO: can this still happen? we now clean these up explicitly to 
avoid other issues.
           // This can only happen in case of failure - we read some data, but 
didn't decompress
           // it. Deallocate the buffer directly, do not decref.
           if (pcc.getBuffer() != null) {
@@ -677,7 +683,6 @@ class EncodedReaderImpl implements EncodedReader {
           }
           continue;
         }
-        
       }
       if (!(toFree instanceof CacheChunk)) continue;
       CacheChunk cc = (CacheChunk)toFree;
@@ -890,35 +895,69 @@ class EncodedReaderImpl implements EncodedReader {
       targetBuffers[ix] = chunk.getBuffer();
       ++ix;
     }
-    cacheWrapper.getAllocator().allocateMultiple(targetBuffers, bufferSize,
-        cacheWrapper.getDataBufferFactory());
+    boolean isAllocated = false;
+    try {
+      cacheWrapper.getAllocator().allocateMultiple(targetBuffers, bufferSize,
+          cacheWrapper.getDataBufferFactory());
+      isAllocated = true;
+    } finally {
+      // toDecompress/targetBuffers contents are actually already added to 
some structures that
+      // will be cleaned up on error. Remove the unallocated buffers; keep the 
cached buffers in.
+      if (!isAllocated) {
+        // Inefficient - this only happens during cleanup on errors.
+        for (MemoryBuffer buf : targetBuffers) {
+          csd.getCacheBuffers().remove(buf);
+        }
+        for (ProcCacheChunk chunk : toDecompress) {
+          chunk.buffer = null;
+        }
+      }
+    }
 
     // 4. Now decompress (or copy) the data into cache buffers.
-    for (ProcCacheChunk chunk : toDecompress) {
-      ByteBuffer dest = chunk.getBuffer().getByteBufferRaw();
-      if (chunk.isOriginalDataCompressed) {
-        boolean isOk = false;
-        try {
-          decompressChunk(chunk.originalData, codec, dest);
-          isOk = true;
-        } finally {
-          if (!isOk) {
-            isCodecFailure = true;
+    int decompressedIx = 0;
+    try {
+      while (decompressedIx < toDecompress.size()) {
+        ProcCacheChunk chunk = toDecompress.get(decompressedIx);
+        ByteBuffer dest = chunk.getBuffer().getByteBufferRaw();
+        if (chunk.isOriginalDataCompressed) {
+          boolean isOk = false;
+          try {
+            decompressChunk(chunk.originalData, codec, dest);
+            isOk = true;
+          } finally {
+            if (!isOk) {
+              isCodecFailure = true;
+            }
           }
+        } else {
+          copyUncompressedChunk(chunk.originalData, dest);
         }
-      } else {
-        copyUncompressedChunk(chunk.originalData, dest);
-      }
 
-      if (isTracingEnabled) {
-        LOG.trace("Locking " + chunk.getBuffer() + " due to reuse (after 
decompression)");
+        if (isTracingEnabled) {
+          LOG.trace("Locking " + chunk.getBuffer() + " due to reuse (after 
decompression)");
+        }
+        // After we set originalData to null, we incref the buffer and the 
cleanup would decref it.
+        // Note that this assumes the failure during incref means incref 
didn't occur.
+        try {
+          cacheWrapper.reuseBuffer(chunk.getBuffer());
+        } finally {
+          chunk.originalData = null;
+        }
+        ++decompressedIx;
       }
-      // After we set originalData to null, we incref the buffer and the 
cleanup would decref it.
-      // Note that this assumes the failure during incref means incref didn't 
occur.
-      try {
-        cacheWrapper.reuseBuffer(chunk.getBuffer());
-      } finally {
-        chunk.originalData = null;
+    } finally {
+      // This will only execute on error. Deallocate the remaining allocated 
buffers explicitly.
+      // The ones that were already incref-ed will be cleaned up with the 
regular cache buffers.
+      while (decompressedIx < toDecompress.size()) {
+        ProcCacheChunk chunk = toDecompress.get(decompressedIx);
+        csd.getCacheBuffers().remove(chunk.getBuffer());
+        try {
+          cacheWrapper.getAllocator().deallocate(chunk.getBuffer());
+        } catch (Throwable t) {
+          LOG.error("Ignoring the cleanup error after another error", t);
+        }
+        chunk.setBuffer(null);
       }
     }
 
@@ -959,7 +998,7 @@ class EncodedReaderImpl implements EncodedReader {
       if (current instanceof CacheChunk) {
         // 2a. This is a decoded compression buffer, add as is.
         CacheChunk cc = (CacheChunk)current;
-        if (isTracingEnabled) {
+        if (isTracingEnabled) { // TODO# HERE unaccompanied lock
           LOG.trace("Locking " + cc.getBuffer() + " due to reuse");
         }
         cacheWrapper.reuseBuffer(cc.getBuffer());
@@ -1052,7 +1091,7 @@ class EncodedReaderImpl implements EncodedReader {
    * to handle just for this case.
    * We could avoid copy in non-zcr case and manage the buffer that was not 
allocated by our
    * allocator. Uncompressed case is not mainline though so let's not 
complicate it.
-   * @param kind 
+   * @param kind
    */
   private DiskRangeList preReadUncompressedStream(long baseOffset, 
DiskRangeList start,
       long streamOffset, long streamEnd, Kind kind) throws IOException {
@@ -1564,7 +1603,7 @@ class EncodedReaderImpl implements EncodedReader {
         ProcCacheChunk cc = addOneCompressionBlockByteBuffer(copy, 
isUncompressed, cbStartOffset,
             cbEndOffset, remaining, (BufferChunk)next, toDecompress, 
cacheBuffers, true);
         if (compressed.remaining() <= 0 && toRelease.remove(compressed)) {
-          releaseBuffer(compressed, true); // We copied the entire buffer. 
+          releaseBuffer(compressed, true); // We copied the entire buffer.
         } // else there's more data to process; will be handled in next call.
         return cc;
       }
@@ -2019,7 +2058,7 @@ class EncodedReaderImpl implements EncodedReader {
       hasError = false;
     } finally {
       // At this point, everything in the list is going to have a refcount of 
one. Unless it
-      // failed between the allocation and the incref for a single item, we 
should be ok. 
+      // failed between the allocation and the incref for a single item, we 
should be ok.
       if (hasError) {
         try {
           releaseInitialRefcounts(toRead.next);

Reply via email to