Repository: hive Updated Branches: refs/heads/master 6fb647f32 -> 16f68e3e6
HIVE-16778 : LLAP IO: better refcount management (Sergey Shelukhin, reviewed by Prasanth Jayachandran) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/16f68e3e Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/16f68e3e Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/16f68e3e Branch: refs/heads/master Commit: 16f68e3e67def86bb16789d146a9b68bc7863894 Parents: 6fb647f Author: sergey <[email protected]> Authored: Thu Jun 8 14:24:48 2017 -0700 Committer: sergey <[email protected]> Committed: Thu Jun 8 14:24:48 2017 -0700 ---------------------------------------------------------------------- .../llap/io/encoded/OrcEncodedDataReader.java | 45 +-- .../hive/ql/io/orc/encoded/EncodedReader.java | 3 +- .../ql/io/orc/encoded/EncodedReaderImpl.java | 339 +++++++++++-------- 3 files changed, 213 insertions(+), 174 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/16f68e3e/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java index 7081140..abf8cd3 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java @@ -155,10 +155,10 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> private Object fileKey; private FileSystem fs; /** - * readState[stripeIx'][colIx'] => boolean array (could be a bitmask) of rg-s that need to be - * read. Contains only stripes that are read, and only columns included. null => read all RGs. + * stripeRgs[stripeIx'] => boolean array (could be a bitmask) of rg-s that need to be read. + * Contains only stripes that are read, and only columns included. null => read all RGs. */ - private boolean[][][] readState; + private boolean[][] stripeRgs; private volatile boolean isStopped = false; @SuppressWarnings("unused") private volatile boolean isPaused = false; @@ -268,12 +268,13 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> return null; } - if (readState.length == 0) { + if (stripeRgs.length == 0) { consumer.setDone(); recordReaderTime(startTime); return null; // No data to read. } - counters.setDesc(QueryFragmentCounters.Desc.STRIPES, stripeIxFrom + "," + readState.length); + counters.setDesc(QueryFragmentCounters.Desc.STRIPES, + stripeIxFrom + "," + stripeRgs.length); // 3. Apply SARG if needed, and otherwise determine what RGs to read. int stride = fileMetadata.getRowIndexStride(); @@ -334,13 +335,13 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> // TODO: I/O threadpool could be here - one thread per stripe; for now, linear. boolean hasFileId = this.fileKey != null; OrcBatchKey stripeKey = hasFileId ? new OrcBatchKey(fileKey, -1, 0) : null; - for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) { + for (int stripeIxMod = 0; stripeIxMod < stripeRgs.length; ++stripeIxMod) { if (processStop()) { recordReaderTime(startTime); return null; } int stripeIx = stripeIxFrom + stripeIxMod; - boolean[][] colRgs = null; + boolean[] rgs = null; OrcStripeMetadata stripeMetadata = null; StripeInformation stripe; try { @@ -348,16 +349,15 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> LlapIoImpl.ORC_LOGGER.trace("Reading stripe {}: {}, {}", stripeIx, stripe.getOffset(), stripe.getLength()); - colRgs = readState[stripeIxMod]; + rgs = stripeRgs[stripeIxMod]; if (LlapIoImpl.ORC_LOGGER.isTraceEnabled()) { - LlapIoImpl.ORC_LOGGER.trace("readState[{}]: {}", stripeIxMod, Arrays.toString(colRgs)); + LlapIoImpl.ORC_LOGGER.trace("readState[{}]: {}", stripeIxMod, Arrays.toString(rgs)); } // We assume that NO_RGS value is only set from SARG filter and for all columns; // intermediate changes for individual columns will unset values in the array. // Skip this case for 0-column read. We could probably special-case it just like we do // in EncodedReaderImpl, but for now it's not that important. - if (colRgs.length > 0 && colRgs[0] == - RecordReaderImpl.SargApplier.READ_NO_RGS) continue; + if (rgs == RecordReaderImpl.SargApplier.READ_NO_RGS) continue; // 6.2. Ensure we have stripe metadata. We might have read it before for RG filtering. boolean isFoundInCache = false; @@ -421,7 +421,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> // data it receives for one stripe. We could probably interrupt it, if it checked that. stripeReader.readEncodedColumns(stripeIx, stripe, stripeMetadata.getRowIndexes(), stripeMetadata.getEncodings(), stripeMetadata.getStreams(), globalIncludes, - colRgs, consumer); + rgs, consumer); } catch (Throwable t) { consumer.setError(t); cleanupReaders(); @@ -617,10 +617,10 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> */ private ArrayList<OrcStripeMetadata> readStripesMetadata( boolean[] globalInc, boolean[] sargColumns) throws IOException { - ArrayList<OrcStripeMetadata> result = new ArrayList<OrcStripeMetadata>(readState.length); + ArrayList<OrcStripeMetadata> result = new ArrayList<OrcStripeMetadata>(stripeRgs.length); boolean hasFileId = this.fileKey != null; OrcBatchKey stripeKey = hasFileId ? new OrcBatchKey(fileKey, 0, 0) : null; - for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) { + for (int stripeIxMod = 0; stripeIxMod < stripeRgs.length; ++stripeIxMod) { OrcStripeMetadata value = null; int stripeIx = stripeIxMod + stripeIxFrom; if (hasFileId && metadataCache != null) { @@ -712,8 +712,8 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> OrcFile.WriterVersion.from(fileMetadata.getWriterVersionNum())); } boolean hasAnyData = false; - // readState should have been initialized by this time with an empty array. - for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) { + // stripeRgs should have been initialized by this time with an empty array. + for (int stripeIxMod = 0; stripeIxMod < stripeRgs.length; ++stripeIxMod) { int stripeIx = stripeIxMod + stripeIxFrom; StripeInformation stripe = fileMetadata.getStripes().get(stripeIx); int rgCount = getRgCount(stripe, rowIndexStride); @@ -739,17 +739,8 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> } } assert isAll || isNone || rgsToRead.length == rgCount; - int fileIncludesCount = 0; - // TODO: hacky for now - skip the root 0-s column. - // We don't need separate readState w/o HL cache, should get rid of that instead. - for (int includeIx = 1; includeIx < globalIncludes.length; ++includeIx) { - fileIncludesCount += (globalIncludes[includeIx] ? 1 : 0); - } - readState[stripeIxMod] = new boolean[fileIncludesCount][]; - for (int includeIx = 0; includeIx < fileIncludesCount; ++includeIx) { - readState[stripeIxMod][includeIx] = (isAll || isNone) ? rgsToRead : + stripeRgs[stripeIxMod] = (isAll || isNone) ? rgsToRead : Arrays.copyOf(rgsToRead, rgsToRead.length); - } adjustRgMetric(rgCount, rgsToRead, isNone, isAll); } return hasAnyData; @@ -820,7 +811,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> LlapIoImpl.ORC_LOGGER.trace("Including stripes until {} (end of file); {} stripes", stripeIx, (stripeIxTo - stripeIxFrom)); } - readState = new boolean[stripeIxTo - stripeIxFrom][][]; + stripeRgs = new boolean[stripeIxTo - stripeIxFrom][]; } private class DataWrapperForOrc implements DataReader, DataCache { http://git-wip-us.apache.org/repos/asf/hive/blob/16f68e3e/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java index ea9904a..7540e72 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java @@ -40,8 +40,7 @@ public interface EncodedReader { */ void readEncodedColumns(int stripeIx, StripeInformation stripe, OrcProto.RowIndex[] index, List<OrcProto.ColumnEncoding> encodings, - List<OrcProto.Stream> streams, - boolean[] included, boolean[][] colRgs, + List<OrcProto.Stream> streams, boolean[] included, boolean[] rgs, Consumer<OrcEncodedColumnBatch> consumer) throws IOException; /** http://git-wip-us.apache.org/repos/asf/hive/blob/16f68e3e/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java index 6cd85d2..248feae 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.lang.reflect.Field; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.IdentityHashMap; import java.util.List; @@ -223,7 +222,7 @@ class EncodedReaderImpl implements EncodedReader { @Override public void readEncodedColumns(int stripeIx, StripeInformation stripe, OrcProto.RowIndex[] indexes, List<OrcProto.ColumnEncoding> encodings, - List<OrcProto.Stream> streamList, boolean[] included, boolean[][] colRgs, + List<OrcProto.Stream> streamList, boolean[] included, boolean[] rgs, Consumer<OrcEncodedColumnBatch> consumer) throws IOException { // Note: for now we don't have to setError here, caller will setError if we throw. // We are also not supposed to call setDone, since we are only part of the operation. @@ -252,7 +251,6 @@ class EncodedReaderImpl implements EncodedReader { boolean isCompressed = (codec != null); CreateHelper listToRead = new CreateHelper(); boolean hasIndexOnlyCols = false; - boolean[] includedRgs = null; // Will always be the same for all cols at the moment. for (OrcProto.Stream stream : streamList) { long length = stream.getLength(); int colIx = stream.getColumn(); @@ -270,7 +268,6 @@ class EncodedReaderImpl implements EncodedReader { } ColumnReadContext ctx = colCtxs[colIx]; assert ctx != null; - includedRgs = colRgs[ctx.includedIx]; int indexIx = RecordReaderUtils.getIndexPosition(ctx.encoding.getKind(), types.get(colIx).getKind(), streamKind, isCompressed, hasNull[colIx]); ctx.addStream(offset, stream, indexIx); @@ -278,13 +275,13 @@ class EncodedReaderImpl implements EncodedReader { LOG.trace("Adding stream for column " + colIx + ": " + streamKind + " at " + offset + ", " + length + ", index position " + indexIx); } - if (includedRgs == null || RecordReaderUtils.isDictionary(streamKind, encodings.get(colIx))) { + if (rgs == null || RecordReaderUtils.isDictionary(streamKind, encodings.get(colIx))) { RecordReaderUtils.addEntireStreamToRanges(offset, length, listToRead, true); if (isTracingEnabled) { LOG.trace("Will read whole stream " + streamKind + "; added to " + listToRead.getTail()); } } else { - RecordReaderUtils.addRgFilteredStreamToRanges(stream, includedRgs, + RecordReaderUtils.addRgFilteredStreamToRanges(stream, rgs, codec != null, indexes[colIx], encodings.get(colIx), types.get(colIx), bufferSize, hasNull[colIx], offset, length, listToRead, true); } @@ -295,7 +292,7 @@ class EncodedReaderImpl implements EncodedReader { if (listToRead.get() == null) { // No data to read for this stripe. Check if we have some included index-only columns. // TODO: there may be a bug here. Could there be partial RG filtering on index-only column? - if (hasIndexOnlyCols && (includedRgs == null)) { + if (hasIndexOnlyCols && (rgs == null)) { OrcEncodedColumnBatch ecb = POOLS.ecbPool.take(); ecb.init(fileKey, stripeIx, OrcEncodedColumnBatch.ALL_RGS, included.length); consumer.consumeData(ecb); @@ -329,18 +326,29 @@ class EncodedReaderImpl implements EncodedReader { // can be freed in advance, we remove it from the map. IdentityHashMap<ByteBuffer, Boolean> toRelease = null; if (!isAllInCache.value) { - if (!isDataReaderOpen) { - this.dataReader.open(); - isDataReaderOpen = true; - } - dataReader.readFileData(toRead.next, stripeOffset, cacheWrapper.getAllocator().isDirectAlloc()); - toRelease = new IdentityHashMap<>(); - DiskRangeList drl = toRead.next; - while (drl != null) { - if (drl instanceof BufferChunk) { - toRelease.put(drl.getData(), true); + boolean hasError = true; + try { + if (!isDataReaderOpen) { + this.dataReader.open(); + isDataReaderOpen = true; + } + dataReader.readFileData(toRead.next, stripeOffset, + cacheWrapper.getAllocator().isDirectAlloc()); + toRelease = new IdentityHashMap<>(); + DiskRangeList drl = toRead.next; + while (drl != null) { + if (drl instanceof BufferChunk) { + toRelease.put(drl.getData(), true); + } + drl = drl.next; + } + hasError = false; + } finally { + // The FS can be closed from under us if the task is interrupted. Release cache buffers. + // We are assuming here that toRelease will not be present in such cases. + if (hasError) { + releaseInitialRefcounts(toRead.next); } - drl = drl.next; } } @@ -350,157 +358,198 @@ class EncodedReaderImpl implements EncodedReader { // either cache buffers, or buffers allocated by us and not cached (if we are only reading // parts of the data for some ranges and don't want to cache it). Both are represented by // CacheChunks, so the list is just CacheChunk-s from that point on. - DiskRangeList iter = toRead.next; // Keep "toRead" list for future use, don't extract(). + DiskRangeList iter = toRead.next; if (codec == null) { - for (int colIx = 0; colIx < colCtxs.length; ++colIx) { - ColumnReadContext ctx = colCtxs[colIx]; - if (ctx == null) continue; // This column is not included. - for (int streamIx = 0; streamIx < ctx.streamCount; ++streamIx) { - StreamContext sctx = ctx.streams[streamIx]; - DiskRangeList newIter = preReadUncompressedStream( - stripeOffset, iter, sctx.offset, sctx.offset + sctx.length); - if (newIter != null) { - iter = newIter; + boolean hasError = true; + try { + for (int colIx = 0; colIx < colCtxs.length; ++colIx) { + ColumnReadContext ctx = colCtxs[colIx]; + if (ctx == null) continue; // This column is not included. + for (int streamIx = 0; streamIx < ctx.streamCount; ++streamIx) { + StreamContext sctx = ctx.streams[streamIx]; + DiskRangeList newIter = preReadUncompressedStream( + stripeOffset, iter, sctx.offset, sctx.offset + sctx.length); + if (newIter != null) { + iter = newIter; + } + } + } + // Release buffers as we are done with all the streams... also see toRelease comment. + // With uncompressed streams, we know we are done earlier. + if (toRelease != null) { + releaseBuffers(toRelease.keySet(), true); + toRelease = null; + } + if (LOG.isInfoEnabled()) { + LOG.info("Disk ranges after pre-read (file " + fileKey + ", base offset " + + stripeOffset + "): " + RecordReaderUtils.stringifyDiskRanges(toRead.next)); + } + iter = toRead.next; // Reset the iter to start. + hasError = false; + } finally { + // At this point, everything in the list is going to have a refcount of one. Unless it + // failed between the allocation and the incref for a single item, we should be ok. + if (hasError) { + releaseInitialRefcounts(toRead.next); + if (toRelease != null) { + releaseBuffers(toRelease.keySet(), true); + toRelease = null; } } } - // Release buffers as we are done with all the streams... also see toRelease comment.\ - // With uncompressed streams, we know we are done earlier. - if (toRelease != null) { - releaseBuffers(toRelease.keySet(), true); - toRelease = null; + } + + try { + // 4. Finally, decompress data, map per RG, and return to caller. + // We go by RG and not by column because that is how data is processed. + int rgCount = (int)Math.ceil((double)stripe.getNumberOfRows() / rowIndexStride); + for (int rgIx = 0; rgIx < rgCount; ++rgIx) { + if (rgs != null && !rgs[rgIx]) { + continue; // RG filtered. + } + boolean isLastRg = rgIx == rgCount - 1; + // Create the batch we will use to return data for this RG. + OrcEncodedColumnBatch ecb = POOLS.ecbPool.take(); + boolean hasError = true; + try { + ecb.init(fileKey, stripeIx, rgIx, included.length); + for (int colIx = 0; colIx < colCtxs.length; ++colIx) { + ColumnReadContext ctx = colCtxs[colIx]; + if (ctx == null) continue; // This column is not included. + if (isTracingEnabled) { + LOG.trace("ctx: {} rgIx: {} isLastRg: {} rgCount: {}", ctx, rgIx, isLastRg, rgCount); + } + OrcProto.RowIndexEntry index = ctx.rowIndex.getEntry(rgIx), + nextIndex = isLastRg ? null : ctx.rowIndex.getEntry(rgIx + 1); + ecb.initOrcColumn(ctx.colIx); + for (int streamIx = 0; streamIx < ctx.streamCount; ++streamIx) { + StreamContext sctx = ctx.streams[streamIx]; + ColumnStreamData cb = null; + try { + if (RecordReaderUtils.isDictionary(sctx.kind, ctx.encoding)) { + // This stream is for entire stripe and needed for every RG; uncompress once and reuse. + if (isTracingEnabled) { + LOG.trace("Getting stripe-level stream [" + sctx.kind + ", " + ctx.encoding + "] for" + + " column " + ctx.colIx + " RG " + rgIx + " at " + sctx.offset + ", " + sctx.length); + } + if (sctx.stripeLevelStream == null) { + sctx.stripeLevelStream = POOLS.csdPool.take(); + // We will be using this for each RG while also sending RGs to processing. + // To avoid buffers being unlocked, run refcount one ahead; so each RG + // processing will decref once, and the last one will unlock the buffers. + sctx.stripeLevelStream.incRef(); + // For stripe-level streams we don't need the extra refcount on the block. + // See class comment about refcounts. + long unlockUntilCOffset = sctx.offset + sctx.length; + DiskRangeList lastCached = readEncodedStream(stripeOffset, iter, + sctx.offset, sctx.offset + sctx.length, sctx.stripeLevelStream, + unlockUntilCOffset, sctx.offset, toRelease); + if (lastCached != null) { + iter = lastCached; + } + } + sctx.stripeLevelStream.incRef(); + cb = sctx.stripeLevelStream; + } else { + // This stream can be separated by RG using index. Let's do that. + // Offset to where this RG begins. + long cOffset = sctx.offset + index.getPositions(sctx.streamIndexOffset); + // Offset relative to the beginning of the stream of where this RG ends. + long nextCOffsetRel = isLastRg ? sctx.length + : nextIndex.getPositions(sctx.streamIndexOffset); + // Offset before which this RG is guaranteed to end. Can only be estimated. + // We estimate the same way for compressed and uncompressed for now. + long endCOffset = sctx.offset + RecordReaderUtils.estimateRgEndOffset( + isCompressed, isLastRg, nextCOffsetRel, sctx.length, bufferSize); + // As we read, we can unlock initial refcounts for the buffers that end before + // the data that we need for this RG. + long unlockUntilCOffset = sctx.offset + nextCOffsetRel; + cb = createRgColumnStreamData( + rgIx, isLastRg, ctx.colIx, sctx, cOffset, endCOffset, isCompressed); + boolean isStartOfStream = sctx.bufferIter == null; + DiskRangeList lastCached = readEncodedStream(stripeOffset, + (isStartOfStream ? iter : sctx.bufferIter), cOffset, endCOffset, cb, + unlockUntilCOffset, sctx.offset, toRelease); + if (lastCached != null) { + sctx.bufferIter = iter = lastCached; + } + } + ecb.setStreamData(ctx.colIx, sctx.kind.getNumber(), cb); + } catch (Exception ex) { + DiskRangeList drl = toRead == null ? null : toRead.next; + LOG.error("Error getting stream [" + sctx.kind + ", " + ctx.encoding + "] for" + + " column " + ctx.colIx + " RG " + rgIx + " at " + sctx.offset + ", " + + sctx.length + "; toRead " + RecordReaderUtils.stringifyDiskRanges(drl), ex); + throw (ex instanceof IOException) ? (IOException)ex : new IOException(ex); + } + } + } + hasError = false; + } finally { + if (hasError) { + releaseEcbRefCountsOnError(ecb); + } + } + // After this, the non-initial refcounts are the responsibility of the consumer. + consumer.consumeData(ecb); } + if (isTracingEnabled) { - LOG.trace("Disk ranges after pre-read (file " + fileKey + ", base offset " - + stripeOffset + "): " + RecordReaderUtils.stringifyDiskRanges(toRead.next)); + LOG.trace("Disk ranges after preparing all the data " + + RecordReaderUtils.stringifyDiskRanges(toRead.next)); } - iter = toRead.next; // Reset the iter to start. - } - - // 4. Finally, decompress data, map per RG, and return to caller. - // We go by RG and not by column because that is how data is processed. - int rgCount = (int)Math.ceil((double)stripe.getNumberOfRows() / rowIndexStride); - for (int rgIx = 0; rgIx < rgCount; ++rgIx) { - boolean isLastRg = rgIx == rgCount - 1; - // Create the batch we will use to return data for this RG. - OrcEncodedColumnBatch ecb = POOLS.ecbPool.take(); - ecb.init(fileKey, stripeIx, rgIx, included.length); - boolean isRGSelected = true; + } finally { + // Release the unreleased stripe-level buffers. See class comment about refcounts. for (int colIx = 0; colIx < colCtxs.length; ++colIx) { ColumnReadContext ctx = colCtxs[colIx]; if (ctx == null) continue; // This column is not included. - if (isTracingEnabled) { - LOG.trace("ctx: {} rgIx: {} isLastRg: {} rgCount: {}", ctx, rgIx, isLastRg, rgCount); - } - // TODO: simplify this now that high-level cache has been removed. Same RGs for all cols. - if (colRgs[ctx.includedIx] != null && !colRgs[ctx.includedIx][rgIx]) { - // RG x col filtered. - isRGSelected = false; - if (isTracingEnabled) { - LOG.trace("colIxMod: {} rgIx: {} colRgs[{}]: {} colRgs[{}][{}]: {}", ctx.includedIx, rgIx, ctx.includedIx, - Arrays.toString(colRgs[ctx.includedIx]), ctx.includedIx, rgIx, colRgs[ctx.includedIx][rgIx]); - } - continue; - } - OrcProto.RowIndexEntry index = ctx.rowIndex.getEntry(rgIx), - nextIndex = isLastRg ? null : ctx.rowIndex.getEntry(rgIx + 1); - ecb.initOrcColumn(ctx.colIx); for (int streamIx = 0; streamIx < ctx.streamCount; ++streamIx) { StreamContext sctx = ctx.streams[streamIx]; - ColumnStreamData cb = null; - try { - if (RecordReaderUtils.isDictionary(sctx.kind, ctx.encoding)) { - // This stream is for entire stripe and needed for every RG; uncompress once and reuse. - if (isTracingEnabled) { - LOG.trace("Getting stripe-level stream [" + sctx.kind + ", " + ctx.encoding + "] for" - + " column " + ctx.colIx + " RG " + rgIx + " at " + sctx.offset + ", " + sctx.length); - } - if (sctx.stripeLevelStream == null) { - sctx.stripeLevelStream = POOLS.csdPool.take(); - // We will be using this for each RG while also sending RGs to processing. - // To avoid buffers being unlocked, run refcount one ahead; so each RG - // processing will decref once, and the - // last one will unlock the buffers. - sctx.stripeLevelStream.incRef(); - // For stripe-level streams we don't need the extra refcount on the block. - // See class comment about refcounts. - long unlockUntilCOffset = sctx.offset + sctx.length; - DiskRangeList lastCached = readEncodedStream(stripeOffset, iter, - sctx.offset, sctx.offset + sctx.length, sctx.stripeLevelStream, - unlockUntilCOffset, sctx.offset, toRelease); - if (lastCached != null) { - iter = lastCached; - } - } - sctx.stripeLevelStream.incRef(); - cb = sctx.stripeLevelStream; - } else { - // This stream can be separated by RG using index. Let's do that. - // Offset to where this RG begins. - long cOffset = sctx.offset + index.getPositions(sctx.streamIndexOffset); - // Offset relative to the beginning of the stream of where this RG ends. - long nextCOffsetRel = isLastRg ? sctx.length - : nextIndex.getPositions(sctx.streamIndexOffset); - // Offset before which this RG is guaranteed to end. Can only be estimated. - // We estimate the same way for compressed and uncompressed for now. - long endCOffset = sctx.offset + RecordReaderUtils.estimateRgEndOffset( - isCompressed, isLastRg, nextCOffsetRel, sctx.length, bufferSize); - // As we read, we can unlock initial refcounts for the buffers that end before - // the data that we need for this RG. - long unlockUntilCOffset = sctx.offset + nextCOffsetRel; - cb = createRgColumnStreamData( - rgIx, isLastRg, ctx.colIx, sctx, cOffset, endCOffset, isCompressed); - boolean isStartOfStream = sctx.bufferIter == null; - DiskRangeList lastCached = readEncodedStream(stripeOffset, - (isStartOfStream ? iter : sctx.bufferIter), cOffset, endCOffset, cb, - unlockUntilCOffset, sctx.offset, toRelease); - if (lastCached != null) { - sctx.bufferIter = iter = lastCached; - } + if (sctx == null || sctx.stripeLevelStream == null) continue; + if (0 != sctx.stripeLevelStream.decRef()) continue; + // Note - this is a little bit confusing; the special treatment of stripe-level buffers + // is because we run the ColumnStreamData refcount one ahead (as specified above). It + // may look like this would release the buffers too many times (one release from the + // consumer, one from releaseInitialRefcounts below, and one here); however, this is + // merely handling a special case where all the batches that are sharing the stripe- + // level stream have been processed before we got here; they have all decRef-ed the CSD, + // but have not released the buffers because of that extra refCount. So, this is + // essentially the "consumer" refcount being released here. + for (MemoryBuffer buf : sctx.stripeLevelStream.getCacheBuffers()) { + if (LOG.isTraceEnabled()) { + LOG.trace("Unlocking {} at the end of processing", buf); } - ecb.setStreamData(ctx.colIx, sctx.kind.getNumber(), cb); - } catch (Exception ex) { - DiskRangeList drl = toRead == null ? null : toRead.next; - LOG.error("Error getting stream [" + sctx.kind + ", " + ctx.encoding + "] for" - + " column " + ctx.colIx + " RG " + rgIx + " at " + sctx.offset + ", " - + sctx.length + "; toRead " + RecordReaderUtils.stringifyDiskRanges(drl), ex); - throw (ex instanceof IOException) ? (IOException)ex : new IOException(ex); + cacheWrapper.releaseBuffer(buf); } } } - if (isRGSelected) { - consumer.consumeData(ecb); + + releaseInitialRefcounts(toRead.next); + // Release buffers as we are done with all the streams... also see toRelease comment. + if (toRelease != null) { + releaseBuffers(toRelease.keySet(), true); } } + releaseCacheChunksIntoObjectPool(toRead.next); + } - if (isTracingEnabled) { - LOG.trace("Disk ranges after preparing all the data " - + RecordReaderUtils.stringifyDiskRanges(toRead.next)); - } - // Release the unreleased buffers. See class comment about refcounts. - for (int colIx = 0; colIx < colCtxs.length; ++colIx) { - ColumnReadContext ctx = colCtxs[colIx]; - if (ctx == null) continue; // This column is not included. - for (int streamIx = 0; streamIx < ctx.streamCount; ++streamIx) { - StreamContext sctx = ctx.streams[streamIx]; - if (sctx == null || sctx.stripeLevelStream == null) continue; - if (0 != sctx.stripeLevelStream.decRef()) continue; - for (MemoryBuffer buf : sctx.stripeLevelStream.getCacheBuffers()) { - if (LOG.isTraceEnabled()) { - LOG.trace("Unlocking {} at the end of processing", buf); - } + private void releaseEcbRefCountsOnError(OrcEncodedColumnBatch ecb) { + if (isTracingEnabled) { + LOG.trace("Unlocking the batch not sent to consumer, on error"); + } + // We cannot send the ecb to consumer. Discard whatever is already there. + for (int colIx = 0; colIx < ecb.getTotalColCount(); ++colIx) { + if (!ecb.hasData(colIx)) continue; + ColumnStreamData[] datas = ecb.getColumnData(colIx); + for (ColumnStreamData data : datas) { + if (data == null || data.decRef() != 0) continue; + for (MemoryBuffer buf : data.getCacheBuffers()) { + if (buf == null) continue; cacheWrapper.releaseBuffer(buf); } } } - - releaseInitialRefcounts(toRead.next); - // Release buffers as we are done with all the streams... also see toRelease comment. - if (toRelease != null) { - releaseBuffers(toRelease.keySet(), true); - } - releaseCacheChunksIntoObjectPool(toRead.next); }
