Repository: hive Updated Branches: refs/heads/branch-2.1 2068e3c0c -> 23ca1b46e
HIVE-13840: Orc split generation is reading file footers twice (Prasanth Jayachandran reviewed by Owen O'Malley) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/23ca1b46 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/23ca1b46 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/23ca1b46 Branch: refs/heads/branch-2.1 Commit: 23ca1b46e78759750b4726bedd97fef7c632e888 Parents: 2068e3c Author: Prasanth Jayachandran <[email protected]> Authored: Tue May 31 11:48:16 2016 -0700 Committer: Prasanth Jayachandran <[email protected]> Committed: Tue May 31 11:50:05 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/orc/impl/ReaderImpl.java | 1 + .../hadoop/hive/ql/io/orc/OrcInputFormat.java | 4 +- .../hadoop/hive/ql/io/orc/ReaderImpl.java | 9 +- .../hive/ql/io/orc/TestInputOutputFormat.java | 159 +++++++++++++++++++ 4 files changed, 165 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/23ca1b46/orc/src/java/org/apache/orc/impl/ReaderImpl.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/ReaderImpl.java b/orc/src/java/org/apache/orc/impl/ReaderImpl.java index 2da590e..1dd5e43 100644 --- a/orc/src/java/org/apache/orc/impl/ReaderImpl.java +++ b/orc/src/java/org/apache/orc/impl/ReaderImpl.java @@ -345,6 +345,7 @@ public class ReaderImpl implements Reader { options.getMaxLength()); this.footerMetaAndPsBuffer = footerMetaData.footerMetaAndPsBuffer; } + options.fileMetaInfo(footerMetaData); MetaInfoObjExtractor rInfo = new MetaInfoObjExtractor(footerMetaData.compressionType, footerMetaData.bufferSize, http://git-wip-us.apache.org/repos/asf/hive/blob/23ca1b46/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java 
---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index 087207b..185852c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -467,7 +467,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, } try { OrcFile.createReader(file.getPath(), - OrcFile.readerOptions(conf).filesystem(fs)); + OrcFile.readerOptions(conf).filesystem(fs).maxLength(file.getLen())); } catch (IOException e) { return false; } @@ -1391,7 +1391,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, private Reader createOrcReader() throws IOException { return OrcFile.createReader(file.getPath(), - OrcFile.readerOptions(context.conf).filesystem(fs)); + OrcFile.readerOptions(context.conf).filesystem(fs).maxLength(file.getLen())); } private long computeProjectionSize(List<OrcProto.Type> types, http://git-wip-us.apache.org/repos/asf/hive/blob/23ca1b46/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java index 3a2e7d8..0b40fef 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java @@ -56,10 +56,10 @@ public class ReaderImpl extends org.apache.orc.impl.ReaderImpl //serialized footer - Keeping this around for use by getFileMetaInfo() // will help avoid cpu cycles spend in deserializing at cost of increased // memory footprint. 
- private final ByteBuffer footerByteBuffer; + private ByteBuffer footerByteBuffer; // Same for metastore cache - maintains the same background buffer, but includes postscript. // This will only be set if the file footer/metadata was read from disk. - private final ByteBuffer footerMetaAndPsBuffer; + private ByteBuffer footerMetaAndPsBuffer; @Override public ObjectInspector getObjectInspector() { @@ -89,18 +89,15 @@ public class ReaderImpl extends org.apache.orc.impl.ReaderImpl FileMetadata fileMetadata = options.getFileMetadata(); if (fileMetadata != null) { this.inspector = OrcStruct.createObjectInspector(0, fileMetadata.getTypes()); - this.footerByteBuffer = null; // not cached and not needed here - this.footerMetaAndPsBuffer = null; } else { FileMetaInfo footerMetaData; if (options.getFileMetaInfo() != null) { footerMetaData = options.getFileMetaInfo(); - this.footerMetaAndPsBuffer = null; } else { footerMetaData = extractMetaInfoFromFooter(fileSystem, path, options.getMaxLength()); - this.footerMetaAndPsBuffer = footerMetaData.footerMetaAndPsBuffer; } + this.footerMetaAndPsBuffer = footerMetaData.footerMetaAndPsBuffer; MetaInfoObjExtractor rInfo = new MetaInfoObjExtractor(footerMetaData.compressionType, footerMetaData.bufferSize, http://git-wip-us.apache.org/repos/asf/hive/blob/23ca1b46/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java index 52098ae..edaecb3 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java @@ -32,6 +32,7 @@ import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Iterator; import java.util.LinkedHashMap; 
import java.util.List; import java.util.Properties; @@ -47,7 +48,9 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.conf.HiveConf; @@ -600,6 +603,61 @@ public class TestInputOutputFormat { } @Test + public void testSplitGenReadOps() throws Exception { + MockFileSystem fs = new MockFileSystem(conf); + conf.set("mapred.input.dir", "mock:///mocktable"); + conf.set("fs.defaultFS", "mock:///"); + conf.set("fs.mock.impl", MockFileSystem.class.getName()); + MockPath mockPath = new MockPath(fs, "mock:///mocktable"); + StructObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = (StructObjectInspector) + ObjectInspectorFactory.getReflectionObjectInspector(MyRow.class, + ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + Writer writer = + OrcFile.createWriter(new Path(mockPath + "/0_0"), + OrcFile.writerOptions(conf).blockPadding(false) + .bufferSize(1024).inspector(inspector)); + for(int i=0; i < 10; ++i) { + writer.addRow(new MyRow(i, 2*i)); + } + writer.close(); + + writer = OrcFile.createWriter(new Path(mockPath + "/0_1"), + OrcFile.writerOptions(conf).blockPadding(false) + .bufferSize(1024).inspector(inspector)); + for(int i=0; i < 10; ++i) { + writer.addRow(new MyRow(i, 2*i)); + } + writer.close(); + + int readOpsBefore = -1; + for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) { + if (statistics.getScheme().equalsIgnoreCase("mock")) { + readOpsBefore = statistics.getReadOps(); + } + } + assertTrue("MockFS has stats. 
Read ops not expected to be -1", readOpsBefore != -1); + OrcInputFormat orcInputFormat = new OrcInputFormat(); + InputSplit[] splits = orcInputFormat.getSplits(conf, 2); + int readOpsDelta = -1; + for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) { + if (statistics.getScheme().equalsIgnoreCase("mock")) { + readOpsDelta = statistics.getReadOps() - readOpsBefore; + } + } + // call-1: listLocatedStatus - mock:/mocktable + // call-2: open - mock:/mocktable/0_0 + // call-3: open - mock:/mocktable/0_1 + assertEquals(3, readOpsDelta); + + assertEquals(2, splits.length); + // revert back to local fs + conf.set("fs.defaultFS", "file:///"); + } + + @Test public void testBIStrategySplitBlockBoundary() throws Exception { conf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "BI"); OrcInputFormat.Context context = new OrcInputFormat.Context(conf); @@ -788,6 +846,14 @@ public class TestInputOutputFormat { this.hosts = hosts; } + public void setOffset(int offset) { + this.offset = offset; + } + + public void setLength(int length) { + this.length = length; + } + @Override public String toString() { StringBuilder buffer = new StringBuilder(); @@ -926,6 +992,9 @@ public class TestInputOutputFormat { DataOutputBuffer buf = (DataOutputBuffer) getWrappedStream(); file.length = buf.getLength(); file.content = new byte[file.length]; + MockBlock block = new MockBlock("host1"); + block.setLength(file.length); + setBlocks(block); System.arraycopy(buf.getData(), 0, file.content, 0, file.length); } @@ -941,6 +1010,7 @@ public class TestInputOutputFormat { // statics for when the mock fs is created via FileSystem.get private static String blockedUgi = null; private final static List<MockFile> globalFiles = new ArrayList<MockFile>(); + protected Statistics statistics; public MockFileSystem() { // empty @@ -949,11 +1019,13 @@ public class TestInputOutputFormat { @Override public void initialize(URI uri, Configuration conf) { setConf(conf); + statistics = 
getStatistics("mock", getClass()); } public MockFileSystem(Configuration conf, MockFile... files) { setConf(conf); this.files.addAll(Arrays.asList(files)); + statistics = getStatistics("mock", getClass()); } public static void setBlockedUgi(String s) { @@ -979,6 +1051,7 @@ public class TestInputOutputFormat { @Override public FSDataInputStream open(Path path, int i) throws IOException { + statistics.incrementReadOps(1); checkAccess(); MockFile file = findFile(path); if (file != null) return new FSDataInputStream(new MockInputStream(file)); @@ -1011,6 +1084,7 @@ public class TestInputOutputFormat { short replication, long blockSize, Progressable progressable ) throws IOException { + statistics.incrementWriteOps(1); checkAccess(); MockFile file = findFile(path); if (file == null) { @@ -1024,6 +1098,7 @@ public class TestInputOutputFormat { public FSDataOutputStream append(Path path, int bufferSize, Progressable progressable ) throws IOException { + statistics.incrementWriteOps(1); checkAccess(); return create(path, FsPermission.getDefault(), true, bufferSize, (short) 3, 256 * 1024, progressable); @@ -1031,24 +1106,68 @@ public class TestInputOutputFormat { @Override public boolean rename(Path path, Path path2) throws IOException { + statistics.incrementWriteOps(1); checkAccess(); return false; } @Override public boolean delete(Path path) throws IOException { + statistics.incrementWriteOps(1); checkAccess(); return false; } @Override public boolean delete(Path path, boolean b) throws IOException { + statistics.incrementWriteOps(1); checkAccess(); return false; } @Override + public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f) + throws IOException { + return new RemoteIterator<LocatedFileStatus>() { + private Iterator<LocatedFileStatus> iterator = listLocatedFileStatuses(f).iterator(); + + @Override + public boolean hasNext() throws IOException { + return iterator.hasNext(); + } + + @Override + public LocatedFileStatus next() throws IOException { + 
return iterator.next(); + } + }; + } + + private List<LocatedFileStatus> listLocatedFileStatuses(Path path) throws IOException { + statistics.incrementReadOps(1); + checkAccess(); + path = path.makeQualified(this); + List<LocatedFileStatus> result = new ArrayList<>(); + String pathname = path.toString(); + String pathnameAsDir = pathname + "/"; + Set<String> dirs = new TreeSet<String>(); + MockFile file = findFile(path); + if (file != null) { + result.add(createLocatedStatus(file)); + return result; + } + findMatchingLocatedFiles(files, pathnameAsDir, dirs, result); + findMatchingLocatedFiles(globalFiles, pathnameAsDir, dirs, result); + // for each directory add it once + for(String dir: dirs) { + result.add(createLocatedDirectory(new MockPath(this, pathnameAsDir + dir))); + } + return result; + } + + @Override public FileStatus[] listStatus(Path path) throws IOException { + statistics.incrementReadOps(1); checkAccess(); path = path.makeQualified(this); List<FileStatus> result = new ArrayList<FileStatus>(); @@ -1084,6 +1203,23 @@ public class TestInputOutputFormat { } } + private void findMatchingLocatedFiles( + List<MockFile> files, String pathnameAsDir, Set<String> dirs, List<LocatedFileStatus> result) + throws IOException { + for (MockFile file: files) { + String filename = file.path.toString(); + if (filename.startsWith(pathnameAsDir)) { + String tail = filename.substring(pathnameAsDir.length()); + int nextSlash = tail.indexOf('/'); + if (nextSlash > 0) { + dirs.add(tail.substring(0, nextSlash)); + } else { + result.add(createLocatedStatus(file)); + } + } + } + } + @Override public void setWorkingDirectory(Path path) { workingDir = path; @@ -1096,6 +1232,7 @@ public class TestInputOutputFormat { @Override public boolean mkdirs(Path path, FsPermission fsPermission) { + statistics.incrementWriteOps(1); return false; } @@ -1110,8 +1247,21 @@ public class TestInputOutputFormat { FsPermission.createImmutable((short) 755), "owen", "group", dir); } + private 
LocatedFileStatus createLocatedStatus(MockFile file) throws IOException { + FileStatus fileStatus = createStatus(file); + return new LocatedFileStatus(fileStatus, + getFileBlockLocationsImpl(fileStatus, 0, fileStatus.getLen(), false)); + } + + private LocatedFileStatus createLocatedDirectory(Path dir) throws IOException { + FileStatus fileStatus = createDirectory(dir); + return new LocatedFileStatus(fileStatus, + getFileBlockLocationsImpl(fileStatus, 0, fileStatus.getLen(), false)); + } + @Override public FileStatus getFileStatus(Path path) throws IOException { + statistics.incrementReadOps(1); checkAccess(); path = path.makeQualified(this); String pathnameAsDir = path.toString() + "/"; @@ -1133,6 +1283,15 @@ public class TestInputOutputFormat { @Override public BlockLocation[] getFileBlockLocations(FileStatus stat, long start, long len) throws IOException { + return getFileBlockLocationsImpl(stat, start, len, true); + } + + private BlockLocation[] getFileBlockLocationsImpl(final FileStatus stat, final long start, + final long len, + final boolean updateStats) throws IOException { + if (updateStats) { + statistics.incrementReadOps(1); + } checkAccess(); List<BlockLocation> result = new ArrayList<BlockLocation>(); MockFile file = findFile(stat.getPath());
