HIVE-13120 : propagate doAs when generating ORC splits (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/be000060 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/be000060 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/be000060 Branch: refs/heads/llap Commit: be000060800101181e144e2e37a301039ebfd992 Parents: 50781c4 Author: Sergey Shelukhin <[email protected]> Authored: Thu Feb 25 15:27:36 2016 -0800 Committer: Sergey Shelukhin <[email protected]> Committed: Thu Feb 25 15:27:36 2016 -0800 ---------------------------------------------------------------------- .../hadoop/hive/ql/io/orc/OrcInputFormat.java | 117 ++++++++-- .../hive/ql/io/orc/TestInputOutputFormat.java | 225 ++++++++++++++----- 2 files changed, 261 insertions(+), 81 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/be000060/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index f36f707..d175d2d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.io.orc; import java.io.IOException; import java.nio.ByteBuffer; +import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -35,6 +36,7 @@ import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -103,6 +105,7 @@ import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; import org.apache.orc.OrcProto; @@ -539,7 +542,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, private FooterCache footerCache; private static LocalCache localCache; private static MetastoreCache metaCache; - private static ExecutorService threadPool = null; + static ExecutorService threadPool = null; private final int numBuckets; private final int splitStrategyBatchMs; private final long maxSize; @@ -547,6 +550,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, private final int minSplits; private final boolean footerInSplits; private final boolean cacheStripeDetails; + private final boolean forceThreadpool; private final AtomicInteger cacheHitCounter = new AtomicInteger(0); private final AtomicInteger numFilesCounter = new AtomicInteger(0); private final ValidTxnList transactionList; @@ -559,6 +563,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, Context(Configuration conf, final int minSplits) { this.conf = conf; + this.forceThreadpool = HiveConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST); this.sarg = ConvertAstToSearchArg.createFromConf(conf); minSize = HiveConf.getLongVar(conf, ConfVars.MAPREDMINSPLITSIZE, DEFAULT_MIN_SPLIT_SIZE); maxSize = HiveConf.getLongVar(conf, ConfVars.MAPREDMAXSPLITSIZE, DEFAULT_MAX_SPLIT_SIZE); @@ -617,6 +622,21 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, Long.MAX_VALUE + ":"); transactionList = new ValidReadTxnList(value); } + + @VisibleForTesting + static int getCurrentThreadPoolSize() { + synchronized (Context.class) { + return (threadPool instanceof ThreadPoolExecutor) + ? ((ThreadPoolExecutor)threadPool).getPoolSize() : ((threadPool == null) ? 0 : -1); + } + } + + @VisibleForTesting + public static void resetThreadPool() { + synchronized (Context.class) { + threadPool = null; + } + } } /** @@ -706,10 +726,11 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, List<DeltaMetaData> deltas; boolean[] covered; private List<Future<List<OrcSplit>>> splitFuturesRef; + private final UserGroupInformation ugi; public ETLSplitStrategy(Context context, FileSystem fs, Path dir, List<HdfsFileStatusWithId> children, boolean isOriginal, List<DeltaMetaData> deltas, - boolean[] covered) { + boolean[] covered, UserGroupInformation ugi) { assert !children.isEmpty(); this.context = context; this.dirs = Lists.newArrayList(new ETLDir(dir, fs, children.size())); @@ -717,6 +738,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, this.isOriginal = isOriginal; this.deltas = deltas; this.covered = covered; + this.ugi = ugi; } @Override @@ -803,26 +825,42 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, public Future<Void> generateSplitWork( Context context, List<Future<List<OrcSplit>>> splitFutures) throws IOException { - if (context.cacheStripeDetails && context.footerCache.isBlocking()) { + if ((context.cacheStripeDetails && context.footerCache.isBlocking()) + || context.forceThreadpool) { this.splitFuturesRef = splitFutures; return Context.threadPool.submit(this); } else { - runGetSplitsSync(splitFutures); + runGetSplitsSync(splitFutures, null); return null; } } @Override public Void call() throws IOException { - runGetSplitsSync(splitFuturesRef); - return null; + if (ugi == null) { + runGetSplitsSync(splitFuturesRef, null); + return null; + } + try { + return ugi.doAs(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + runGetSplitsSync(splitFuturesRef, ugi); + return null; + } + }); + } catch (InterruptedException e) { + throw new IOException(e); + } } - private void runGetSplitsSync(List<Future<List<OrcSplit>>> splitFutures) throws IOException { + private void runGetSplitsSync(List<Future<List<OrcSplit>>> splitFutures, + UserGroupInformation ugi) throws IOException { List<SplitInfo> splits = getSplits(); List<Future<List<OrcSplit>>> localList = new ArrayList<>(splits.size()); + UserGroupInformation tpUgi = ugi == null ? UserGroupInformation.getCurrentUser() : ugi; for (SplitInfo splitInfo : splits) { - localList.add(Context.threadPool.submit(new SplitGenerator(splitInfo))); + localList.add(Context.threadPool.submit(new SplitGenerator(splitInfo, tpUgi))); } synchronized (splitFutures) { splitFutures.addAll(localList); @@ -924,16 +962,35 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, private final FileSystem fs; private final Path dir; private final boolean useFileIds; + private final UserGroupInformation ugi; - FileGenerator(Context context, FileSystem fs, Path dir, boolean useFileIds) { + FileGenerator(Context context, FileSystem fs, Path dir, boolean useFileIds, + UserGroupInformation ugi) { this.context = context; this.fs = fs; this.dir = dir; this.useFileIds = useFileIds; + this.ugi = ugi; } @Override public AcidDirInfo call() throws IOException { + if (ugi == null) { + return callInternal(); + } + try { + return ugi.doAs(new PrivilegedExceptionAction<AcidDirInfo>() { + @Override + public AcidDirInfo run() throws Exception { + return callInternal(); + } + }); + } catch (InterruptedException e) { + throw new IOException(e); + } + } + + private AcidDirInfo callInternal() throws IOException { AcidUtils.Directory dirInfo = AcidUtils.getAcidState(dir, context.conf, context.transactionList, useFileIds); Path base = dirInfo.getBaseDirectory(); @@ -986,8 +1043,10 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, private OrcFile.WriterVersion writerVersion; private long projColsUncompressedSize; private final List<OrcSplit> deltaSplits; + private final UserGroupInformation ugi; - public SplitGenerator(SplitInfo splitInfo) throws IOException { + public SplitGenerator(SplitInfo splitInfo, UserGroupInformation ugi) throws IOException { + this.ugi = ugi; this.context = splitInfo.context; this.fs = splitInfo.fs; this.fileWithId = splitInfo.fileWithId; @@ -1110,6 +1169,22 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, */ @Override public List<OrcSplit> call() throws IOException { + if (ugi == null) { + return callInternal(); + } + try { + return ugi.doAs(new PrivilegedExceptionAction<List<OrcSplit>>() { + @Override + public List<OrcSplit> run() throws Exception { + return callInternal(); + } + }); + } catch (InterruptedException e) { + throw new IOException(e); + } + } + + private List<OrcSplit> callInternal() throws IOException { populateAndCacheStripeDetails(); List<OrcSplit> splits = Lists.newArrayList(); @@ -1265,13 +1340,14 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, List<Future<AcidDirInfo>> pathFutures = Lists.newArrayList(); List<Future<Void>> strategyFutures = Lists.newArrayList(); final List<Future<List<OrcSplit>>> splitFutures = Lists.newArrayList(); + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); // multi-threaded file statuses and split strategy Path[] paths = getInputPaths(conf); CompletionService<AcidDirInfo> ecs = new ExecutorCompletionService<>(Context.threadPool); for (Path dir : paths) { FileSystem fs = dir.getFileSystem(conf); - FileGenerator fileGenerator = new FileGenerator(context, fs, dir, useFileIds); + FileGenerator fileGenerator = new FileGenerator(context, fs, dir, useFileIds, ugi); pathFutures.add(ecs.submit(fileGenerator)); } @@ -1303,7 +1379,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, // We have received a new directory information, make a split strategy. --resultsLeft; SplitStrategy<?> splitStrategy = determineSplitStrategy(combinedCtx, context, - adi.fs, adi.splitPath, adi.acidInfo, adi.baseOrOriginalFiles); + adi.fs, adi.splitPath, adi.acidInfo, adi.baseOrOriginalFiles, ugi); if (splitStrategy == null) continue; // Combined. if (isDebugEnabled) { @@ -1372,12 +1448,13 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, private static SplitStrategy<?> combineOrCreateETLStrategy(CombinedCtx combinedCtx, Context context, FileSystem fs, Path dir, List<HdfsFileStatusWithId> files, - List<DeltaMetaData> deltas, boolean[] covered, boolean isOriginal) { + List<DeltaMetaData> deltas, boolean[] covered, boolean isOriginal, + UserGroupInformation ugi) { if (!deltas.isEmpty() || combinedCtx == null) { - return new ETLSplitStrategy(context, fs, dir, files, isOriginal, deltas, covered); + return new ETLSplitStrategy(context, fs, dir, files, isOriginal, deltas, covered, ugi); } else if (combinedCtx.combined == null) { combinedCtx.combined = new ETLSplitStrategy( - context, fs, dir, files, isOriginal, deltas, covered); + context, fs, dir, files, isOriginal, deltas, covered, ugi); combinedCtx.combineStartUs = System.nanoTime(); return null; } else { @@ -1386,11 +1463,11 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, switch (r) { case YES: return null; case NO_AND_CONTINUE: - return new ETLSplitStrategy(context, fs, dir, files, isOriginal, deltas, covered); + return new ETLSplitStrategy(context, fs, dir, files, isOriginal, deltas, covered, ugi); case NO_AND_SWAP: { ETLSplitStrategy oldBase = combinedCtx.combined; combinedCtx.combined = new ETLSplitStrategy( - context, fs, dir, files, isOriginal, deltas, covered); + context, fs, dir, files, isOriginal, deltas, covered, ugi); combinedCtx.combineStartUs = System.nanoTime(); return oldBase; } @@ -1719,7 +1796,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, @VisibleForTesting static SplitStrategy<?> determineSplitStrategy(CombinedCtx combinedCtx, Context context, FileSystem fs, Path dir, AcidUtils.Directory dirInfo, - List<HdfsFileStatusWithId> baseOrOriginalFiles) { + List<HdfsFileStatusWithId> baseOrOriginalFiles, UserGroupInformation ugi) { Path base = dirInfo.getBaseDirectory(); List<HdfsFileStatusWithId> original = dirInfo.getOriginalFiles(); List<DeltaMetaData> deltas = AcidUtils.serializeDeltas(dirInfo.getCurrentDirectories()); @@ -1752,12 +1829,12 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, case ETL: // ETL strategy requested through config return combineOrCreateETLStrategy(combinedCtx, context, fs, - dir, baseOrOriginalFiles, deltas, covered, isOriginal); + dir, baseOrOriginalFiles, deltas, covered, isOriginal, ugi); default: // HYBRID strategy if (avgFileSize > context.maxSize || totalFiles <= context.minSplits) { return combineOrCreateETLStrategy(combinedCtx, context, fs, - dir, baseOrOriginalFiles, deltas, covered, isOriginal); + dir, baseOrOriginalFiles, deltas, covered, isOriginal, ugi); } else { return new BISplitStrategy( context, fs, dir, baseOrOriginalFiles, isOriginal, deltas, covered); http://git-wip-us.apache.org/repos/asf/hive/blob/be000060/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java index 9b1d7ae..6f84708 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java @@ -17,13 +17,7 @@ */ package org.apache.hadoop.hive.ql.io.orc; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotSame; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; import java.io.DataInput; import java.io.DataOutput; @@ -31,6 +25,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.security.PrivilegedExceptionAction; import java.sql.Date; import java.sql.Timestamp; import java.text.SimpleDateFormat; @@ -56,6 +51,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.apache.hadoop.hive.ql.exec.Utilities; @@ -109,18 +105,22 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputFormat; import org.apache.hadoop.mapred.RecordWriter; import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Progressable; import org.apache.orc.OrcProto; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.io.Output; @SuppressWarnings({ "deprecation", "unchecked", "rawtypes" }) public class TestInputOutputFormat { + private static final Logger LOG = LoggerFactory.getLogger(TestInputOutputFormat.class); public static String toKryo(SearchArgument sarg) { Output out = new Output(4 * 1024, 10 * 1024 * 1024); @@ -512,7 +512,7 @@ public class TestInputOutputFormat { final OrcInputFormat.Context context = new OrcInputFormat.Context( conf, n); OrcInputFormat.FileGenerator gen = new OrcInputFormat.FileGenerator( - context, fs, new MockPath(fs, "mock:/a/b"), false); + context, fs, new MockPath(fs, "mock:/a/b"), false, null); final SplitStrategy splitStrategy = createSplitStrategy(context, gen); assertTrue( String.format( @@ -536,7 +536,7 @@ public class TestInputOutputFormat { new MockFile("mock:/a/b/part-04", 1000, new byte[0])); OrcInputFormat.FileGenerator gen = new OrcInputFormat.FileGenerator(context, fs, - new MockPath(fs, "mock:/a/b"), false); + new MockPath(fs, "mock:/a/b"), false, null); OrcInputFormat.SplitStrategy splitStrategy = createSplitStrategy(context, gen); assertEquals(true, splitStrategy instanceof OrcInputFormat.BISplitStrategy); @@ -549,7 +549,7 @@ public class TestInputOutputFormat { new MockFile("mock:/a/b/.part-03", 1000, new byte[1000]), new MockFile("mock:/a/b/part-04", 1000, new byte[1000])); gen = new OrcInputFormat.FileGenerator(context, fs, - new MockPath(fs, "mock:/a/b"), false); + new MockPath(fs, "mock:/a/b"), false, null); splitStrategy = createSplitStrategy(context, gen); assertEquals(true, splitStrategy instanceof OrcInputFormat.ETLSplitStrategy); } @@ -623,19 +623,20 @@ public class TestInputOutputFormat { MockFileSystem fs, String path, OrcInputFormat.CombinedCtx combineCtx) throws IOException { OrcInputFormat.AcidDirInfo adi = createAdi(context, fs, path); return OrcInputFormat.determineSplitStrategy( - combineCtx, context, adi.fs, adi.splitPath, adi.acidInfo, adi.baseOrOriginalFiles); + combineCtx, context, adi.fs, adi.splitPath, adi.acidInfo, adi.baseOrOriginalFiles, null); } public OrcInputFormat.AcidDirInfo createAdi( OrcInputFormat.Context context, MockFileSystem fs, String path) throws IOException { - return new OrcInputFormat.FileGenerator(context, fs, new MockPath(fs, path), false).call(); + return new OrcInputFormat.FileGenerator( + context, fs, new MockPath(fs, path), false, null).call(); } private OrcInputFormat.SplitStrategy createSplitStrategy( OrcInputFormat.Context context, OrcInputFormat.FileGenerator gen) throws IOException { OrcInputFormat.AcidDirInfo adi = gen.call(); return OrcInputFormat.determineSplitStrategy( - null, context, adi.fs, adi.splitPath, adi.acidInfo, adi.baseOrOriginalFiles); + null, context, adi.fs, adi.splitPath, adi.acidInfo, adi.baseOrOriginalFiles, null); } public static class MockBlock { @@ -797,6 +798,9 @@ public class TestInputOutputFormat { public static class MockFileSystem extends FileSystem { final List<MockFile> files = new ArrayList<MockFile>(); Path workingDir = new Path("/"); + // statics for when the mock fs is created via FileSystem.get + private static String blockedUgi = null; + private final static List<MockFile> globalFiles = new ArrayList<MockFile>(); public MockFileSystem() { // empty @@ -812,6 +816,10 @@ public class TestInputOutputFormat { this.files.addAll(Arrays.asList(files)); } + public static void setBlockedUgi(String s) { + blockedUgi = s; + } + void clear() { files.clear(); } @@ -825,14 +833,36 @@ public class TestInputOutputFormat { } } + @SuppressWarnings("serial") + public static class MockAccessDenied extends IOException { + } + @Override public FSDataInputStream open(Path path, int i) throws IOException { - for(MockFile file: files) { + checkAccess(); + MockFile file = findFile(path); + if (file != null) return new FSDataInputStream(new MockInputStream(file)); + throw new IOException("File not found: " + path); + } + + private MockFile findFile(Path path) { + for (MockFile file: files) { if (file.path.equals(path)) { - return new FSDataInputStream(new MockInputStream(file)); + return file; } } - throw new IOException("File not found: " + path); + for (MockFile file: globalFiles) { + if (file.path.equals(path)) { + return file; + } + } + return null; + } + + private void checkAccess() throws IOException { + if (blockedUgi == null) return; + if (!blockedUgi.equals(UserGroupInformation.getCurrentUser().getShortUserName())) return; + throw new MockAccessDenied(); } @Override @@ -841,13 +871,8 @@ public class TestInputOutputFormat { short replication, long blockSize, Progressable progressable ) throws IOException { - MockFile file = null; - for(MockFile currentFile: files) { - if (currentFile.path.equals(path)) { - file = currentFile; - break; - } - } + checkAccess(); + MockFile file = findFile(path); if (file == null) { file = new MockFile(path.toString(), (int) blockSize, new byte[0]); files.add(file); @@ -859,37 +884,55 @@ public class TestInputOutputFormat { public FSDataOutputStream append(Path path, int bufferSize, Progressable progressable ) throws IOException { + checkAccess(); return create(path, FsPermission.getDefault(), true, bufferSize, (short) 3, 256 * 1024, progressable); } @Override public boolean rename(Path path, Path path2) throws IOException { + checkAccess(); return false; } @Override public boolean delete(Path path) throws IOException { + checkAccess(); return false; } @Override public boolean delete(Path path, boolean b) throws IOException { + checkAccess(); return false; } @Override public FileStatus[] listStatus(Path path) throws IOException { + checkAccess(); path = path.makeQualified(this); List<FileStatus> result = new ArrayList<FileStatus>(); String pathname = path.toString(); String pathnameAsDir = pathname + "/"; Set<String> dirs = new TreeSet<String>(); - for(MockFile file: files) { + MockFile file = findFile(path); + if (file != null) { + return new FileStatus[]{createStatus(file)}; + } + findMatchingFiles(files, pathnameAsDir, dirs, result); + findMatchingFiles(globalFiles, pathnameAsDir, dirs, result); + // for each directory add it once + for(String dir: dirs) { + result.add(createDirectory(new MockPath(this, pathnameAsDir + dir))); + } + return result.toArray(new FileStatus[result.size()]); + } + + private void findMatchingFiles( + List<MockFile> files, String pathnameAsDir, Set<String> dirs, List<FileStatus> result) { + for (MockFile file: files) { String filename = file.path.toString(); - if (pathname.equals(filename)) { - return new FileStatus[]{createStatus(file)}; - } else if (filename.startsWith(pathnameAsDir)) { + if (filename.startsWith(pathnameAsDir)) { String tail = filename.substring(pathnameAsDir.length()); int nextSlash = tail.indexOf('/'); if (nextSlash > 0) { @@ -899,11 +942,6 @@ public class TestInputOutputFormat { } } } - // for each directory add it once - for(String dir: dirs) { - result.add(createDirectory(new MockPath(this, pathnameAsDir + dir))); - } - return result.toArray(new FileStatus[result.size()]); } @Override @@ -934,12 +972,18 @@ public class TestInputOutputFormat { @Override public FileStatus getFileStatus(Path path) throws IOException { + checkAccess(); path = path.makeQualified(this); String pathnameAsDir = path.toString() + "/"; - for(MockFile file: files) { - if (file.path.equals(path)) { - return createStatus(file); - } else if (file.path.toString().startsWith(pathnameAsDir)) { + MockFile file = findFile(path); + if (file != null) return createStatus(file); + for (MockFile dir : files) { + if (dir.path.toString().startsWith(pathnameAsDir)) { + return createDirectory(path); + } + } + for (MockFile dir : globalFiles) { + if (dir.path.toString().startsWith(pathnameAsDir)) { return createDirectory(path); } } @@ -948,23 +992,23 @@ public class TestInputOutputFormat { @Override public BlockLocation[] getFileBlockLocations(FileStatus stat, - long start, long len) { + long start, long len) throws IOException { + checkAccess(); List<BlockLocation> result = new ArrayList<BlockLocation>(); - for(MockFile file: files) { - if (file.path.equals(stat.getPath())) { - for(MockBlock block: file.blocks) { - if (OrcInputFormat.SplitGenerator.getOverlap(block.offset, - block.length, start, len) > 0) { - String[] topology = new String[block.hosts.length]; - for(int i=0; i < topology.length; ++i) { - topology[i] = "/rack/ " + block.hosts[i]; - } - result.add(new BlockLocation(block.hosts, block.hosts, - topology, block.offset, block.length)); + MockFile file = findFile(stat.getPath()); + if (file != null) { + for(MockBlock block: file.blocks) { + if (OrcInputFormat.SplitGenerator.getOverlap(block.offset, + block.length, start, len) > 0) { + String[] topology = new String[block.hosts.length]; + for(int i=0; i < topology.length; ++i) { + topology[i] = "/rack/ " + block.hosts[i]; } + result.add(new BlockLocation(block.hosts, block.hosts, + topology, block.offset, block.length)); } - return result.toArray(new BlockLocation[result.size()]); } + return result.toArray(new BlockLocation[result.size()]); } return new BlockLocation[0]; } @@ -982,6 +1026,14 @@ public class TestInputOutputFormat { buffer.append("]}"); return buffer.toString(); } + + public static void addGlobalFile(MockFile mockFile) { + globalFiles.add(mockFile); + } + + public static void clearGlobalFiles() { + globalFiles.clear(); + } } static void fill(DataOutputBuffer out, long length) throws IOException { @@ -1062,7 +1114,7 @@ public class TestInputOutputFormat { OrcInputFormat.SplitGenerator splitter = new OrcInputFormat.SplitGenerator(new OrcInputFormat.SplitInfo(context, fs, AcidUtils.createOriginalObj(null, fs.getFileStatus(new Path("/a/file"))), null, true, - new ArrayList<AcidInputFormat.DeltaMetaData>(), true, null, null)); + new ArrayList<AcidInputFormat.DeltaMetaData>(), true, null, null), null); OrcSplit result = splitter.createSplit(0, 200, null); assertEquals(0, result.getStart()); assertEquals(200, result.getLength()); @@ -1103,7 +1155,7 @@ public class TestInputOutputFormat { OrcInputFormat.SplitGenerator splitter = new OrcInputFormat.SplitGenerator(new OrcInputFormat.SplitInfo(context, fs, AcidUtils.createOriginalObj(null, fs.getFileStatus(new Path("/a/file"))), null, true, - new ArrayList<AcidInputFormat.DeltaMetaData>(), true, null, null)); + new ArrayList<AcidInputFormat.DeltaMetaData>(), true, null, null), null); List<OrcSplit> results = splitter.call(); OrcSplit result = results.get(0); assertEquals(3, result.getStart()); @@ -1126,7 +1178,7 @@ public class TestInputOutputFormat { context = new OrcInputFormat.Context(conf); splitter = new OrcInputFormat.SplitGenerator(new OrcInputFormat.SplitInfo(context, fs, AcidUtils.createOriginalObj(null, fs.getFileStatus(new Path("/a/file"))), null, true, - new ArrayList<AcidInputFormat.DeltaMetaData>(), true, null, null)); + new ArrayList<AcidInputFormat.DeltaMetaData>(), true, null, null), null); results = splitter.call(); for(int i=0; i < stripeSizes.length; ++i) { assertEquals("checking stripe " + i + " size", @@ -1154,7 +1206,7 @@ public class TestInputOutputFormat { OrcInputFormat.SplitGenerator splitter = new OrcInputFormat.SplitGenerator(new OrcInputFormat.SplitInfo(context, fs, fs.getFileStatus(new Path("/a/file")), null, true, - new ArrayList<AcidInputFormat.DeltaMetaData>(), true, null, null)); + new ArrayList<AcidInputFormat.DeltaMetaData>(), true, null, null), null); List<OrcSplit> results = splitter.call(); OrcSplit result = results.get(0); assertEquals(3, results.size()); @@ -1177,7 +1229,7 @@ public class TestInputOutputFormat { splitter = new OrcInputFormat.SplitGenerator(new OrcInputFormat.SplitInfo(context, fs, fs.getFileStatus(new Path("/a/file")), null, true, new ArrayList<AcidInputFormat.DeltaMetaData>(), - true, null, null)); + true, null, null), null); results = splitter.call(); assertEquals(5, results.size()); for (int i = 0; i < stripeSizes.length; ++i) { @@ -1197,7 +1249,7 @@ public class TestInputOutputFormat { splitter = new OrcInputFormat.SplitGenerator(new OrcInputFormat.SplitInfo(context, fs, fs.getFileStatus(new Path("/a/file")), null, true, new ArrayList<AcidInputFormat.DeltaMetaData>(), - true, null, null)); + true, null, null), null); results = splitter.call(); assertEquals(1, results.size()); result = results.get(0); @@ -2056,12 +2108,7 @@ public class TestInputOutputFormat { @Test public void testSplitEliminationNullStats() throws Exception { Properties properties = new Properties(); - StructObjectInspector inspector; - synchronized (TestOrcFile.class) { - inspector = (StructObjectInspector) - ObjectInspectorFactory.getReflectionObjectInspector(SimpleRow.class, - ObjectInspectorFactory.ObjectInspectorOptions.JAVA); - } + StructObjectInspector inspector = createSoi(); SerDe serde = new OrcSerde(); OutputFormat<?, ?> outFormat = new OrcOutputFormat(); conf.setInt("mapred.max.split.size", 50); @@ -2094,4 +2141,60 @@ public class TestInputOutputFormat { assertEquals(0, splits.length); } + @Test + public void testDoAs() throws Exception { + conf.setInt(ConfVars.HIVE_ORC_COMPUTE_SPLITS_NUM_THREADS.varname, 1); + conf.set(ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "ETL"); + conf.setBoolean(ConfVars.HIVE_IN_TEST.varname, true); + conf.setClass("fs.mock.impl", MockFileSystem.class, FileSystem.class); + String badUser = UserGroupInformation.getCurrentUser().getShortUserName() + "-foo"; + MockFileSystem.setBlockedUgi(badUser); + MockFileSystem.clearGlobalFiles(); + OrcInputFormat.Context.resetThreadPool(); // We need the size above to take effect. + try { + // OrcInputFormat will get a mock fs from FileSystem.get; add global files. + MockFileSystem.addGlobalFile(new MockFile("mock:/ugi/1/file", 10000, + createMockOrcFile(197, 300, 600), new MockBlock("host1-1", "host1-2", "host1-3"))); + MockFileSystem.addGlobalFile(new MockFile("mock:/ugi/2/file", 10000, + createMockOrcFile(197, 300, 600), new MockBlock("host1-1", "host1-2", "host1-3"))); + FileInputFormat.setInputPaths(conf, "mock:/ugi/1"); + UserGroupInformation ugi = UserGroupInformation.createUserForTesting(badUser, new String[0]); + assertEquals(0, OrcInputFormat.Context.getCurrentThreadPoolSize()); + try { + ugi.doAs(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + OrcInputFormat.generateSplitsInfo(conf, -1); + return null; + } + }); + fail("Didn't throw"); + } catch (Exception ex) { + Throwable cause = ex; + boolean found = false; + while (cause != null) { + if (cause instanceof MockFileSystem.MockAccessDenied) { + found = true; // Expected. + break; + } + cause = cause.getCause(); + } + if (!found) throw ex; // Unexpected. + } + assertEquals(1, OrcInputFormat.Context.getCurrentThreadPoolSize()); + FileInputFormat.setInputPaths(conf, "mock:/ugi/2"); + List<OrcSplit> splits = OrcInputFormat.generateSplitsInfo(conf, -1); + assertEquals(1, splits.size()); + } finally { + MockFileSystem.clearGlobalFiles(); + } + } + + + private StructObjectInspector createSoi() { + synchronized (TestOrcFile.class) { + return (StructObjectInspector)ObjectInspectorFactory.getReflectionObjectInspector( + SimpleRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + } }
