Author: sershe
Date: Mon Feb 23 23:04:29 2015
New Revision: 1661818

URL: http://svn.apache.org/r1661818
Log:
Remove FS cache, fix potential read issue

Modified:
    
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java
    
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java

Modified: 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java?rev=1661818&r1=1661817&r2=1661818&view=diff
==============================================================================
--- 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java
 (original)
+++ 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java
 Mon Feb 23 23:04:29 2015
@@ -54,7 +54,6 @@ import org.apache.hadoop.mapred.FileSpli
 import org.apache.hadoop.mapred.InputSplit;
 
 public class OrcEncodedDataProducer implements 
EncodedDataProducer<OrcBatchKey> {
-  private FileSystem cachedFs = null;
   private Configuration conf;
   private final OrcMetadataCache metadataCache;
   // TODO: it makes zero sense to have both at the same time and duplicate 
data. Add "cache mode".
@@ -349,11 +348,12 @@ public class OrcEncodedDataProducer impl
      */
     private void ensureOrcReader() throws IOException {
       if (orcReader != null) return;
-      FileSystem fs = cachedFs;
       Path path = split.getPath();
-      if ("pfile".equals(path.toUri().getScheme())) {
+      // Disable filesystem caching for now; Tez closes it and FS cache will 
fix all that
+      FileSystem /*fs = cachedFs;
+      if ("pfile".equals(path.toUri().getScheme())) {*/
         fs = path.getFileSystem(conf); // Cannot use cached FS due to hive 
tests' proxy FS.
-      }
+      //}
       orcReader = OrcFile.createReader(path, 
OrcFile.readerOptions(conf).filesystem(fs));
     }
 
@@ -612,7 +612,6 @@ public class OrcEncodedDataProducer impl
   public OrcEncodedDataProducer(LowLevelCache lowLevelCache, 
Cache<OrcCacheKey> cache,
       Configuration conf) throws IOException {
     // We assume all splits will come from the same FS.
-    this.cachedFs = FileSystem.get(conf);
     this.cache = cache;
     this.lowLevelCache = lowLevelCache;
     this.conf = conf;

Modified: 
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java?rev=1661818&r1=1661817&r2=1661818&view=diff
==============================================================================
--- 
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java
 (original)
+++ 
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java
 Mon Feb 23 23:04:29 2015
@@ -259,11 +259,20 @@ public class RecordReaderUtils {
         }
       } else if (doForceDirect) {
         ByteBuffer directBuf = ByteBuffer.allocateDirect(len);
+        int pos = directBuf.position();
         try {
-          while (directBuf.remaining() > 0) {
+          while (directBuf.remaining() >= 0) {
             int count = file.read(directBuf);
             if (count < 0) throw new EOFException();
-            directBuf.position(directBuf.position() + count);
+            if (directBuf.position() != pos) {
+              RecordReaderImpl.LOG.info("Warning - position mismatch from " + 
file.getClass()
+                  + ": after reading " + count + ", expected " + pos + " but 
got " + directBuf.position());
+            }
+            pos += count;
+            if (pos > len) {
+              throw new AssertionError("Position " + pos + " length " + len + 
" after reading " + count);
+            }
+            directBuf.position(pos);
           }
         } catch (UnsupportedOperationException ex) {
           RecordReaderImpl.LOG.error("Stream does not support direct read; we 
will copy.");


Reply via email to