Author: sershe Date: Thu Feb 5 03:23:55 2015 New Revision: 1657461 URL: http://svn.apache.org/r1657461 Log: HIVE-9418p5 : Yet more bugfixes
Modified: hive/branches/llap/itests/qtest/pom.xml hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIoProxy.java hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Modified: hive/branches/llap/itests/qtest/pom.xml URL: http://svn.apache.org/viewvc/hive/branches/llap/itests/qtest/pom.xml?rev=1657461&r1=1657460&r2=1657461&view=diff ============================================================================== --- hive/branches/llap/itests/qtest/pom.xml (original) +++ hive/branches/llap/itests/qtest/pom.xml Thu Feb 5 03:23:55 2015 @@ -107,6 +107,12 @@ <version>${project.version}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-llap-server</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> <!-- test inter-project --> <dependency> <groupId>junit</groupId> Modified: hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIoProxy.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIoProxy.java?rev=1657461&r1=1657460&r2=1657461&view=diff ============================================================================== --- hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIoProxy.java (original) +++ hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIoProxy.java Thu Feb 5 03:23:55 2015 @@ -62,6 +62,7 @@ public class LlapIoProxy { @SuppressWarnings("unchecked") Class<? extends LlapIo> clazz = (Class<? extends LlapIo>)Class.forName(IMPL_CLASS); Constructor<? extends LlapIo> ctor = clazz.getDeclaredConstructor(Configuration.class); + ctor.setAccessible(true); return ctor.newInstance(conf); } catch (Exception e) { throw new RuntimeException("Failed to create impl class", e); Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java?rev=1657461&r1=1657460&r2=1657461&view=diff ============================================================================== --- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java (original) +++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java Thu Feb 5 03:23:55 2015 @@ -167,9 +167,10 @@ public class LowLevelCacheImpl implement try { for (int i = 0; i < ranges.length; ++i) { LlapCacheableBuffer buffer = (LlapCacheableBuffer)buffers[i]; + assert !buffer.isLocked(); // TODO: is this always true? does put happen before reuse? + buffer.incRef(); long offset = ranges[i].offset; buffer.declaredLength = ranges[i].getLength(); - assert buffer.isLocked(); while (true) { // Overwhelmingly executes once, or maybe twice (replacing stale value). LlapCacheableBuffer oldVal = subCache.cache.putIfAbsent(offset, buffer); if (oldVal == null) { Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java?rev=1657461&r1=1657460&r2=1657461&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java Thu Feb 5 03:23:55 2015 @@ -105,8 +105,16 @@ abstract class InStream extends InputStr } public void seek(long desired) { - for(int i = 0; i < bytes.size(); ++i) { - DiskRange curRange = bytes.get(i); + if (desired == 0 && bytes.isEmpty()) { + logEmptySeek(name); + return; + } + int i = 0; + for(DiskRange curRange : bytes) { + if (desired == 0 && curRange.getData().remaining() == 0) { + logEmptySeek(name); + return; + } if (curRange.offset <= desired && (desired - curRange.offset) < curRange.getLength()) { currentOffset = desired; @@ -117,6 +125,7 @@ abstract class InStream extends InputStr this.range.position(pos); return; } + ++i; } throw new IllegalArgumentException("Seek in " + name + " to " + desired + " is outside of the data"); @@ -358,6 +367,10 @@ abstract class InStream extends InputStr } private void seek(long desired) throws IOException { + if (desired == 0 && bytes.isEmpty()) { + logEmptySeek(name); + return; + } int i = 0; for (DiskRange range : bytes) { if (range.offset <= desired && desired < range.end) { @@ -433,6 +446,12 @@ abstract class InStream extends InputStr public abstract void seek(PositionProvider index) throws IOException; + private static void logEmptySeek(String name) { + if (LOG.isWarnEnabled()) { + LOG.warn("Attempting seek into empty stream (" + name + ") Skipping stream."); + } + } + /** * Create an input stream from a list of buffers. * @param name the name of the stream Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1657461&r1=1657460&r2=1657461&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Thu Feb 5 03:23:55 2015 @@ -3465,7 +3465,7 @@ public class RecordReaderImpl implements final int colIx; public void addStream(long offset, OrcProto.Stream stream, int indexIx) { - streams[++streamCount] = new StreamContext(stream, offset, indexIx); + streams[streamCount++] = new StreamContext(stream, offset, indexIx); } } @@ -3496,14 +3496,17 @@ public class RecordReaderImpl implements this.streamList != null ? this.streamList : stripeFooter.getStreamsList(); List<ColumnEncoding> encodings = this.encodings != null ? this.encodings : stripeFooter.getColumnsList(); + + // 1. Figure out what we have to read. LinkedList<DiskRange> rangesToRead = new LinkedList<DiskRange>(); long offset = 0; - // Figure out which columns have a present stream + // 1.1. Figure out which columns have a present stream boolean[] hasNull = findPresentStreamsByColumn(streamList, types); DiskRange lastRange = null; + // We assume stream list is sorted by column and that non-data // streams do not interleave data streams for the same column. - // With that in mind, determine disk ranges to read/get from cache (not by stream). + // 1.2. With that in mind, determine disk ranges to read/get from cache (not by stream). int colRgIx = -1, lastColIx = -1; ColumnReadContext[] colCtxs = new ColumnReadContext[colRgs.length]; boolean[] includedRgs = null; @@ -3516,18 +3519,21 @@ public class RecordReaderImpl implements offset += length; continue; } - ColumnReadContext ctx = colCtxs[colRgIx]; + ColumnReadContext ctx = null; if (lastColIx != colIx) { - assert ctx == null; ++colRgIx; + assert colCtxs[colRgIx] == null; lastColIx = colIx; includedRgs = colRgs[colRgIx]; ctx = colCtxs[colRgIx] = new ColumnReadContext( offset, colIx, encodings.get(colIx), indexes[colIx]); + } else { + ctx = colCtxs[colRgIx]; + assert ctx != null; } int indexIx = getIndexPosition(ctx.encoding.getKind(), types.get(colIx).getKind(), streamKind, isCompressed, hasNull[colIx]); - colCtxs[colRgIx].addStream(offset, stream, indexIx); + ctx.addStream(offset, stream, indexIx); if (includedRgs == null || isDictionary(streamKind, encodings.get(colIx))) { lastRange = addEntireStreamToResult(offset, length, lastRange, rangesToRead); } else { @@ -3538,7 +3544,7 @@ public class RecordReaderImpl implements offset += length; } - // Now, read all of these from cache or disk. + // 2. Now, read all of the ranges from cache or disk. if (LOG.isDebugEnabled()) { LOG.debug("chunks = " + stringifyDiskRanges(rangesToRead)); } @@ -3548,7 +3554,7 @@ public class RecordReaderImpl implements } readDiskRanges(file, zcr, stripe.getOffset(), rangesToRead); - // Separate buffers for each stream from the data we have. + // 2.1. Separate buffers for each stream from the data we have. // TODO: given how we read, we could potentially get rid of this step? for (ColumnReadContext colCtx : colCtxs) { for (int i = 0; i < colCtx.streamCount; ++i) { @@ -3557,7 +3563,7 @@ public class RecordReaderImpl implements } } - // Finally, decompress data, map per RG, and return to caller. + // 3. Finally, decompress data, map per RG, and return to caller. // We go by RG and not by column because that is how data is processed. int rgCount = (int)Math.ceil((double)rowCountInStripe / rowIndexStride); for (int rgIx = 0; rgIx < rgCount; ++rgIx) { @@ -3593,8 +3599,6 @@ public class RecordReaderImpl implements consumer.consumeData(ecb); } } - - throw new UnsupportedOperationException("not implemented"); } private StreamBuffer getStripeLevelStream(