Author: vikram
Date: Tue Apr 8 18:25:42 2014
New Revision: 1585810
URL: http://svn.apache.org/r1585810
Log:
HIVE-6782 : HiveServer2 concurrency issue when running with Tez intermittently,
throwing org.apache.tez.dag.api.SessionNotRunning: Application not running
error (Vikram Dixit K, reviewed by Thejas Nair)
Modified:
hive/trunk/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hive/trunk/conf/hive-default.xml.template
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionState.java
Modified:
hive/trunk/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
URL:
http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/common/FileUtils.java?rev=1585810&r1=1585809&r2=1585810&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
(original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/common/FileUtils.java Tue
Apr 8 18:25:42 2014
@@ -441,6 +441,4 @@ public final class FileUtils {
}
return true;
}
-
-
}
Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL:
http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1585810&r1=1585809&r2=1585810&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
(original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Tue
Apr 8 18:25:42 2014
@@ -1020,7 +1020,8 @@ public class HiveConf extends Configurat
// Check if a plan contains a Cross Product.
// If there is one, output a warning to the Session's console.
HIVE_CHECK_CROSS_PRODUCT("hive.exec.check.crossproducts", true),
-
+
HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL("hive.localize.resource.wait.interval",
5000L), // in ms
+
HIVE_LOCALIZE_RESOURCE_NUM_WAIT_ATTEMPTS("hive.localize.resource.num.wait.attempts",
5),
;
public final String varname;
Modified: hive/trunk/conf/hive-default.xml.template
URL:
http://svn.apache.org/viewvc/hive/trunk/conf/hive-default.xml.template?rev=1585810&r1=1585809&r2=1585810&view=diff
==============================================================================
--- hive/trunk/conf/hive-default.xml.template (original)
+++ hive/trunk/conf/hive-default.xml.template Tue Apr 8 18:25:42 2014
@@ -2604,4 +2604,20 @@
</description>
</property>
+<property>
+ <name>hive.localize.resource.wait.interval</name>
+ <value>5000</value>
+ <description>
+ Time in milliseconds to wait for another thread to localize the same
resource for hive-tez.
+ </description>
+</property>
+
+<property>
+ <name>hive.localize.resource.num.wait.attempts</name>
+ <value>5</value>
+ <description>
+ The number of attempts waiting for localizing a resource in hive-tez.
+ </description>
+</property>
+
</configuration>
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java
URL:
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java?rev=1585810&r1=1585809&r2=1585810&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java Tue Apr 8
18:25:42 2014
@@ -30,13 +30,16 @@ import org.apache.hadoop.fs.permission.F
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.TaskRunner;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockObj;
import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
+import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import java.io.DataInput;
@@ -51,6 +54,8 @@ import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
+import javax.security.auth.login.LoginException;
+
/**
* Context for Semantic Analyzers. Usage: not reusable - construct a new one
for
* each query should call clear() at end of use to remove temporary folders
@@ -204,12 +209,11 @@ public class Context {
try {
FileSystem fs = dirPath.getFileSystem(conf);
dirPath = new Path(fs.makeQualified(dirPath).toString());
- if (!fs.mkdirs(dirPath)) {
+ FsPermission fsPermission = new
FsPermission(Short.parseShort(scratchDirPermission.trim(), 8));
+
+ if (!Utilities.createDirsWithPermission(conf, dirPath,
fsPermission)) {
throw new RuntimeException("Cannot make directory: "
+ dirPath.toString());
- } else {
- FsPermission fsPermission = new
FsPermission(Short.parseShort(scratchDirPermission.trim(), 8));
- fs.setPermission(dirPath, fsPermission);
}
if (isHDFSCleanup) {
fs.deleteOnExit(dirPath);
@@ -222,6 +226,7 @@ public class Context {
fsScratchDirs.put(fileSystem + "-" + TaskRunner.getTaskRunnerID(), dir);
}
+
return dir;
}
@@ -256,6 +261,7 @@ public class Context {
try {
Path dir = FileUtils.makeQualified(nonLocalScratchPath, conf);
URI uri = dir.toUri();
+
Path newScratchDir = getScratchDir(uri.getScheme(), uri.getAuthority(),
!explain, uri.getPath());
LOG.info("New scratch dir is " + newScratchDir);
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL:
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1585810&r1=1585809&r2=1585810&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
(original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Tue
Apr 8 18:25:42 2014
@@ -82,6 +82,8 @@ import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;
+import javax.security.auth.login.LoginException;
+
import org.antlr.runtime.CommonToken;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang.StringUtils;
@@ -95,6 +97,8 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.common.HiveInterruptCallback;
import org.apache.hadoop.hive.common.HiveInterruptUtils;
import org.apache.hadoop.hive.common.HiveStatsUtils;
@@ -175,6 +179,7 @@ import org.apache.hadoop.mapred.RecordRe
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Shell;
@@ -317,7 +322,7 @@ public final class Utilities {
LOG.debug("Loading plan from string: "+path.toUri().getPath());
String planString = conf.get(path.toUri().getPath());
if (planString == null) {
- LOG.debug("Could not find plan string in conf");
+ LOG.info("Could not find plan string in conf");
return null;
}
byte[] planBytes = Base64.decodeBase64(planString);
@@ -356,7 +361,7 @@ public final class Utilities {
return gWork;
} catch (FileNotFoundException fnf) {
// happens. e.g.: no reduce work.
- LOG.debug("No plan file found: "+path);
+ LOG.info("No plan file found: "+path);
return null;
} catch (Exception e) {
LOG.error("Failed to load plan: "+path, e);
@@ -3235,6 +3240,7 @@ public final class Utilities {
private static void createTmpDirs(Configuration conf,
List<Operator<? extends OperatorDesc>> ops) throws IOException {
+ FsPermission fsPermission = new FsPermission((short)00777);
while (!ops.isEmpty()) {
Operator<? extends OperatorDesc> op = ops.remove(0);
@@ -3244,8 +3250,7 @@ public final class Utilities {
if (tempDir != null) {
Path tempPath = Utilities.toTempPath(tempDir);
- FileSystem fs = tempPath.getFileSystem(conf);
- fs.mkdirs(tempPath);
+ createDirsWithPermission(conf, tempPath, fsPermission);
}
}
@@ -3367,4 +3372,49 @@ public final class Utilities {
}
return footerCount;
}
+
+ /**
+ * @param conf the configuration used to derive the filesystem to create the
path
+ * @param mkdir the path to be created
+ * @param fsPermission ignored if it is hive server session and doAs is
enabled
+ * @return true if successfully created the directory else false
+ * @throws IOException if hdfs experiences any error conditions
+ */
+ public static boolean createDirsWithPermission(Configuration conf, Path
mkdir,
+ FsPermission fsPermission) throws IOException {
+ // this umask is required because by default the hdfs mask is 022
resulting in
+ // all parents getting the fsPermission & !(022) permission instead of
fsPermission
+ boolean recursive = false;
+ if (SessionState.get() != null) {
+ recursive = SessionState.get().isHiveServerQuery() &&
+ conf.getBoolean(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS.varname,
+ HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS.defaultBoolVal);
+ // we reset the permission in case of hive server and doAs enabled
because
+ // currently scratch directory uses /tmp/hive-hive as the scratch
directory.
+ // However, with doAs enabled, the first user to create this directory
would
+ // own the directory and subsequent users cannot access the scratch
directory.
+ // The right fix is to have scratch dir per user.
+ fsPermission = new FsPermission((short)00777);
+ }
+
+ // if we made it so far without exception we are good!
+ return createDirsWithPermission(conf, mkdir, fsPermission, recursive);
+ }
+
+ public static boolean createDirsWithPermission(Configuration conf, Path
mkdir,
+ FsPermission fsPermission, boolean recursive) throws IOException {
+ String origUmask = null;
+ if (recursive) {
+ origUmask = conf.get("fs.permissions.umask-mode");
+ conf.set("fs.permissions.umask-mode", "000");
+ }
+ FileSystem fs = mkdir.getFileSystem(conf);
+ boolean retval = fs.mkdirs(mkdir, fsPermission);
+ if (origUmask != null) {
+ conf.set("fs.permissions.umask-mode", origUmask);
+ } else {
+ conf.unset("fs.permissions.umask-mode");
+ }
+ return retval;
+ }
}
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL:
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1585810&r1=1585809&r2=1585810&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
(original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Tue
Apr 8 18:25:42 2014
@@ -45,6 +45,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@ -111,7 +112,6 @@ import org.apache.tez.mapreduce.input.MR
import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.mapreduce.partition.MRPartitioner;
import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput;
-import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValuesInput;
import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
import org.apache.tez.runtime.library.output.OnFileSortedOutput;
@@ -571,6 +571,7 @@ public class DagUtils {
URL resourceURL = ConverterUtils.getYarnUrlFromPath(file);
long resourceSize = fstat.getLen();
long resourceModificationTime = fstat.getModificationTime();
+ LOG.info("Resource modification time: " + resourceModificationTime);
LocalResource lr = Records.newRecord(LocalResource.class);
lr.setResource(resourceURL);
@@ -713,9 +714,9 @@ public class DagUtils {
if (!StringUtils.isNotBlank(file)) {
continue;
}
- String hdfsFilePathStr = hdfsDirPathStr + "/" +
getResourceBaseName(file);
+ Path hdfsFilePath = new Path(hdfsDirPathStr, getResourceBaseName(new
Path(file)));
LocalResource localResource = localizeResource(new Path(file),
- new Path(hdfsFilePathStr), conf);
+ hdfsFilePath, conf);
tmpResources.add(localResource);
}
}
@@ -760,10 +761,8 @@ public class DagUtils {
* @param pathStr - the string from which we try to determine the resource
base name
* @return the name of the resource from a given path string.
*/
- public String getResourceBaseName(String pathStr) {
- // TODO: this should probably use Path::getName
- String[] splits = pathStr.split("/");
- return splits[splits.length - 1];
+ public String getResourceBaseName(Path path) {
+ return path.getName();
}
/**
@@ -776,22 +775,10 @@ public class DagUtils {
private boolean checkPreExisting(Path src, Path dest, Configuration conf)
throws IOException {
FileSystem destFS = dest.getFileSystem(conf);
-
- if (!destFS.exists(dest)) {
- return false;
- }
- FileStatus destStatus = destFS.getFileStatus(dest);
- if (destStatus.isDir()) {
- return false;
- }
-
- String srcName = getResourceBaseName(src.toString());
- String destName = getResourceBaseName(dest.toString());
-
- if (srcName.equals(destName)) {
- return true;
+ FileSystem sourceFS = src.getFileSystem(conf);
+ if (destFS.exists(dest)) {
+ return (sourceFS.getFileStatus(src).getLen() ==
destFS.getFileStatus(dest).getLen());
}
-
return false;
}
@@ -810,10 +797,39 @@ public class DagUtils {
}
if (src != null) {
- if (!checkPreExisting(src, dest, conf)) {
- // copy the src to the destination and create local resource.
- // overwrite even if file already exists.
- destFS.copyFromLocalFile(false, true, src, dest);
+ // copy the src to the destination and create local resource.
+ // do not overwrite.
+ LOG.info("Localizing resource because it does not exist: " + src + " to
dest: " + dest);
+ try {
+ destFS.copyFromLocalFile(false, false, src, dest);
+ } catch (IOException e) {
+ LOG.info("Looks like another thread is writing the same file will
wait.");
+ int waitAttempts =
+
conf.getInt(HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_NUM_WAIT_ATTEMPTS.varname,
+
HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_NUM_WAIT_ATTEMPTS.defaultIntVal);
+ long sleepInterval =
+
conf.getLong(HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL.varname,
+
HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL.defaultLongVal);
+ LOG.info("Number of wait attempts: " + waitAttempts + ". Wait
interval: "
+ + sleepInterval);
+ boolean found = false;
+ for (int i = 0; i < waitAttempts; i++) {
+ if (!checkPreExisting(src, dest, conf)) {
+ try {
+ Thread.currentThread().sleep(sleepInterval);
+ } catch (InterruptedException interruptedException) {
+ throw new IOException(interruptedException);
+ }
+ } else {
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ LOG.error("Could not find the jar that was being uploaded");
+ throw new IOException("Previous writer likely failed to write " +
dest +
+ ". Failing because I am unlikely to write too.");
+ }
}
}
@@ -948,11 +964,27 @@ public class DagUtils {
* be used with Tez. Assumes scratchDir exists.
*/
public Path createTezDir(Path scratchDir, Configuration conf)
- throws IOException {
+ throws IOException {
+ UserGroupInformation ugi;
+ String userName = System.getProperty("user.name");
+ try {
+ ugi = ShimLoader.getHadoopShims().getUGIForConf(conf);
+ userName = ShimLoader.getHadoopShims().getShortUserName(ugi);
+ } catch (LoginException e) {
+ throw new IOException(e);
+ }
+
+ scratchDir = new Path(scratchDir, userName);
+
Path tezDir = getTezDir(scratchDir);
FileSystem fs = tezDir.getFileSystem(conf);
+ LOG.debug("TezDir path set " + tezDir + " for user: " + userName);
+ // since we are adding the user name to the scratch dir, we do not
+ // need to give more permissions here
fs.mkdirs(tezDir);
+
return tezDir;
+
}
/**
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
URL:
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java?rev=1585810&r1=1585809&r2=1585810&view=diff
==============================================================================
---
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
(original)
+++
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
Tue Apr 8 18:25:42 2014
@@ -21,11 +21,15 @@ package org.apache.hadoop.hive.ql.exec.t
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
+import javax.security.auth.login.LoginException;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.security.UserGroupInformation;
/**
* This class is for managing multiple tez sessions particularly when
@@ -213,6 +217,18 @@ public class TezSessionPoolManager {
return false;
}
+ try {
+ UserGroupInformation ugi =
ShimLoader.getHadoopShims().getUGIForConf(conf);
+ String userName = ShimLoader.getHadoopShims().getShortUserName(ugi);
+ LOG.info("The current user: " + userName + ", session user: " +
session.getUser());
+ if (userName.equals(session.getUser()) == false) {
+ LOG.info("Different users incoming: " + userName + " existing: " +
session.getUser());
+ return false;
+ }
+ } catch (Exception e) {
+ throw new HiveException(e);
+ }
+
HiveConf existingConf = session.getConf();
if (existingConf == null) {
return false;
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
URL:
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java?rev=1585810&r1=1585809&r2=1585810&view=diff
==============================================================================
---
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
(original)
+++
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
Tue Apr 8 18:25:42 2014
@@ -38,10 +38,15 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.tez.client.AMConfiguration;
import org.apache.tez.client.PreWarmContext;
@@ -68,6 +73,7 @@ public class TezSessionState {
private DagUtils utils;
private String queueName;
private boolean defaultQueue = false;
+ private String user;
private HashSet<String> additionalAmFiles = null;
@@ -127,6 +133,11 @@ public class TezSessionState {
throws IOException, LoginException, IllegalArgumentException,
URISyntaxException, TezException {
this.conf = conf;
+ UserGroupInformation ugi;
+ ugi = ShimLoader.getHadoopShims().getUGIForConf(conf);
+ user = ShimLoader.getHadoopShims().getShortUserName(ugi);
+ LOG.info("User of session id " + sessionId + " is " + user);
+
// create the tez tmp dir
tezScratchDir = createTezDir(sessionId);
@@ -219,8 +230,7 @@ public class TezSessionState {
}
if (!keepTmpDir) {
- FileSystem fs = tezScratchDir.getFileSystem(conf);
- fs.delete(tezScratchDir, true);
+ cleanupScratchDir();
}
session = null;
tezScratchDir = null;
@@ -229,6 +239,12 @@ public class TezSessionState {
additionalAmFiles = null;
}
+ public void cleanupScratchDir () throws IOException {
+ FileSystem fs = tezScratchDir.getFileSystem(conf);
+ fs.delete(tezScratchDir, true);
+ tezScratchDir = null;
+ }
+
public String getSessionId() {
return sessionId;
}
@@ -257,7 +273,8 @@ public class TezSessionState {
TEZ_DIR);
tezDir = new Path(tezDir, sessionId);
FileSystem fs = tezDir.getFileSystem(conf);
- fs.mkdirs(tezDir);
+ FsPermission fsPermission = new FsPermission((short)00777);
+ Utilities.createDirsWithPermission(conf, tezDir, fsPermission, true);
// don't keep the directory around on non-clean exit
fs.deleteOnExit(tezDir);
@@ -313,6 +330,7 @@ public class TezSessionState {
Path localFile = new Path(localJarPath);
String sha = getSha(localFile);
+
String destFileName = localFile.getName();
// Now, try to find the file based on SHA and name. Currently we require
exact name match.
@@ -326,19 +344,6 @@ public class TezSessionState {
// TODO: if this method is ever called on more than one jar, getting the
dir and the
// list need to be refactored out to be done only once.
- Path jarPath = null;
- FileStatus[] listFileStatus = destFs.listStatus(destDirPath);
- for (FileStatus fstatus : listFileStatus) {
- String jarName =
utils.getResourceBaseName(fstatus.getPath().toString()); // ...
- if (jarName.equals(destFileName)) {
- // We have found the jar we need.
- jarPath = fstatus.getPath();
- return utils.localizeResource(null, jarPath, conf);
- }
- }
-
- // Jar wasn't in the directory, copy the one in current use.
- assert jarPath == null;
Path destFile = new Path(destDirPath.toString() + "/" + destFileName);
return utils.localizeResource(localFile, destFile, conf);
}
@@ -386,4 +391,8 @@ public class TezSessionState {
public HiveConf getConf() {
return conf;
}
+
+ public String getUser() {
+ return user;
+ }
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
URL:
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java?rev=1585810&r1=1585809&r2=1585810&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
(original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java Tue
Apr 8 18:25:42 2014
@@ -126,7 +126,9 @@ public class TezTask extends Task<TezWor
// we will localize all the files (jars, plans, hashtables) to the
// scratch dir. let's create this and tmp first.
Path scratchDir = ctx.getMRScratchDir();
- utils.createTezDir(scratchDir, conf);
+
+ // create the tez tmp dir
+ scratchDir = utils.createTezDir(scratchDir, conf);
// we need to get the user specified local resources for this dag
String hiveJarDir = utils.getHiveJarDirectory(conf);
@@ -216,8 +218,7 @@ public class TezTask extends Task<TezWor
List<BaseWork> ws = work.getAllWork();
Collections.reverse(ws);
- Path tezDir = utils.getTezDir(scratchDir);
- FileSystem fs = tezDir.getFileSystem(conf);
+ FileSystem fs = scratchDir.getFileSystem(conf);
// the name of the dag is what is displayed in the AM/Job UI
DAG dag = new DAG(work.getName());
@@ -272,7 +273,7 @@ public class TezTask extends Task<TezWor
} else {
// Regular vertices
JobConf wxConf = utils.initializeVertexConf(conf, w);
- Vertex wx = utils.createVertex(wxConf, w, tezDir, appJarLr,
+ Vertex wx = utils.createVertex(wxConf, w, scratchDir, appJarLr,
additionalLr, fs, ctx, !isFinal, work);
dag.addVertex(wx);
utils.addCredentials(w, dag);
Modified:
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionState.java
URL:
http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionState.java?rev=1585810&r1=1585809&r2=1585810&view=diff
==============================================================================
---
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionState.java
(original)
+++
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionState.java
Tue Apr 8 18:25:42 2014
@@ -24,6 +24,8 @@ import java.net.URISyntaxException;
import javax.security.auth.login.LoginException;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.tez.dag.api.TezException;
@@ -37,6 +39,7 @@ public class TestTezSessionState extends
private boolean open;
private String sessionId;
private HiveConf hiveConf;
+ private String user;
public TestTezSessionState(String sessionId) {
super(sessionId);
@@ -53,9 +56,13 @@ public class TestTezSessionState extends
}
@Override
- public void open(HiveConf conf) {
- this.hiveConf = conf;
- }
+ public void open(HiveConf conf) throws IOException,
+ LoginException, URISyntaxException, TezException {
+ this.hiveConf = conf;
+ UserGroupInformation ugi;
+ ugi = ShimLoader.getHadoopShims().getUGIForConf(conf);
+ user = ShimLoader.getHadoopShims().getShortUserName(ugi);
+ }
@Override
public void close(boolean keepTmpDir) throws TezException, IOException {
@@ -70,4 +77,8 @@ public class TestTezSessionState extends
public String getSessionId() {
return sessionId;
}
+
+ public String getUser() {
+ return user;
+ }
}