Repository: hive
Updated Branches:
  refs/heads/branch-1 22a910b1f -> 000fb2c2f


HIVE-13840: Orc split generation is reading file footers twice (Prasanth 
Jayachandran reviewed by Owen O'Malley)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/000fb2c2
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/000fb2c2
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/000fb2c2

Branch: refs/heads/branch-1
Commit: 000fb2c2fad1859918a9774d8342c08e89c6e495
Parents: 22a910b
Author: Prasanth Jayachandran <[email protected]>
Authored: Tue Jun 14 19:37:07 2016 -0700
Committer: Prasanth Jayachandran <[email protected]>
Committed: Tue Jun 14 19:37:07 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java   |   4 +-
 .../hive/ql/io/orc/TestInputOutputFormat.java   | 190 +++++++++++++++++--
 2 files changed, 178 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/000fb2c2/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 9ac34b7..64abe91 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -397,7 +397,7 @@ public class OrcInputFormat implements 
InputFormat<NullWritable, OrcStruct>,
       }
       try {
         OrcFile.createReader(file.getPath(),
-            OrcFile.readerOptions(conf).filesystem(fs));
+            
OrcFile.readerOptions(conf).filesystem(fs).maxLength(file.getLen()));
       } catch (IOException e) {
         return false;
       }
@@ -1002,7 +1002,7 @@ public class OrcInputFormat implements 
InputFormat<NullWritable, OrcStruct>,
 
     private void populateAndCacheStripeDetails() throws IOException {
       Reader orcReader = OrcFile.createReader(file.getPath(),
-          OrcFile.readerOptions(context.conf).filesystem(fs));
+          
OrcFile.readerOptions(context.conf).filesystem(fs).maxLength(file.getLen()));
       if (fileInfo != null) {
         stripes = fileInfo.stripeInfos;
         fileMetaInfo = fileInfo.fileMetaInfo;

http://git-wip-us.apache.org/repos/asf/hive/blob/000fb2c2/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java 
b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index fa32bf6..3e7565e 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -33,6 +33,7 @@ import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Properties;
@@ -47,7 +48,9 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -661,6 +664,14 @@ public class TestInputOutputFormat {
       this.hosts = hosts;
     }
 
+    public void setOffset(int offset) {
+      this.offset = offset;
+    }
+
+    public void setLength(int length) {
+      this.length = length;
+    }
+
     @Override
     public String toString() {
       StringBuilder buffer = new StringBuilder();
@@ -799,6 +810,9 @@ public class TestInputOutputFormat {
       DataOutputBuffer buf = (DataOutputBuffer) getWrappedStream();
       file.length = buf.getLength();
       file.content = new byte[file.length];
+      MockBlock block = new MockBlock("host1");
+      block.setLength(file.length);
+      setBlocks(block);
       System.arraycopy(buf.getData(), 0, file.content, 0, file.length);
     }
 
@@ -811,6 +825,7 @@ public class TestInputOutputFormat {
   public static class MockFileSystem extends FileSystem {
     final List<MockFile> files = new ArrayList<MockFile>();
     Path workingDir = new Path("/");
+    protected Statistics statistics;
 
     @SuppressWarnings("unused")
     public MockFileSystem() {
@@ -820,11 +835,13 @@ public class TestInputOutputFormat {
     @Override
     public void initialize(URI uri, Configuration conf) {
       setConf(conf);
+      statistics = getStatistics("mock", getClass());
     }
 
     public MockFileSystem(Configuration conf, MockFile... files) {
       setConf(conf);
       this.files.addAll(Arrays.asList(files));
+      statistics = getStatistics("mock", getClass());
     }
 
     void clear() {
@@ -842,6 +859,7 @@ public class TestInputOutputFormat {
 
     @Override
     public FSDataInputStream open(Path path, int i) throws IOException {
+      statistics.incrementReadOps(1);
       for(MockFile file: files) {
         if (file.path.equals(path)) {
           return new FSDataInputStream(new MockInputStream(file));
@@ -856,6 +874,7 @@ public class TestInputOutputFormat {
                                      short replication, long blockSize,
                                      Progressable progressable
                                      ) throws IOException {
+      statistics.incrementWriteOps(1);
       MockFile file = null;
       for(MockFile currentFile: files) {
         if (currentFile.path.equals(path)) {
@@ -874,27 +893,70 @@ public class TestInputOutputFormat {
     public FSDataOutputStream append(Path path, int bufferSize,
                                      Progressable progressable
                                      ) throws IOException {
+      statistics.incrementWriteOps(1);
       return create(path, FsPermission.getDefault(), true, bufferSize,
           (short) 3, 256 * 1024, progressable);
     }
 
     @Override
     public boolean rename(Path path, Path path2) throws IOException {
+      statistics.incrementWriteOps(1);
       return false;
     }
 
     @Override
     public boolean delete(Path path) throws IOException {
+      statistics.incrementWriteOps(1);
       return false;
     }
 
     @Override
     public boolean delete(Path path, boolean b) throws IOException {
+      statistics.incrementWriteOps(1);
       return false;
     }
 
     @Override
+    public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f)
+            throws IOException {
+      return new RemoteIterator<LocatedFileStatus>() {
+        private Iterator<LocatedFileStatus> iterator = 
listLocatedFileStatuses(f).iterator();
+
+        @Override
+        public boolean hasNext() throws IOException {
+          return iterator.hasNext();
+        }
+
+        @Override
+        public LocatedFileStatus next() throws IOException {
+          return iterator.next();
+        }
+      };
+    }
+
+    private List<LocatedFileStatus> listLocatedFileStatuses(Path path) throws 
IOException {
+      statistics.incrementReadOps(1);
+      path = path.makeQualified(this);
+      List<LocatedFileStatus> result = new ArrayList<>();
+      String pathname = path.toString();
+      String pathnameAsDir = pathname + "/";
+      Set<String> dirs = new TreeSet<String>();
+      MockFile file = findFile(path);
+      if (file != null) {
+        result.add(createLocatedStatus(file));
+        return result;
+      }
+      findMatchingLocatedFiles(files, pathnameAsDir, dirs, result);
+      // for each directory add it once
+      for (String dir : dirs) {
+        result.add(createLocatedDirectory(new MockPath(this, pathnameAsDir + 
dir)));
+      }
+      return result;
+    }
+
+    @Override
     public FileStatus[] listStatus(Path path) throws IOException {
+      statistics.incrementReadOps(1);
       path = path.makeQualified(this);
       List<FileStatus> result = new ArrayList<FileStatus>();
       String pathname = path.toString();
@@ -921,6 +983,32 @@ public class TestInputOutputFormat {
       return result.toArray(new FileStatus[result.size()]);
     }
 
+    private MockFile findFile(Path path) {
+      for (MockFile file: files) {
+        if (file.path.equals(path)) {
+          return file;
+        }
+      }
+      return null;
+    }
+
+    private void findMatchingLocatedFiles(
+            List<MockFile> files, String pathnameAsDir, Set<String> dirs, 
List<LocatedFileStatus> result)
+            throws IOException {
+      for (MockFile file : files) {
+        String filename = file.path.toString();
+        if (filename.startsWith(pathnameAsDir)) {
+          String tail = filename.substring(pathnameAsDir.length());
+          int nextSlash = tail.indexOf('/');
+          if (nextSlash > 0) {
+            dirs.add(tail.substring(0, nextSlash));
+          } else {
+            result.add(createLocatedStatus(file));
+          }
+        }
+      }
+    }
+
     @Override
     public void setWorkingDirectory(Path path) {
       workingDir = path;
@@ -933,9 +1021,22 @@ public class TestInputOutputFormat {
 
     @Override
     public boolean mkdirs(Path path, FsPermission fsPermission) {
+      statistics.incrementWriteOps(1);
       return false;
     }
 
+    private LocatedFileStatus createLocatedStatus(MockFile file) throws 
IOException {
+      FileStatus fileStatus = createStatus(file);
+      return new LocatedFileStatus(fileStatus,
+              getFileBlockLocationsImpl(fileStatus, 0, fileStatus.getLen(), 
false));
+    }
+
+    private LocatedFileStatus createLocatedDirectory(Path dir) throws 
IOException {
+      FileStatus fileStatus = createDirectory(dir);
+      return new LocatedFileStatus(fileStatus,
+              getFileBlockLocationsImpl(fileStatus, 0, fileStatus.getLen(), 
false));
+    }
+
     private FileStatus createStatus(MockFile file) {
       return new FileStatus(file.length, false, 1, file.blockSize, 0, 0,
           FsPermission.createImmutable((short) 644), "owen", "group",
@@ -949,6 +1050,7 @@ public class TestInputOutputFormat {
 
     @Override
     public FileStatus getFileStatus(Path path) throws IOException {
+      statistics.incrementReadOps(1);
       path = path.makeQualified(this);
       String pathnameAsDir = path.toString() + "/";
       for(MockFile file: files) {
@@ -962,24 +1064,30 @@ public class TestInputOutputFormat {
     }
 
     @Override
-    public BlockLocation[] getFileBlockLocations(FileStatus stat,
-                                                 long start, long len) {
+    public BlockLocation[] getFileBlockLocations(FileStatus stat, long start, 
long len) throws IOException {
+      return getFileBlockLocationsImpl(stat, start, len, true);
+    }
+
+    private BlockLocation[] getFileBlockLocationsImpl(FileStatus stat, long 
start, long len, boolean updateStats)
+            throws IOException {
+      if (updateStats) {
+        statistics.incrementReadOps(1);
+      }
       List<BlockLocation> result = new ArrayList<BlockLocation>();
-      for(MockFile file: files) {
-        if (file.path.equals(stat.getPath())) {
-          for(MockBlock block: file.blocks) {
-            if (OrcInputFormat.SplitGenerator.getOverlap(block.offset,
-                block.length, start, len) > 0) {
-              String[] topology = new String[block.hosts.length];
-              for(int i=0; i < topology.length; ++i) {
-                topology[i] = "/rack/ " + block.hosts[i];
-              }
-              result.add(new BlockLocation(block.hosts, block.hosts,
-                  topology, block.offset, block.length));
+      MockFile file = findFile(stat.getPath());
+      if (file != null) {
+        for(MockBlock block: file.blocks) {
+          if (OrcInputFormat.SplitGenerator.getOverlap(block.offset,
+                  block.length, start, len) > 0) {
+            String[] topology = new String[block.hosts.length];
+            for(int i=0; i < topology.length; ++i) {
+              topology[i] = "/rack/ " + block.hosts[i];
             }
+            result.add(new BlockLocation(block.hosts, block.hosts,
+                    topology, block.offset, block.length));
           }
-          return result.toArray(new BlockLocation[result.size()]);
         }
+        return result.toArray(new BlockLocation[result.size()]);
       }
       return new BlockLocation[0];
     }
@@ -2124,4 +2232,58 @@ public class TestInputOutputFormat {
     assertEquals(0, splits.length);
   }
 
+  @Test
+  public void testSplitGenReadOps() throws Exception {
+    MockFileSystem fs = new MockFileSystem(conf);
+    conf.set("mapred.input.dir", "mock:///mocktable");
+    conf.set("fs.defaultFS", "mock:///");
+    conf.set("fs.mock.impl", MockFileSystem.class.getName());
+    MockPath mockPath = new MockPath(fs, "mock:///mocktable");
+    StructObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = (StructObjectInspector)
+              ObjectInspectorFactory.getReflectionObjectInspector(MyRow.class,
+                      ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+    Writer writer =
+            OrcFile.createWriter(new Path(mockPath + "/0_0"),
+                    OrcFile.writerOptions(conf).blockPadding(false)
+                            .bufferSize(1024).inspector(inspector));
+    for (int i = 0; i < 10; ++i) {
+      writer.addRow(new MyRow(i, 2 * i));
+    }
+    writer.close();
+
+    writer = OrcFile.createWriter(new Path(mockPath + "/0_1"),
+            OrcFile.writerOptions(conf).blockPadding(false)
+                    .bufferSize(1024).inspector(inspector));
+    for (int i = 0; i < 10; ++i) {
+      writer.addRow(new MyRow(i, 2 * i));
+    }
+    writer.close();
+
+    int readOpsBefore = -1;
+    for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) {
+      if (statistics.getScheme().equalsIgnoreCase("mock")) {
+        readOpsBefore = statistics.getReadOps();
+      }
+    }
+    assertTrue("MockFS has stats. Read ops not expected to be -1", 
readOpsBefore != -1);
+    OrcInputFormat orcInputFormat = new OrcInputFormat();
+    InputSplit[] splits = orcInputFormat.getSplits(conf, 2);
+    int readOpsDelta = -1;
+    for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) {
+      if (statistics.getScheme().equalsIgnoreCase("mock")) {
+        readOpsDelta = statistics.getReadOps() - readOpsBefore;
+      }
+    }
+    // call-1: listLocatedStatus - mock:/mocktable
+    // call-2: open - mock:/mocktable/0_0
+    // call-3: open - mock:/mocktable/0_1
+    assertEquals(3, readOpsDelta);
+
+    assertEquals(2, splits.length);
+    // revert back to local fs
+    conf.set("fs.defaultFS", "file:///");
+  }
 }

Reply via email to