Author: sershe
Date: Thu Feb  5 03:23:55 2015
New Revision: 1657461

URL: http://svn.apache.org/r1657461
Log:
HIVE-9418p5 : Yet more bugfixes

Modified:
    hive/branches/llap/itests/qtest/pom.xml
    hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIoProxy.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java

Modified: hive/branches/llap/itests/qtest/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/llap/itests/qtest/pom.xml?rev=1657461&r1=1657460&r2=1657461&view=diff
==============================================================================
--- hive/branches/llap/itests/qtest/pom.xml (original)
+++ hive/branches/llap/itests/qtest/pom.xml Thu Feb  5 03:23:55 2015
@@ -107,6 +107,12 @@
       <version>${project.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-llap-server</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
     <!-- test inter-project -->
     <dependency>
       <groupId>junit</groupId>
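
A plausible reading of the new dependency (not stated in the log): qtest
loads the LLAP IO layer reflectively through LlapIoProxy, so the server-side
implementation classes must be on the test classpath for Class.forName to
succeed. A minimal sketch of such a classpath check; the implementation
class name here is illustrative, not taken from the commit:

    public class LlapClasspathCheck {
      public static void main(String[] args) throws ClassNotFoundException {
        // Hypothetical implementation class; this resolves only when
        // hive-llap-server is on the classpath, e.g. via the test-scope
        // dependency added above.
        Class.forName("org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl");
        System.out.println("LLAP server classes are visible");
      }
    }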

Modified: hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIoProxy.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIoProxy.java?rev=1657461&r1=1657460&r2=1657461&view=diff
==============================================================================
--- hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIoProxy.java (original)
+++ hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIoProxy.java Thu Feb  5 03:23:55 2015
@@ -62,6 +62,7 @@ public class LlapIoProxy {
       @SuppressWarnings("unchecked")
      Class<? extends LlapIo> clazz = (Class<? extends LlapIo>)Class.forName(IMPL_CLASS);
      Constructor<? extends LlapIo> ctor = clazz.getDeclaredConstructor(Configuration.class);
+      ctor.setAccessible(true);
       return ctor.newInstance(conf);
     } catch (Exception e) {
       throw new RuntimeException("Failed to create impl class", e);
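
The added setAccessible(true) matters when the implementation's constructor
is not public: getDeclaredConstructor locates non-public constructors, but
invoking newInstance on one throws IllegalAccessException unless
accessibility is forced. A minimal self-contained sketch of the pattern
(hypothetical classes; String stands in for Configuration):

    import java.lang.reflect.Constructor;

    class Impl {
      private Impl(String conf) {}  // non-public, like the LLAP impl's ctor
    }

    public class ReflectiveCtorDemo {
      public static void main(String[] args) throws Exception {
        Constructor<Impl> ctor = Impl.class.getDeclaredConstructor(String.class);
        ctor.setAccessible(true);  // without this, newInstance fails with IllegalAccessException
        Impl impl = ctor.newInstance("conf");
        System.out.println("created " + impl);
      }
    }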

Modified: 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java?rev=1657461&r1=1657460&r2=1657461&view=diff
==============================================================================
--- 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
 (original)
+++ 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
 Thu Feb  5 03:23:55 2015
@@ -167,9 +167,10 @@ public class LowLevelCacheImpl implement
     try {
       for (int i = 0; i < ranges.length; ++i) {
         LlapCacheableBuffer buffer = (LlapCacheableBuffer)buffers[i];
+        assert !buffer.isLocked(); // TODO: is this always true? does put happen before reuse?
+        buffer.incRef();
         long offset = ranges[i].offset;
         buffer.declaredLength = ranges[i].getLength();
-        assert buffer.isLocked();
        while (true) { // Overwhelmingly executes once, or maybe twice (replacing stale value).
          LlapCacheableBuffer oldVal = subCache.cache.putIfAbsent(offset, buffer);
           if (oldVal == null) {
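
The reordering above locks the buffer inside the put itself (incRef before
putIfAbsent publishes it) rather than asserting that the caller already
locked it; the TODO records the open question of whether an unlocked buffer
could be reused before the put completes. A simplified sketch of the
publish-locked pattern, with hypothetical stand-in types:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicInteger;

    class RefCountedBuffer {
      private final AtomicInteger refCount = new AtomicInteger(0);
      void incRef() { refCount.incrementAndGet(); }
      boolean isLocked() { return refCount.get() > 0; }
    }

    public class PublishLockedDemo {
      static final ConcurrentHashMap<Long, RefCountedBuffer> cache =
          new ConcurrentHashMap<>();

      // Take a reference before the buffer becomes visible to other threads,
      // so an evictor that reclaims only unlocked buffers cannot free it in
      // the window between insertion and first use.
      static RefCountedBuffer put(long offset, RefCountedBuffer buffer) {
        buffer.incRef();
        RefCountedBuffer oldVal = cache.putIfAbsent(offset, buffer);
        return (oldVal == null) ? buffer : oldVal;  // stale-value loop elided
      }

      public static void main(String[] args) {
        RefCountedBuffer b = put(0L, new RefCountedBuffer());
        System.out.println("locked after publish: " + b.isLocked());
      }
    }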

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java?rev=1657461&r1=1657460&r2=1657461&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java Thu Feb  5 03:23:55 2015
@@ -105,8 +105,16 @@ abstract class InStream extends InputStr
     }
 
     public void seek(long desired) {
-      for(int i = 0; i < bytes.size(); ++i) {
-        DiskRange curRange = bytes.get(i);
+      if (desired == 0 && bytes.isEmpty()) {
+        logEmptySeek(name);
+        return;
+      }
+      int i = 0;
+      for(DiskRange curRange : bytes) {
+        if (desired == 0 && curRange.getData().remaining() == 0) {
+          logEmptySeek(name);
+          return;
+        }
         if (curRange.offset <= desired &&
             (desired - curRange.offset) < curRange.getLength()) {
           currentOffset = desired;
@@ -117,6 +125,7 @@ abstract class InStream extends InputStr
           this.range.position(pos);
           return;
         }
+        ++i;
       }
       throw new IllegalArgumentException("Seek in " + name + " to " +
         desired + " is outside of the data");
@@ -358,6 +367,10 @@ abstract class InStream extends InputStr
     }
 
     private void seek(long desired) throws IOException {
+      if (desired == 0 && bytes.isEmpty()) {
+        logEmptySeek(name);
+        return;
+      }
       int i = 0;
       for (DiskRange range : bytes) {
         if (range.offset <= desired && desired < range.end) {
@@ -433,6 +446,12 @@ abstract class InStream extends InputStr
 
   public abstract void seek(PositionProvider index) throws IOException;
 
+  private static void logEmptySeek(String name) {
+    if (LOG.isWarnEnabled()) {
+      LOG.warn("Attempting seek into empty stream (" + name + ") Skipping stream.");
+    }
+  }
+
   /**
    * Create an input stream from a list of buffers.
    * @param name the name of the stream
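
The new guard treats a seek to position 0 on a stream with no backing data
as a warn-and-skip no-op rather than letting it fall through to the
"outside of the data" IllegalArgumentException. A simplified standalone
sketch of the control flow (long[] pairs stand in for DiskRange):

    import java.util.Collections;
    import java.util.List;

    public class EmptySeekDemo {
      // Each long[] is {offset, length}, a stand-in for DiskRange.
      static void seek(String name, List<long[]> bytes, long desired) {
        if (desired == 0 && bytes.isEmpty()) {
          System.out.println("WARN: empty seek in " + name + ", skipping stream");
          return;  // no-op instead of throwing
        }
        for (long[] range : bytes) {
          if (range[0] <= desired && desired - range[0] < range[1]) {
            return;  // found the containing range; positioning elided
          }
        }
        throw new IllegalArgumentException(
            "Seek in " + name + " to " + desired + " is outside of the data");
      }

      public static void main(String[] args) {
        seek("empty-stream", Collections.emptyList(), 0);  // warns and returns
      }
    }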

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1657461&r1=1657460&r2=1657461&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Thu Feb  5 03:23:55 2015
@@ -3465,7 +3465,7 @@ public class RecordReaderImpl implements
     final int colIx;
 
     public void addStream(long offset, OrcProto.Stream stream, int indexIx) {
-      streams[++streamCount] = new StreamContext(stream, offset, indexIx);
+      streams[streamCount++] = new StreamContext(stream, offset, indexIx);
     }
   }
 
@@ -3496,14 +3496,17 @@ public class RecordReaderImpl implements
        this.streamList != null ? this.streamList : stripeFooter.getStreamsList();
    List<ColumnEncoding> encodings =
        this.encodings != null ? this.encodings : stripeFooter.getColumnsList();
+
+    // 1. Figure out what we have to read.
     LinkedList<DiskRange> rangesToRead = new LinkedList<DiskRange>();
     long offset = 0;
-    // Figure out which columns have a present stream
+    // 1.1. Figure out which columns have a present stream
     boolean[] hasNull = findPresentStreamsByColumn(streamList, types);
     DiskRange lastRange = null;
+
     // We assume stream list is sorted by column and that non-data
     // streams do not interleave data streams for the same column.
-    // With that in mind, determine disk ranges to read/get from cache (not by stream).
+    // 1.2. With that in mind, determine disk ranges to read/get from cache (not by stream).
     int colRgIx = -1, lastColIx = -1;
     ColumnReadContext[] colCtxs = new ColumnReadContext[colRgs.length];
     boolean[] includedRgs = null;
@@ -3516,18 +3519,21 @@ public class RecordReaderImpl implements
         offset += length;
         continue;
       }
-      ColumnReadContext ctx = colCtxs[colRgIx];
+      ColumnReadContext ctx = null;
       if (lastColIx != colIx) {
-        assert ctx == null;
         ++colRgIx;
+        assert colCtxs[colRgIx] == null;
         lastColIx = colIx;
         includedRgs = colRgs[colRgIx];
         ctx = colCtxs[colRgIx] = new ColumnReadContext(
             offset, colIx, encodings.get(colIx), indexes[colIx]);
+      } else {
+        ctx = colCtxs[colRgIx];
+        assert ctx != null;
       }
       int indexIx = getIndexPosition(ctx.encoding.getKind(),
          types.get(colIx).getKind(), streamKind, isCompressed, hasNull[colIx]);
-      colCtxs[colRgIx].addStream(offset, stream, indexIx);
+      ctx.addStream(offset, stream, indexIx);
      if (includedRgs == null || isDictionary(streamKind, encodings.get(colIx))) {
        lastRange = addEntireStreamToResult(offset, length, lastRange, rangesToRead);
       } else {
@@ -3538,7 +3544,7 @@ public class RecordReaderImpl implements
       offset += length;
     }
 
-    // Now, read all of these from cache or disk.
+    // 2. Now, read all of the ranges from cache or disk.
     if (LOG.isDebugEnabled()) {
       LOG.debug("chunks = " + stringifyDiskRanges(rangesToRead));
     }
@@ -3548,7 +3554,7 @@ public class RecordReaderImpl implements
     }
     readDiskRanges(file, zcr, stripe.getOffset(), rangesToRead);
 
-    // Separate buffers for each stream from the data we have.
+    // 2.1. Separate buffers for each stream from the data we have.
     // TODO: given how we read, we could potentially get rid of this step?
     for (ColumnReadContext colCtx : colCtxs) {
       for (int i = 0; i < colCtx.streamCount; ++i) {
@@ -3557,7 +3563,7 @@ public class RecordReaderImpl implements
       }
     }
 
-    // Finally, decompress data, map per RG, and return to caller.
+    // 3. Finally, decompress data, map per RG, and return to caller.
     // We go by RG and not by column because that is how data is processed.
     int rgCount = (int)Math.ceil((double)rowCountInStripe / rowIndexStride);
     for (int rgIx = 0; rgIx < rgCount; ++rgIx) {
@@ -3593,8 +3599,6 @@ public class RecordReaderImpl implements
         consumer.consumeData(ecb);
       }
     }
-
-    throw new UnsupportedOperationException("not implemented");
   }
 
   private StreamBuffer getStripeLevelStream(
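
The addStream fix swaps a pre-increment for a post-increment:
streams[++streamCount] skips index 0 and runs one slot past the intended
end, while streams[streamCount++] fills from index 0 and leaves streamCount
equal to the number of entries. A minimal illustration:

    public class PostIncrementDemo {
      public static void main(String[] args) {
        String[] streams = new String[2];
        int streamCount = 0;
        // Post-increment: write at the current count, then bump it.
        streams[streamCount++] = "PRESENT";  // index 0, streamCount -> 1
        streams[streamCount++] = "DATA";     // index 1, streamCount -> 2
        // The old "streams[++streamCount] = ..." form would write to index 1
        // first, leave index 0 null, and throw
        // ArrayIndexOutOfBoundsException on the second call.
        System.out.println(streams[0] + ", " + streams[1] + ", count=" + streamCount);
      }
    }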

