HIVE-13120: propagate doAs when generating ORC splits (Sergey Shelukhin, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/be000060
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/be000060
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/be000060

Branch: refs/heads/llap
Commit: be000060800101181e144e2e37a301039ebfd992
Parents: 50781c4
Author: Sergey Shelukhin <[email protected]>
Authored: Thu Feb 25 15:27:36 2016 -0800
Committer: Sergey Shelukhin <[email protected]>
Committed: Thu Feb 25 15:27:36 2016 -0800

----------------------------------------------------------------------
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java   | 117 ++++++++--
 .../hive/ql/io/orc/TestInputOutputFormat.java   | 225 ++++++++++++++-----
 2 files changed, 261 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/be000060/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index f36f707..d175d2d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.io.orc;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -35,6 +36,7 @@ import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -103,6 +105,7 @@ import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.orc.OrcProto;
 
@@ -539,7 +542,7 @@ public class OrcInputFormat implements 
InputFormat<NullWritable, OrcStruct>,
     private FooterCache footerCache;
     private static LocalCache localCache;
     private static MetastoreCache metaCache;
-    private static ExecutorService threadPool = null;
+    static ExecutorService threadPool = null;
     private final int numBuckets;
     private final int splitStrategyBatchMs;
     private final long maxSize;
@@ -547,6 +550,7 @@ public class OrcInputFormat implements 
InputFormat<NullWritable, OrcStruct>,
     private final int minSplits;
     private final boolean footerInSplits;
     private final boolean cacheStripeDetails;
+    private final boolean forceThreadpool;
     private final AtomicInteger cacheHitCounter = new AtomicInteger(0);
     private final AtomicInteger numFilesCounter = new AtomicInteger(0);
     private final ValidTxnList transactionList;
@@ -559,6 +563,7 @@ public class OrcInputFormat implements 
InputFormat<NullWritable, OrcStruct>,
 
     Context(Configuration conf, final int minSplits) {
       this.conf = conf;
+      this.forceThreadpool = HiveConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST);
       this.sarg = ConvertAstToSearchArg.createFromConf(conf);
       minSize = HiveConf.getLongVar(conf, ConfVars.MAPREDMINSPLITSIZE, 
DEFAULT_MIN_SPLIT_SIZE);
       maxSize = HiveConf.getLongVar(conf, ConfVars.MAPREDMAXSPLITSIZE, 
DEFAULT_MAX_SPLIT_SIZE);
@@ -617,6 +622,21 @@ public class OrcInputFormat implements 
InputFormat<NullWritable, OrcStruct>,
                               Long.MAX_VALUE + ":");
       transactionList = new ValidReadTxnList(value);
     }
+
+    @VisibleForTesting
+    static int getCurrentThreadPoolSize() {
+      synchronized (Context.class) {
+        return (threadPool instanceof ThreadPoolExecutor)
+            ? ((ThreadPoolExecutor)threadPool).getPoolSize() : ((threadPool == 
null) ? 0 : -1);
+      }
+    }
+
+    @VisibleForTesting
+    public static void resetThreadPool() {
+      synchronized (Context.class) {
+        threadPool = null;
+      }
+    }
   }
 
   /**
@@ -706,10 +726,11 @@ public class OrcInputFormat implements 
InputFormat<NullWritable, OrcStruct>,
     List<DeltaMetaData> deltas;
     boolean[] covered;
     private List<Future<List<OrcSplit>>> splitFuturesRef;
+    private final UserGroupInformation ugi;
 
     public ETLSplitStrategy(Context context, FileSystem fs, Path dir,
         List<HdfsFileStatusWithId> children, boolean isOriginal, 
List<DeltaMetaData> deltas,
-        boolean[] covered) {
+        boolean[] covered, UserGroupInformation ugi) {
       assert !children.isEmpty();
       this.context = context;
       this.dirs = Lists.newArrayList(new ETLDir(dir, fs, children.size()));
@@ -717,6 +738,7 @@ public class OrcInputFormat implements 
InputFormat<NullWritable, OrcStruct>,
       this.isOriginal = isOriginal;
       this.deltas = deltas;
       this.covered = covered;
+      this.ugi = ugi;
     }
 
     @Override
@@ -803,26 +825,42 @@ public class OrcInputFormat implements 
InputFormat<NullWritable, OrcStruct>,
 
     public Future<Void> generateSplitWork(
         Context context, List<Future<List<OrcSplit>>> splitFutures) throws 
IOException {
-      if (context.cacheStripeDetails && context.footerCache.isBlocking()) {
+      if ((context.cacheStripeDetails && context.footerCache.isBlocking())
+          || context.forceThreadpool) {
         this.splitFuturesRef = splitFutures;
         return Context.threadPool.submit(this);
       } else {
-        runGetSplitsSync(splitFutures);
+        runGetSplitsSync(splitFutures, null);
         return null;
       }
     }
 
     @Override
     public Void call() throws IOException {
-      runGetSplitsSync(splitFuturesRef);
-      return null;
+      if (ugi == null) {
+        runGetSplitsSync(splitFuturesRef, null);
+        return null;
+      }
+      try {
+        return ugi.doAs(new PrivilegedExceptionAction<Void>() {
+          @Override
+          public Void run() throws Exception {
+            runGetSplitsSync(splitFuturesRef, ugi);
+            return null;
+          }
+        });
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      }
     }
 
-    private void runGetSplitsSync(List<Future<List<OrcSplit>>> splitFutures) 
throws IOException {
+    private void runGetSplitsSync(List<Future<List<OrcSplit>>> splitFutures,
+        UserGroupInformation ugi) throws IOException {
       List<SplitInfo> splits = getSplits();
       List<Future<List<OrcSplit>>> localList = new ArrayList<>(splits.size());
+      UserGroupInformation tpUgi = ugi == null ? 
UserGroupInformation.getCurrentUser() : ugi;
       for (SplitInfo splitInfo : splits) {
-        localList.add(Context.threadPool.submit(new 
SplitGenerator(splitInfo)));
+        localList.add(Context.threadPool.submit(new SplitGenerator(splitInfo, 
tpUgi)));
       }
       synchronized (splitFutures) {
         splitFutures.addAll(localList);
@@ -924,16 +962,35 @@ public class OrcInputFormat implements 
InputFormat<NullWritable, OrcStruct>,
     private final FileSystem fs;
     private final Path dir;
     private final boolean useFileIds;
+    private final UserGroupInformation ugi;
 
-    FileGenerator(Context context, FileSystem fs, Path dir, boolean 
useFileIds) {
+    FileGenerator(Context context, FileSystem fs, Path dir, boolean useFileIds,
+        UserGroupInformation ugi) {
       this.context = context;
       this.fs = fs;
       this.dir = dir;
       this.useFileIds = useFileIds;
+      this.ugi = ugi;
     }
 
     @Override
     public AcidDirInfo call() throws IOException {
+      if (ugi == null) {
+        return callInternal();
+      }
+      try {
+        return ugi.doAs(new PrivilegedExceptionAction<AcidDirInfo>() {
+          @Override
+          public AcidDirInfo run() throws Exception {
+            return callInternal();
+          }
+        });
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      }
+    }
+
+    private AcidDirInfo callInternal() throws IOException {
       AcidUtils.Directory dirInfo = AcidUtils.getAcidState(dir,
           context.conf, context.transactionList, useFileIds);
       Path base = dirInfo.getBaseDirectory();
@@ -986,8 +1043,10 @@ public class OrcInputFormat implements 
InputFormat<NullWritable, OrcStruct>,
     private OrcFile.WriterVersion writerVersion;
     private long projColsUncompressedSize;
     private final List<OrcSplit> deltaSplits;
+    private final UserGroupInformation ugi;
 
-    public SplitGenerator(SplitInfo splitInfo) throws IOException {
+    public SplitGenerator(SplitInfo splitInfo, UserGroupInformation ugi) 
throws IOException {
+      this.ugi = ugi;
       this.context = splitInfo.context;
       this.fs = splitInfo.fs;
       this.fileWithId = splitInfo.fileWithId;
@@ -1110,6 +1169,22 @@ public class OrcInputFormat implements 
InputFormat<NullWritable, OrcStruct>,
      */
     @Override
     public List<OrcSplit> call() throws IOException {
+      if (ugi == null) {
+        return callInternal();
+      }
+      try {
+        return ugi.doAs(new PrivilegedExceptionAction<List<OrcSplit>>() {
+          @Override
+          public List<OrcSplit> run() throws Exception {
+            return callInternal();
+          }
+        });
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      }
+    }
+
+    private List<OrcSplit> callInternal() throws IOException {
       populateAndCacheStripeDetails();
       List<OrcSplit> splits = Lists.newArrayList();
 
@@ -1265,13 +1340,14 @@ public class OrcInputFormat implements 
InputFormat<NullWritable, OrcStruct>,
     List<Future<AcidDirInfo>> pathFutures = Lists.newArrayList();
     List<Future<Void>> strategyFutures = Lists.newArrayList();
     final List<Future<List<OrcSplit>>> splitFutures = Lists.newArrayList();
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
 
     // multi-threaded file statuses and split strategy
     Path[] paths = getInputPaths(conf);
     CompletionService<AcidDirInfo> ecs = new 
ExecutorCompletionService<>(Context.threadPool);
     for (Path dir : paths) {
       FileSystem fs = dir.getFileSystem(conf);
-      FileGenerator fileGenerator = new FileGenerator(context, fs, dir, 
useFileIds);
+      FileGenerator fileGenerator = new FileGenerator(context, fs, dir, 
useFileIds, ugi);
       pathFutures.add(ecs.submit(fileGenerator));
     }
 
@@ -1303,7 +1379,7 @@ public class OrcInputFormat implements 
InputFormat<NullWritable, OrcStruct>,
         // We have received a new directory information, make a split strategy.
         --resultsLeft;
         SplitStrategy<?> splitStrategy = determineSplitStrategy(combinedCtx, 
context,
-            adi.fs, adi.splitPath, adi.acidInfo, adi.baseOrOriginalFiles);
+            adi.fs, adi.splitPath, adi.acidInfo, adi.baseOrOriginalFiles, ugi);
         if (splitStrategy == null) continue; // Combined.
 
         if (isDebugEnabled) {
@@ -1372,12 +1448,13 @@ public class OrcInputFormat implements 
InputFormat<NullWritable, OrcStruct>,
 
   private static SplitStrategy<?> combineOrCreateETLStrategy(CombinedCtx 
combinedCtx,
       Context context, FileSystem fs, Path dir, List<HdfsFileStatusWithId> 
files,
-      List<DeltaMetaData> deltas, boolean[] covered, boolean isOriginal) {
+      List<DeltaMetaData> deltas, boolean[] covered, boolean isOriginal,
+      UserGroupInformation ugi) {
     if (!deltas.isEmpty() || combinedCtx == null) {
-      return new ETLSplitStrategy(context, fs, dir, files, isOriginal, deltas, 
covered);
+      return new ETLSplitStrategy(context, fs, dir, files, isOriginal, deltas, 
covered, ugi);
     } else if (combinedCtx.combined == null) {
       combinedCtx.combined = new ETLSplitStrategy(
-          context, fs, dir, files, isOriginal, deltas, covered);
+          context, fs, dir, files, isOriginal, deltas, covered, ugi);
       combinedCtx.combineStartUs = System.nanoTime();
       return null;
     } else {
@@ -1386,11 +1463,11 @@ public class OrcInputFormat implements 
InputFormat<NullWritable, OrcStruct>,
       switch (r) {
       case YES: return null;
       case NO_AND_CONTINUE:
-        return new ETLSplitStrategy(context, fs, dir, files, isOriginal, 
deltas, covered);
+        return new ETLSplitStrategy(context, fs, dir, files, isOriginal, 
deltas, covered, ugi);
       case NO_AND_SWAP: {
         ETLSplitStrategy oldBase = combinedCtx.combined;
         combinedCtx.combined = new ETLSplitStrategy(
-            context, fs, dir, files, isOriginal, deltas, covered);
+            context, fs, dir, files, isOriginal, deltas, covered, ugi);
         combinedCtx.combineStartUs = System.nanoTime();
         return oldBase;
       }
@@ -1719,7 +1796,7 @@ public class OrcInputFormat implements 
InputFormat<NullWritable, OrcStruct>,
   @VisibleForTesting
   static SplitStrategy<?> determineSplitStrategy(CombinedCtx combinedCtx, 
Context context,
       FileSystem fs, Path dir, AcidUtils.Directory dirInfo,
-      List<HdfsFileStatusWithId> baseOrOriginalFiles) {
+      List<HdfsFileStatusWithId> baseOrOriginalFiles, UserGroupInformation 
ugi) {
     Path base = dirInfo.getBaseDirectory();
     List<HdfsFileStatusWithId> original = dirInfo.getOriginalFiles();
     List<DeltaMetaData> deltas = 
AcidUtils.serializeDeltas(dirInfo.getCurrentDirectories());
@@ -1752,12 +1829,12 @@ public class OrcInputFormat implements 
InputFormat<NullWritable, OrcStruct>,
         case ETL:
           // ETL strategy requested through config
           return combineOrCreateETLStrategy(combinedCtx, context, fs,
-            dir, baseOrOriginalFiles, deltas, covered, isOriginal);
+            dir, baseOrOriginalFiles, deltas, covered, isOriginal, ugi);
         default:
           // HYBRID strategy
           if (avgFileSize > context.maxSize || totalFiles <= 
context.minSplits) {
             return combineOrCreateETLStrategy(combinedCtx, context, fs,
-                dir, baseOrOriginalFiles, deltas, covered, isOriginal);
+                dir, baseOrOriginalFiles, deltas, covered, isOriginal, ugi);
           } else {
             return new BISplitStrategy(
                 context, fs, dir, baseOrOriginalFiles, isOriginal, deltas, 
covered);

http://git-wip-us.apache.org/repos/asf/hive/blob/be000060/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java 
b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index 9b1d7ae..6f84708 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -17,13 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.io.orc;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -31,6 +25,7 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.security.PrivilegedExceptionAction;
 import java.sql.Date;
 import java.sql.Timestamp;
 import java.text.SimpleDateFormat;
@@ -56,6 +51,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -109,18 +105,22 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.mapred.RecordWriter;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Progressable;
 import org.apache.orc.OrcProto;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.io.Output;
 
 @SuppressWarnings({ "deprecation", "unchecked", "rawtypes" })
 public class TestInputOutputFormat {
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestInputOutputFormat.class);
 
   public static String toKryo(SearchArgument sarg) {
     Output out = new Output(4 * 1024, 10 * 1024 * 1024);
@@ -512,7 +512,7 @@ public class TestInputOutputFormat {
           final OrcInputFormat.Context context = new OrcInputFormat.Context(
               conf, n);
           OrcInputFormat.FileGenerator gen = new OrcInputFormat.FileGenerator(
-              context, fs, new MockPath(fs, "mock:/a/b"), false);
+              context, fs, new MockPath(fs, "mock:/a/b"), false, null);
           final SplitStrategy splitStrategy = createSplitStrategy(context, 
gen);
           assertTrue(
               String.format(
@@ -536,7 +536,7 @@ public class TestInputOutputFormat {
         new MockFile("mock:/a/b/part-04", 1000, new byte[0]));
     OrcInputFormat.FileGenerator gen =
       new OrcInputFormat.FileGenerator(context, fs,
-          new MockPath(fs, "mock:/a/b"), false);
+          new MockPath(fs, "mock:/a/b"), false, null);
     OrcInputFormat.SplitStrategy splitStrategy = createSplitStrategy(context, 
gen);
     assertEquals(true, splitStrategy instanceof 
OrcInputFormat.BISplitStrategy);
 
@@ -549,7 +549,7 @@ public class TestInputOutputFormat {
         new MockFile("mock:/a/b/.part-03", 1000, new byte[1000]),
         new MockFile("mock:/a/b/part-04", 1000, new byte[1000]));
     gen = new OrcInputFormat.FileGenerator(context, fs,
-            new MockPath(fs, "mock:/a/b"), false);
+            new MockPath(fs, "mock:/a/b"), false, null);
     splitStrategy = createSplitStrategy(context, gen);
     assertEquals(true, splitStrategy instanceof 
OrcInputFormat.ETLSplitStrategy);
   }
@@ -623,19 +623,20 @@ public class TestInputOutputFormat {
       MockFileSystem fs, String path, OrcInputFormat.CombinedCtx combineCtx) 
throws IOException {
     OrcInputFormat.AcidDirInfo adi = createAdi(context, fs, path);
     return OrcInputFormat.determineSplitStrategy(
-        combineCtx, context, adi.fs, adi.splitPath, adi.acidInfo, 
adi.baseOrOriginalFiles);
+        combineCtx, context, adi.fs, adi.splitPath, adi.acidInfo, 
adi.baseOrOriginalFiles, null);
   }
 
   public OrcInputFormat.AcidDirInfo createAdi(
       OrcInputFormat.Context context, MockFileSystem fs, String path) throws 
IOException {
-    return new OrcInputFormat.FileGenerator(context, fs, new MockPath(fs, 
path), false).call();
+    return new OrcInputFormat.FileGenerator(
+        context, fs, new MockPath(fs, path), false, null).call();
   }
 
   private OrcInputFormat.SplitStrategy createSplitStrategy(
       OrcInputFormat.Context context, OrcInputFormat.FileGenerator gen) throws 
IOException {
     OrcInputFormat.AcidDirInfo adi = gen.call();
     return OrcInputFormat.determineSplitStrategy(
-        null, context, adi.fs, adi.splitPath, adi.acidInfo, 
adi.baseOrOriginalFiles);
+        null, context, adi.fs, adi.splitPath, adi.acidInfo, 
adi.baseOrOriginalFiles, null);
   }
 
   public static class MockBlock {
@@ -797,6 +798,9 @@ public class TestInputOutputFormat {
   public static class MockFileSystem extends FileSystem {
     final List<MockFile> files = new ArrayList<MockFile>();
     Path workingDir = new Path("/");
+    // statics for when the mock fs is created via FileSystem.get
+    private static String blockedUgi = null;
+    private final static List<MockFile> globalFiles = new 
ArrayList<MockFile>();
 
     public MockFileSystem() {
       // empty
@@ -812,6 +816,10 @@ public class TestInputOutputFormat {
       this.files.addAll(Arrays.asList(files));
     }
 
+    public static void setBlockedUgi(String s) {
+      blockedUgi = s;
+    }
+
     void clear() {
       files.clear();
     }
@@ -825,14 +833,36 @@ public class TestInputOutputFormat {
       }
     }
 
+    @SuppressWarnings("serial")
+    public static class MockAccessDenied extends IOException {
+    }
+
     @Override
     public FSDataInputStream open(Path path, int i) throws IOException {
-      for(MockFile file: files) {
+      checkAccess();
+      MockFile file = findFile(path);
+      if (file != null) return new FSDataInputStream(new 
MockInputStream(file));
+      throw new IOException("File not found: " + path);
+    }
+
+    private MockFile findFile(Path path) {
+      for (MockFile file: files) {
         if (file.path.equals(path)) {
-          return new FSDataInputStream(new MockInputStream(file));
+          return file;
         }
       }
-      throw new IOException("File not found: " + path);
+      for (MockFile file: globalFiles) {
+        if (file.path.equals(path)) {
+          return file;
+        }
+      }
+      return null;
+    }
+
+    private void checkAccess() throws IOException {
+      if (blockedUgi == null) return;
+      if 
(!blockedUgi.equals(UserGroupInformation.getCurrentUser().getShortUserName())) 
return;
+      throw new MockAccessDenied();
     }
 
     @Override
@@ -841,13 +871,8 @@ public class TestInputOutputFormat {
                                      short replication, long blockSize,
                                      Progressable progressable
                                      ) throws IOException {
-      MockFile file = null;
-      for(MockFile currentFile: files) {
-        if (currentFile.path.equals(path)) {
-          file = currentFile;
-          break;
-        }
-      }
+      checkAccess();
+      MockFile file = findFile(path);
       if (file == null) {
         file = new MockFile(path.toString(), (int) blockSize, new byte[0]);
         files.add(file);
@@ -859,37 +884,55 @@ public class TestInputOutputFormat {
     public FSDataOutputStream append(Path path, int bufferSize,
                                      Progressable progressable
                                      ) throws IOException {
+      checkAccess();
       return create(path, FsPermission.getDefault(), true, bufferSize,
           (short) 3, 256 * 1024, progressable);
     }
 
     @Override
     public boolean rename(Path path, Path path2) throws IOException {
+      checkAccess();
       return false;
     }
 
     @Override
     public boolean delete(Path path) throws IOException {
+      checkAccess();
       return false;
     }
 
     @Override
     public boolean delete(Path path, boolean b) throws IOException {
+      checkAccess();
       return false;
     }
 
     @Override
     public FileStatus[] listStatus(Path path) throws IOException {
+      checkAccess();
       path = path.makeQualified(this);
       List<FileStatus> result = new ArrayList<FileStatus>();
       String pathname = path.toString();
       String pathnameAsDir = pathname + "/";
       Set<String> dirs = new TreeSet<String>();
-      for(MockFile file: files) {
+      MockFile file = findFile(path);
+      if (file != null) {
+        return new FileStatus[]{createStatus(file)};
+      }
+      findMatchingFiles(files, pathnameAsDir, dirs, result);
+      findMatchingFiles(globalFiles, pathnameAsDir, dirs, result);
+      // for each directory add it once
+      for(String dir: dirs) {
+        result.add(createDirectory(new MockPath(this, pathnameAsDir + dir)));
+      }
+      return result.toArray(new FileStatus[result.size()]);
+    }
+
+    private void findMatchingFiles(
+        List<MockFile> files, String pathnameAsDir, Set<String> dirs, 
List<FileStatus> result) {
+      for (MockFile file: files) {
         String filename = file.path.toString();
-        if (pathname.equals(filename)) {
-          return new FileStatus[]{createStatus(file)};
-        } else if (filename.startsWith(pathnameAsDir)) {
+        if (filename.startsWith(pathnameAsDir)) {
           String tail = filename.substring(pathnameAsDir.length());
           int nextSlash = tail.indexOf('/');
           if (nextSlash > 0) {
@@ -899,11 +942,6 @@ public class TestInputOutputFormat {
           }
         }
       }
-      // for each directory add it once
-      for(String dir: dirs) {
-        result.add(createDirectory(new MockPath(this, pathnameAsDir + dir)));
-      }
-      return result.toArray(new FileStatus[result.size()]);
     }
 
     @Override
@@ -934,12 +972,18 @@ public class TestInputOutputFormat {
 
     @Override
     public FileStatus getFileStatus(Path path) throws IOException {
+      checkAccess();
       path = path.makeQualified(this);
       String pathnameAsDir = path.toString() + "/";
-      for(MockFile file: files) {
-        if (file.path.equals(path)) {
-          return createStatus(file);
-        } else if (file.path.toString().startsWith(pathnameAsDir)) {
+      MockFile file = findFile(path);
+      if (file != null) return createStatus(file);
+      for (MockFile dir : files) {
+        if (dir.path.toString().startsWith(pathnameAsDir)) {
+          return createDirectory(path);
+        }
+      }
+      for (MockFile dir : globalFiles) {
+        if (dir.path.toString().startsWith(pathnameAsDir)) {
           return createDirectory(path);
         }
       }
@@ -948,23 +992,23 @@ public class TestInputOutputFormat {
 
     @Override
     public BlockLocation[] getFileBlockLocations(FileStatus stat,
-                                                 long start, long len) {
+                                                 long start, long len) throws 
IOException {
+      checkAccess();
       List<BlockLocation> result = new ArrayList<BlockLocation>();
-      for(MockFile file: files) {
-        if (file.path.equals(stat.getPath())) {
-          for(MockBlock block: file.blocks) {
-            if (OrcInputFormat.SplitGenerator.getOverlap(block.offset,
-                block.length, start, len) > 0) {
-              String[] topology = new String[block.hosts.length];
-              for(int i=0; i < topology.length; ++i) {
-                topology[i] = "/rack/ " + block.hosts[i];
-              }
-              result.add(new BlockLocation(block.hosts, block.hosts,
-                  topology, block.offset, block.length));
+      MockFile file = findFile(stat.getPath());
+      if (file != null) {
+        for(MockBlock block: file.blocks) {
+          if (OrcInputFormat.SplitGenerator.getOverlap(block.offset,
+              block.length, start, len) > 0) {
+            String[] topology = new String[block.hosts.length];
+            for(int i=0; i < topology.length; ++i) {
+              topology[i] = "/rack/ " + block.hosts[i];
             }
+            result.add(new BlockLocation(block.hosts, block.hosts,
+                topology, block.offset, block.length));
           }
-          return result.toArray(new BlockLocation[result.size()]);
         }
+        return result.toArray(new BlockLocation[result.size()]);
       }
       return new BlockLocation[0];
     }
@@ -982,6 +1026,14 @@ public class TestInputOutputFormat {
       buffer.append("]}");
       return buffer.toString();
     }
+
+    public static void addGlobalFile(MockFile mockFile) {
+      globalFiles.add(mockFile);
+    }
+
+    public static void clearGlobalFiles() {
+      globalFiles.clear();
+    }
   }
 
   static void fill(DataOutputBuffer out, long length) throws IOException {
@@ -1062,7 +1114,7 @@ public class TestInputOutputFormat {
     OrcInputFormat.SplitGenerator splitter =
         new OrcInputFormat.SplitGenerator(new 
OrcInputFormat.SplitInfo(context, fs,
             AcidUtils.createOriginalObj(null, fs.getFileStatus(new 
Path("/a/file"))), null, true,
-            new ArrayList<AcidInputFormat.DeltaMetaData>(), true, null, null));
+            new ArrayList<AcidInputFormat.DeltaMetaData>(), true, null, null), 
null);
     OrcSplit result = splitter.createSplit(0, 200, null);
     assertEquals(0, result.getStart());
     assertEquals(200, result.getLength());
@@ -1103,7 +1155,7 @@ public class TestInputOutputFormat {
     OrcInputFormat.SplitGenerator splitter =
         new OrcInputFormat.SplitGenerator(new 
OrcInputFormat.SplitInfo(context, fs,
             AcidUtils.createOriginalObj(null, fs.getFileStatus(new 
Path("/a/file"))), null, true,
-            new ArrayList<AcidInputFormat.DeltaMetaData>(), true, null, null));
+            new ArrayList<AcidInputFormat.DeltaMetaData>(), true, null, null), 
null);
     List<OrcSplit> results = splitter.call();
     OrcSplit result = results.get(0);
     assertEquals(3, result.getStart());
@@ -1126,7 +1178,7 @@ public class TestInputOutputFormat {
     context = new OrcInputFormat.Context(conf);
     splitter = new OrcInputFormat.SplitGenerator(new 
OrcInputFormat.SplitInfo(context, fs,
       AcidUtils.createOriginalObj(null, fs.getFileStatus(new 
Path("/a/file"))), null, true,
-        new ArrayList<AcidInputFormat.DeltaMetaData>(), true, null, null));
+        new ArrayList<AcidInputFormat.DeltaMetaData>(), true, null, null), 
null);
     results = splitter.call();
     for(int i=0; i < stripeSizes.length; ++i) {
       assertEquals("checking stripe " + i + " size",
@@ -1154,7 +1206,7 @@ public class TestInputOutputFormat {
     OrcInputFormat.SplitGenerator splitter =
         new OrcInputFormat.SplitGenerator(new 
OrcInputFormat.SplitInfo(context, fs,
             fs.getFileStatus(new Path("/a/file")), null, true,
-            new ArrayList<AcidInputFormat.DeltaMetaData>(), true, null, null));
+            new ArrayList<AcidInputFormat.DeltaMetaData>(), true, null, null), 
null);
     List<OrcSplit> results = splitter.call();
     OrcSplit result = results.get(0);
     assertEquals(3, results.size());
@@ -1177,7 +1229,7 @@ public class TestInputOutputFormat {
     splitter = new OrcInputFormat.SplitGenerator(new 
OrcInputFormat.SplitInfo(context, fs,
         fs.getFileStatus(new Path("/a/file")), null, true,
         new ArrayList<AcidInputFormat.DeltaMetaData>(),
-        true, null, null));
+        true, null, null), null);
     results = splitter.call();
     assertEquals(5, results.size());
     for (int i = 0; i < stripeSizes.length; ++i) {
@@ -1197,7 +1249,7 @@ public class TestInputOutputFormat {
     splitter = new OrcInputFormat.SplitGenerator(new 
OrcInputFormat.SplitInfo(context, fs,
         fs.getFileStatus(new Path("/a/file")), null, true,
         new ArrayList<AcidInputFormat.DeltaMetaData>(),
-        true, null, null));
+        true, null, null), null);
     results = splitter.call();
     assertEquals(1, results.size());
     result = results.get(0);
@@ -2056,12 +2108,7 @@ public class TestInputOutputFormat {
   @Test
   public void testSplitEliminationNullStats() throws Exception {
     Properties properties = new Properties();
-    StructObjectInspector inspector;
-    synchronized (TestOrcFile.class) {
-      inspector = (StructObjectInspector)
-          ObjectInspectorFactory.getReflectionObjectInspector(SimpleRow.class,
-              ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
-    }
+    StructObjectInspector inspector = createSoi();
     SerDe serde = new OrcSerde();
     OutputFormat<?, ?> outFormat = new OrcOutputFormat();
     conf.setInt("mapred.max.split.size", 50);
@@ -2094,4 +2141,60 @@ public class TestInputOutputFormat {
     assertEquals(0, splits.length);
   }
 
+  @Test
+  public void testDoAs() throws Exception {
+    conf.setInt(ConfVars.HIVE_ORC_COMPUTE_SPLITS_NUM_THREADS.varname, 1);
+    conf.set(ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "ETL");
+    conf.setBoolean(ConfVars.HIVE_IN_TEST.varname, true);
+    conf.setClass("fs.mock.impl", MockFileSystem.class, FileSystem.class);
+    String badUser = UserGroupInformation.getCurrentUser().getShortUserName() 
+ "-foo";
+    MockFileSystem.setBlockedUgi(badUser);
+    MockFileSystem.clearGlobalFiles();
+    OrcInputFormat.Context.resetThreadPool(); // We need the size above to 
take effect.
+    try {
+      // OrcInputFormat will get a mock fs from FileSystem.get; add global 
files.
+      MockFileSystem.addGlobalFile(new MockFile("mock:/ugi/1/file", 10000,
+          createMockOrcFile(197, 300, 600), new MockBlock("host1-1", 
"host1-2", "host1-3")));
+      MockFileSystem.addGlobalFile(new MockFile("mock:/ugi/2/file", 10000,
+          createMockOrcFile(197, 300, 600), new MockBlock("host1-1", 
"host1-2", "host1-3")));
+      FileInputFormat.setInputPaths(conf, "mock:/ugi/1");
+      UserGroupInformation ugi = 
UserGroupInformation.createUserForTesting(badUser, new String[0]);
+      assertEquals(0, OrcInputFormat.Context.getCurrentThreadPoolSize());
+      try {
+        ugi.doAs(new PrivilegedExceptionAction<Void>() {
+          @Override
+          public Void run() throws Exception {
+            OrcInputFormat.generateSplitsInfo(conf, -1);
+            return null;
+          }
+        });
+        fail("Didn't throw");
+      } catch (Exception ex) {
+        Throwable cause = ex;
+        boolean found = false;
+        while (cause != null) {
+          if (cause instanceof MockFileSystem.MockAccessDenied) {
+            found = true; // Expected.
+            break;
+          }
+          cause = cause.getCause();
+        }
+        if (!found) throw ex; // Unexpected.
+      }
+      assertEquals(1, OrcInputFormat.Context.getCurrentThreadPoolSize());
+      FileInputFormat.setInputPaths(conf, "mock:/ugi/2");
+      List<OrcSplit> splits = OrcInputFormat.generateSplitsInfo(conf, -1);
+      assertEquals(1, splits.size());
+    } finally {
+      MockFileSystem.clearGlobalFiles();
+    }
+  }
+
+
+  private StructObjectInspector createSoi() {
+    synchronized (TestOrcFile.class) {
+      return 
(StructObjectInspector)ObjectInspectorFactory.getReflectionObjectInspector(
+          SimpleRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+  }
 }

Reply via email to