HIVE-18153 : refactor reopen and file management in TezTask (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/89dbf4e9 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/89dbf4e9 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/89dbf4e9 Branch: refs/heads/standalone-metastore Commit: 89dbf4e904592318da954eaf94548ec1b130e17c Parents: ca96613 Author: sergey <[email protected]> Authored: Thu Dec 14 15:53:44 2017 -0800 Committer: sergey <[email protected]> Committed: Thu Dec 14 15:53:44 2017 -0800 ---------------------------------------------------------------------- ql/pom.xml | 2 - .../hadoop/hive/ql/exec/mr/ExecDriver.java | 1 + .../hadoop/hive/ql/exec/tez/DagUtils.java | 102 ++++----- .../hadoop/hive/ql/exec/tez/TezSessionPool.java | 23 +- .../hive/ql/exec/tez/TezSessionPoolManager.java | 36 +--- .../hive/ql/exec/tez/TezSessionPoolSession.java | 20 +- .../hive/ql/exec/tez/TezSessionState.java | 208 +++++++++++-------- .../apache/hadoop/hive/ql/exec/tez/TezTask.java | 189 ++++++----------- .../hive/ql/exec/tez/WorkloadManager.java | 118 +++++++---- .../ql/exec/tez/monitoring/TezJobMonitor.java | 2 +- .../ql/udf/generic/GenericUDTFGetSplits.java | 5 +- .../hive/ql/exec/tez/SampleTezSessionState.java | 7 +- .../hive/ql/exec/tez/TestTezSessionPool.java | 30 ++- .../hadoop/hive/ql/exec/tez/TestTezTask.java | 73 ++----- .../hive/ql/exec/tez/TestWorkloadManager.java | 10 +- 15 files changed, 378 insertions(+), 448 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/89dbf4e9/ql/pom.xml ---------------------------------------------------------------------- diff --git a/ql/pom.xml b/ql/pom.xml index f35a4c8..cbf71cd 100644 --- a/ql/pom.xml +++ b/ql/pom.xml @@ -225,8 +225,6 @@ <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-yarn-registry</artifactId> <version>${hadoop.version}</version> - <optional>true</optional> - <scope>test</scope> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> http://git-wip-us.apache.org/repos/asf/hive/blob/89dbf4e9/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java index 88a75ed..3f470eb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java @@ -400,6 +400,7 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop SessionState ss = SessionState.get(); // TODO: why is there a TezSession in MR ExecDriver? if (ss != null && HiveConf.getVar(job, ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) { + // TODO: this is the only place that uses keepTmpDir. Why? TezSessionPoolManager.closeIfNotDefault(ss.getTezSession(), true); } http://git-wip-us.apache.org/repos/asf/hive/blob/89dbf4e9/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java index 6c1afa6..e4a6f62 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java @@ -17,14 +17,13 @@ */ package org.apache.hadoop.hive.ql.exec.tez; -import java.util.concurrent.ConcurrentHashMap; +import java.util.Collection; +import java.util.concurrent.ConcurrentHashMap; import com.google.common.base.Function; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; - import javax.security.auth.login.LoginException; - import java.io.FileNotFoundException; import java.io.IOException; import java.net.URI; @@ -40,7 +39,6 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; - import org.apache.commons.io.FilenameUtils; import org.apache.commons.lang.StringUtils; import org.apache.tez.mapreduce.common.MRInputSplitDistributor; @@ -541,16 +539,15 @@ public class DagUtils { } } - private Vertex createVertex(JobConf conf, MergeJoinWork mergeJoinWork, LocalResource appJarLr, - List<LocalResource> additionalLr, FileSystem fs, Path mrScratchDir, Context ctx, - VertexType vertexType) - throws Exception { + private Vertex createVertex(JobConf conf, MergeJoinWork mergeJoinWork, FileSystem fs, + Path mrScratchDir, Context ctx, VertexType vertexType, + Map<String, LocalResource> localResources) throws Exception { Utilities.setMergeWork(conf, mergeJoinWork, mrScratchDir, false); if (mergeJoinWork.getMainWork() instanceof MapWork) { List<BaseWork> mapWorkList = mergeJoinWork.getBaseWorkList(); MapWork mapWork = (MapWork) (mergeJoinWork.getMainWork()); - Vertex mergeVx = - createVertex(conf, mapWork, appJarLr, additionalLr, fs, mrScratchDir, ctx, vertexType); + Vertex mergeVx = createVertex( + conf, mapWork, fs, mrScratchDir, ctx, vertexType, localResources); conf.setClass("mapred.input.format.class", HiveInputFormat.class, InputFormat.class); // mapreduce.tez.input.initializer.serialize.event.payload should be set @@ -580,10 +577,8 @@ public class DagUtils { mergeVx.setVertexManagerPlugin(desc); return mergeVx; } else { - Vertex mergeVx = - createVertex(conf, (ReduceWork) mergeJoinWork.getMainWork(), appJarLr, additionalLr, fs, - mrScratchDir, ctx); - return mergeVx; + return createVertex(conf, + (ReduceWork) mergeJoinWork.getMainWork(), fs, mrScratchDir, ctx, localResources); } } @@ -591,11 +586,8 @@ public class DagUtils { * Helper function to create Vertex from MapWork. */ private Vertex createVertex(JobConf conf, MapWork mapWork, - LocalResource appJarLr, List<LocalResource> additionalLr, FileSystem fs, - Path mrScratchDir, Context ctx, VertexType vertexType) - throws Exception { - - Path tezDir = getTezDir(mrScratchDir); + FileSystem fs, Path mrScratchDir, Context ctx, VertexType vertexType, + Map<String, LocalResource> localResources) throws Exception { // set up the operator plan Utilities.cacheMapWork(conf, mapWork, mrScratchDir); @@ -726,13 +718,6 @@ public class DagUtils { // Add the actual source input String alias = mapWork.getAliasToWork().keySet().iterator().next(); map.addDataSource(alias, dataSource); - - Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); - localResources.put(getBaseName(appJarLr), appJarLr); - for (LocalResource lr: additionalLr) { - localResources.put(getBaseName(lr), lr); - } - map.addTaskLocalFiles(localResources); return map; } @@ -772,9 +757,9 @@ public class DagUtils { /* * Helper function to create Vertex for given ReduceWork. */ - private Vertex createVertex(JobConf conf, ReduceWork reduceWork, - LocalResource appJarLr, List<LocalResource> additionalLr, FileSystem fs, - Path mrScratchDir, Context ctx) throws Exception { + private Vertex createVertex(JobConf conf, ReduceWork reduceWork, FileSystem fs, + Path mrScratchDir, Context ctx, Map<String, LocalResource> localResources) + throws Exception { // set up operator plan conf.set(Utilities.INPUT_NAME, reduceWork.getName()); @@ -796,17 +781,28 @@ public class DagUtils { reducer.setTaskEnvironment(getContainerEnvironment(conf, false)); reducer.setExecutionContext(vertexExecutionContext); reducer.setTaskLaunchCmdOpts(getContainerJavaOpts(conf)); - - Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); - localResources.put(getBaseName(appJarLr), appJarLr); - for (LocalResource lr: additionalLr) { - localResources.put(getBaseName(lr), lr); - } reducer.addTaskLocalFiles(localResources); - return reducer; } + public static Map<String, LocalResource> createTezLrMap( + LocalResource appJarLr, Collection<LocalResource> additionalLr) { + // Note: interestingly this would exclude LLAP app jars that the session adds for LLAP case. + // Of course it doesn't matter because vertices run ON LLAP and have those jars, and + // moreover we anyway don't localize jars for the vertices on LLAP; but in theory + // this is still crappy code that assumes there's one and only app jar. + Map<String, LocalResource> localResources = new HashMap<>(); + if (appJarLr != null) { + localResources.put(getBaseName(appJarLr), appJarLr); + } + if (additionalLr != null) { + for (LocalResource lr: additionalLr) { + localResources.put(getBaseName(lr), lr); + } + } + return localResources; + } + /* * Helper method to create a yarn local resource. */ @@ -1064,7 +1060,7 @@ public class DagUtils { /* * Helper function to retrieve the basename of a local resource */ - public String getBaseName(LocalResource lr) { + public static String getBaseName(LocalResource lr) { return FilenameUtils.getName(lr.getResource().getFile()); } @@ -1254,30 +1250,26 @@ public class DagUtils { * @param work The instance of BaseWork representing the actual work to be performed * by this vertex. * @param scratchDir HDFS scratch dir for this execution unit. - * @param appJarLr Local resource for hive-exec. - * @param additionalLr * @param fileSystem FS corresponding to scratchDir and LocalResources * @param ctx This query's context * @return Vertex */ @SuppressWarnings("deprecation") public Vertex createVertex(JobConf conf, BaseWork work, - Path scratchDir, LocalResource appJarLr, - List<LocalResource> additionalLr, FileSystem fileSystem, Context ctx, boolean hasChildren, - TezWork tezWork, VertexType vertexType) throws Exception { + Path scratchDir, FileSystem fileSystem, Context ctx, boolean hasChildren, + TezWork tezWork, VertexType vertexType, Map<String, LocalResource> localResources) throws Exception { Vertex v = null; // simply dispatch the call to the right method for the actual (sub-) type of // BaseWork. if (work instanceof MapWork) { - v = createVertex(conf, (MapWork) work, appJarLr, additionalLr, fileSystem, scratchDir, ctx, - vertexType); + v = createVertex( + conf, (MapWork) work, fileSystem, scratchDir, ctx, vertexType, localResources); } else if (work instanceof ReduceWork) { - v = createVertex(conf, (ReduceWork) work, appJarLr, - additionalLr, fileSystem, scratchDir, ctx); + v = createVertex(conf, (ReduceWork) work, fileSystem, scratchDir, ctx, localResources); } else if (work instanceof MergeJoinWork) { - v = createVertex(conf, (MergeJoinWork) work, appJarLr, additionalLr, fileSystem, scratchDir, - ctx, vertexType); + v = createVertex( + conf, (MergeJoinWork) work, fileSystem, scratchDir, ctx, vertexType, localResources); // set VertexManagerPlugin if whether it's a cross product destination vertex List<String> crossProductSources = new ArrayList<>(); for (BaseWork parentWork : tezWork.getParents(work)) { @@ -1522,4 +1514,18 @@ public class DagUtils { // -Xmx not specified return -1; } + + // The utility of this method is not certain. + public static Map<String, LocalResource> getResourcesUpdatableForAm( + Collection<LocalResource> allNonAppResources) { + HashMap<String, LocalResource> allNonAppFileResources = new HashMap<>(); + if (allNonAppResources == null) return allNonAppFileResources; + for (LocalResource lr : allNonAppResources) { + if (lr.getType() == LocalResourceType.FILE) { + // TEZ AM will only localize FILE (no script operators in the AM) + allNonAppFileResources.put(DagUtils.getBaseName(lr), lr); + } + } + return allNonAppFileResources; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/89dbf4e9/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java index 3bcf657..6e2dfe1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java @@ -217,7 +217,10 @@ class TezSessionPool<SessionType extends TezSessionPoolSession> { } } // If there are async requests, satisfy them first. - if (!asyncRequests.isEmpty() && session.tryUse(false)) { + if (!asyncRequests.isEmpty()) { + if (!session.tryUse(false)) { + return true; // Session has expired and will be returned to us later. + } future = asyncRequests.poll(); } if (future == null) { @@ -238,24 +241,12 @@ class TezSessionPool<SessionType extends TezSessionPoolSession> { return true; } - void replaceSession(SessionType oldSession, boolean keepTmpDir, - String[] additionalFilesArray) throws Exception { - // Retain the stuff from the old session. + void replaceSession(SessionType oldSession) throws Exception { // Re-setting the queue config is an old hack that we may remove in future. SessionType newSession = sessionObjFactory.create(oldSession); - Path scratchDir = oldSession.getTezScratchDir(); String queueName = oldSession.getQueueName(); - Set<String> additionalFiles = null; - if (additionalFilesArray != null) { - additionalFiles = new HashSet<>(); - for (String file : additionalFilesArray) { - additionalFiles.add(file); - } - } else { - additionalFiles = oldSession.getAdditionalFilesNotFromConf(); - } try { - oldSession.close(keepTmpDir); + oldSession.close(false); } finally { poolLock.lock(); try { @@ -280,7 +271,7 @@ class TezSessionPool<SessionType extends TezSessionPoolSession> { // probably just get rid of the thread local usage in TezSessionState. SessionState.setCurrentSessionState(parentSessionState); } - newSession.open(additionalFiles, scratchDir); + newSession.open(); if (!putSessionBack(newSession, false)) { if (LOG.isDebugEnabled()) { LOG.debug("Closing an unneeded session " + newSession http://git-wip-us.apache.org/repos/asf/hive/blob/89dbf4e9/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java index 8417ebb..3c1b8d0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java @@ -18,16 +18,14 @@ package org.apache.hadoop.hive.ql.exec.tez; +import org.apache.hadoop.hive.ql.exec.tez.TezSessionState.HiveResources; + import java.util.ArrayList; import java.util.Collections; -import java.util.HashSet; import java.util.LinkedList; import java.util.List; -import java.util.Set; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; - -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan; @@ -43,7 +41,6 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.tez.dag.api.TezConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import com.google.common.annotations.VisibleForTesting; /** @@ -444,44 +441,35 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger /** Reopens the session that was found to not be running. */ @Override - public TezSessionState reopen(TezSessionState sessionState, - Configuration conf, String[] additionalFiles) throws Exception { + public TezSessionState reopen(TezSessionState sessionState) throws Exception { HiveConf sessionConf = sessionState.getConf(); if (sessionState.getQueueName() != null && sessionConf.get(TezConfiguration.TEZ_QUEUE_NAME) == null) { sessionConf.set(TezConfiguration.TEZ_QUEUE_NAME, sessionState.getQueueName()); } - reopenInternal(sessionState, additionalFiles); + reopenInternal(sessionState); return sessionState; } static void reopenInternal( - TezSessionState sessionState, String[] additionalFiles) throws Exception { - Set<String> oldAdditionalFiles = sessionState.getAdditionalFilesNotFromConf(); - // TODO: implies the session files and the array are the same if not null; why? very brittle - if ((oldAdditionalFiles == null || oldAdditionalFiles.isEmpty()) - && (additionalFiles != null)) { - oldAdditionalFiles = new HashSet<>(); - for (String file : additionalFiles) { - oldAdditionalFiles.add(file); - } - } + TezSessionState sessionState) throws Exception { + HiveResources resources = sessionState.extractHiveResources(); // TODO: close basically resets the object to a bunch of nulls. // We should ideally not reuse the object because it's pointless and error-prone. - sessionState.close(true); + sessionState.close(false); // Note: scratchdir is reused implicitly because the sessionId is the same. - sessionState.open(oldAdditionalFiles, null); + sessionState.open(resources); } - public void closeNonDefaultSessions(boolean keepTmpDir) throws Exception { + public void closeNonDefaultSessions() throws Exception { List<TezSessionState> sessionsToClose = null; synchronized (openSessions) { sessionsToClose = new ArrayList<TezSessionState>(openSessions); } for (TezSessionState sessionState : sessionsToClose) { System.err.println("Shutting down tez session."); - closeIfNotDefault(sessionState, keepTmpDir); + closeIfNotDefault(sessionState, false); } } @@ -492,9 +480,7 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger if (queueName == null) { LOG.warn("Pool session has a null queue: " + oldSession); } - TezSessionPoolSession newSession = createAndInitSession( - queueName, oldSession.isDefault(), oldSession.getConf()); - defaultSessionPool.replaceSession(oldSession, false, null); + defaultSessionPool.replaceSession(oldSession); } /** Called by TezSessionPoolSession when opened. */ http://git-wip-us.apache.org/repos/asf/hive/blob/89dbf4e9/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java index b3ccd24..96ade50 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java @@ -19,11 +19,8 @@ package org.apache.hadoop.hive.ql.exec.tez; import com.google.common.util.concurrent.SettableFuture; - import org.apache.hadoop.hive.registry.impl.TezAmInstance; - import org.apache.hadoop.conf.Configuration; - import java.io.IOException; import java.net.URISyntaxException; import java.util.Collection; @@ -31,9 +28,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; - import javax.security.auth.login.LoginException; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; @@ -42,7 +37,6 @@ import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider; import org.apache.hadoop.hive.ql.wm.TriggerActionHandler; import org.apache.hadoop.hive.registry.impl.TezAmInstance; import org.apache.tez.dag.api.TezException; - import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -66,8 +60,7 @@ class TezSessionPoolSession extends TezSessionState { void returnAfterUse(TezSessionPoolSession session) throws Exception; - TezSessionState reopen(TezSessionState session, Configuration conf, - String[] inputOutputJars) throws Exception; + TezSessionState reopen(TezSessionState session) throws Exception; void destroy(TezSessionState session) throws Exception; } @@ -128,10 +121,10 @@ class TezSessionPoolSession extends TezSessionState { } @Override - protected void openInternal(Collection<String> additionalFiles, - boolean isAsync, LogHelper console, Path scratchDir) + protected void openInternal(String[] additionalFiles, + boolean isAsync, LogHelper console, HiveResources resources) throws IOException, LoginException, URISyntaxException, TezException { - super.openInternal(additionalFiles, isAsync, console, scratchDir); + super.openInternal(additionalFiles, isAsync, console, resources); parent.registerOpenSession(this); if (expirationTracker != null) { expirationTracker.addToExpirationQueue(this); @@ -206,9 +199,8 @@ class TezSessionPoolSession extends TezSessionState { } @Override - public TezSessionState reopen( - Configuration conf, String[] inputOutputJars) throws Exception { - return parent.reopen(this, conf, inputOutputJars); + public TezSessionState reopen() throws Exception { + return parent.reopen(this); } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/89dbf4e9/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java index dd879fc..5e892c6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java @@ -17,7 +17,8 @@ */ package org.apache.hadoop.hive.ql.exec.tez; -import java.util.Collection; +import org.apache.hadoop.registry.client.api.RegistryOperations; + import java.io.File; import java.io.IOException; import java.net.URISyntaxException; @@ -37,7 +38,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import javax.security.auth.login.LoginException; - import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.io.FilenameUtils; import org.apache.hadoop.conf.Configuration; @@ -90,7 +90,6 @@ import org.codehaus.jackson.map.annotate.JsonSerialize; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor; - import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; @@ -126,8 +125,18 @@ public class TezSessionState { private AtomicReference<String> ownerThread = new AtomicReference<>(null); - private final Set<String> additionalFilesNotFromConf = new HashSet<String>(); - private final Set<LocalResource> localizedResources = new HashSet<LocalResource>(); + public static final class HiveResources { + public HiveResources(Path dagResourcesDir) { + this.dagResourcesDir = dagResourcesDir; + } + /** A directory that will contain resources related to DAGs and specified in configs. */ + public final Path dagResourcesDir; + public final Set<String> additionalFilesNotFromConf = new HashSet<>(); + /** Localized resources of this session; both from conf and not from conf (above). */ + public final Set<LocalResource> localizedResources = new HashSet<>(); + } + + private HiveResources resources; @JsonProperty("doAsEnabled") private boolean doAsEnabled; private boolean isLegacyLlapMode; @@ -201,45 +210,32 @@ public class TezSessionState { } public void open() throws IOException, LoginException, URISyntaxException, TezException { - Set<String> noFiles = null; - open(noFiles, null); + String[] noFiles = null; + open(noFiles); } /** * Creates a tez session. A session is tied to either a cli/hs2 session. You can * submit multiple DAGs against a session (as long as they are executed serially). - * @throws IOException - * @throws URISyntaxException - * @throws LoginException - * @throws TezException - * @throws InterruptedException */ - public void open(String[] additionalFiles) + public void open(String[] additionalFilesNotFromConf) throws IOException, LoginException, URISyntaxException, TezException { - openInternal(setFromArray(additionalFiles), false, null, null); + openInternal(additionalFilesNotFromConf, false, null, null); } - private static Set<String> setFromArray(String[] additionalFiles) { - if (additionalFiles == null) return null; - Set<String> files = new HashSet<>(); - for (String originalFile : additionalFiles) { - files.add(originalFile); - } - return files; + + public void open(HiveResources resources) + throws LoginException, IOException, URISyntaxException, TezException { + openInternal(null, false, null, resources); } public void beginOpen(String[] additionalFiles, LogHelper console) throws IOException, LoginException, URISyntaxException, TezException { - openInternal(setFromArray(additionalFiles), true, console, null); + openInternal(additionalFiles, true, console, null); } - public void open(Collection<String> additionalFiles, Path scratchDir) - throws LoginException, IOException, URISyntaxException, TezException { - openInternal(additionalFiles, false, null, scratchDir); - } - - protected void openInternal(Collection<String> additionalFiles, - boolean isAsync, LogHelper console, Path scratchDir) + protected void openInternal(String[] additionalFilesNotFromConf, + boolean isAsync, LogHelper console, HiveResources resources) throws IOException, LoginException, URISyntaxException, TezException { // TODO Why is the queue name set again. It has already been setup via setQueueName. Do only one of the two. String confQueueName = conf.get(TezConfiguration.TEZ_QUEUE_NAME); @@ -258,25 +254,25 @@ public class TezSessionState { user = ugi.getShortUserName(); LOG.info("User of session id " + sessionId + " is " + user); - // create the tez tmp dir - tezScratchDir = scratchDir == null ? createTezDir(sessionId) : scratchDir; - - additionalFilesNotFromConf.clear(); - if (additionalFiles != null) { - additionalFilesNotFromConf.addAll(additionalFiles); + // Create the tez tmp dir and a directory for Hive resources. + tezScratchDir = createTezDir(sessionId, null); + if (resources != null) { + // If we are getting the resources externally, don't relocalize anything. + this.resources = resources; + } else { + this.resources = new HiveResources(createTezDir(sessionId, "resources")); + ensureLocalResources(conf, additionalFilesNotFromConf); } - refreshLocalResourcesFromConf(conf); - // unless already installed on all the cluster nodes, we'll have to // localize hive-exec.jar as well. appJarLr = createJarLocalResource(utils.getExecJarPathLocal()); // configuration for the application master final Map<String, LocalResource> commonLocalResources = new HashMap<String, LocalResource>(); - commonLocalResources.put(utils.getBaseName(appJarLr), appJarLr); - for (LocalResource lr : localizedResources) { - commonLocalResources.put(utils.getBaseName(lr), lr); + commonLocalResources.put(DagUtils.getBaseName(appJarLr), appJarLr); + for (LocalResource lr : this.resources.localizedResources) { + commonLocalResources.put(DagUtils.getBaseName(lr), lr); } if (llapMode) { @@ -284,7 +280,7 @@ public class TezSessionState { addJarLRByClass(LlapTaskSchedulerService.class, commonLocalResources); addJarLRByClass(LlapProtocolClientImpl.class, commonLocalResources); addJarLRByClass(LlapProtocolClientProxy.class, commonLocalResources); - addJarLRByClassName("org.apache.hadoop.registry.client.api.RegistryOperations", commonLocalResources); + addJarLRByClass(RegistryOperations.class, commonLocalResources); } // Create environment for AM. @@ -556,36 +552,54 @@ public class TezSessionState { tezConf.set(TezConfiguration.TEZ_AM_MODIFY_ACLS, modifyStr); } - public void refreshLocalResourcesFromConf(HiveConf conf) - throws IOException, LoginException, URISyntaxException, TezException { - - String dir = tezScratchDir.toString(); - - localizedResources.clear(); + /** This is called in openInternal and in TezTask.updateSession to localize conf resources. */ + public void ensureLocalResources(Configuration conf, String[] newFilesNotFromConf) + throws IOException, LoginException, URISyntaxException, TezException { + String dir = resources.dagResourcesDir.toString(); + resources.localizedResources.clear(); - // these are local resources set through add file, jar, etc + // Always localize files from conf; duplicates are handled on FS level. + // TODO: we could do the same thing as below and only localize if missing. + // That could be especially valuable given that this almost always the same set. List<LocalResource> lrs = utils.localizeTempFilesFromConf(dir, conf); if (lrs != null) { - localizedResources.addAll(lrs); + resources.localizedResources.addAll(lrs); } - // these are local resources that are set through the mr "tmpjars" property; skip session files. - List<LocalResource> handlerLr = utils.localizeTempFiles(dir, conf, - additionalFilesNotFromConf.toArray(new String[additionalFilesNotFromConf.size()]), - DagUtils.getTempFilesFromConf(conf)); - - if (handlerLr != null) { - localizedResources.addAll(handlerLr); + // Localize the non-conf resources that are missing from the current list. + List<LocalResource> newResources = null; + if (newFilesNotFromConf != null && newFilesNotFromConf.length > 0) { + boolean hasResources = !resources.additionalFilesNotFromConf.isEmpty(); + if (hasResources) { + for (String s : newFilesNotFromConf) { + hasResources = resources.additionalFilesNotFromConf.contains(s); + if (!hasResources) break; + } + } + if (!hasResources) { + String[] skipFilesFromConf = DagUtils.getTempFilesFromConf(conf); + newResources = utils.localizeTempFiles(dir, conf, newFilesNotFromConf, skipFilesFromConf); + if (newResources != null) { + resources.localizedResources.addAll(newResources); + } + for (String fullName : newFilesNotFromConf) { + resources.additionalFilesNotFromConf.add(fullName); + } + } } - } - public boolean hasResources(String[] localAmResources) { - if (localAmResources == null || localAmResources.length == 0) return true; - if (additionalFilesNotFromConf.isEmpty()) return false; - for (String s : localAmResources) { - if (!additionalFilesNotFromConf.contains(s)) return false; + // Finally add the files to AM. The old code seems to do this twice, first for all the new + // resources regardless of type; and then for all the session resources that are not of type + // file (see branch-1 calls to addAppMasterLocalFiles: from updateSession and with resourceMap + // from submit). + // TODO: Do we really need all this nonsense? + if (newResources != null && !newResources.isEmpty()) { + session.addAppMasterLocalFiles(DagUtils.createTezLrMap(null, newResources)); + } + if (!resources.localizedResources.isEmpty()) { + session.addAppMasterLocalFiles( + DagUtils.getResourcesUpdatableForAm(resources.localizedResources)); } - return true; } /** @@ -593,11 +607,11 @@ public class TezSessionState { * further DAGs can be executed against it. Only called by session management classes; some * sessions should not simply be closed by users - e.g. pool sessions need to be restarted. * - * @param keepTmpDir + * @param keepDagFilesDir * whether or not to remove the scratch dir at the same time. * @throws Exception */ - void close(boolean keepTmpDir) throws Exception { + void close(boolean keepDagFilesDir) throws Exception { if (session != null) { LOG.info("Closing Tez Session"); closeClient(session); @@ -618,20 +632,16 @@ public class TezSessionState { } } - if (!keepTmpDir) { - cleanupScratchDir(); + cleanupScratchDir(); + if (!keepDagFilesDir) { + cleanupDagResources(); } session = null; sessionFuture = null; console = null; tezScratchDir = null; + // Do not reset dag resources; if it wasn't cleaned it's still needed. appJarLr = null; - additionalFilesNotFromConf.clear(); - localizedResources.clear(); - } - - public Set<String> getAdditionalFilesNotFromConf() { - return additionalFilesNotFromConf; } private void closeClient(TezClient client) throws TezException, @@ -643,12 +653,18 @@ public class TezSessionState { } } - protected final void cleanupScratchDir () throws IOException { + protected final void cleanupScratchDir() throws IOException { FileSystem fs = tezScratchDir.getFileSystem(conf); fs.delete(tezScratchDir, true); tezScratchDir = null; } + protected final void cleanupDagResources() throws IOException { + FileSystem fs = resources.dagResourcesDir.getFileSystem(conf); + fs.delete(resources.dagResourcesDir, true); + resources = null; + } + public String getSessionId() { return sessionId; } @@ -675,10 +691,6 @@ public class TezSessionState { return session; } - public Path getTezScratchDir() { - return tezScratchDir; - } - public LocalResource getAppJarLr() { return appJarLr; } @@ -687,11 +699,11 @@ public class TezSessionState { * createTezDir creates a temporary directory in the scratchDir folder to * be used with Tez. Assumes scratchDir exists. */ - private Path createTezDir(String sessionId) throws IOException { + private Path createTezDir(String sessionId, String suffix) throws IOException { // tez needs its own scratch dir (per session) // TODO: De-link from SessionState. A TezSession can be linked to different Hive Sessions via the pool. Path tezDir = new Path(SessionState.get().getHdfsScratchDirURIString(), TEZ_DIR); - tezDir = new Path(tezDir, sessionId); + tezDir = new Path(tezDir, sessionId + ((suffix == null) ? "" : ("-" + suffix))); FileSystem fs = tezDir.getFileSystem(conf); FsPermission fsPermission = new FsPermission(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIRPERMISSION)); fs.mkdirs(tezDir, fsPermission); @@ -759,9 +771,8 @@ public class TezSessionState { final File jar = new File(Utilities.jarFinderGetJar(clazz)); final String localJarPath = jar.toURI().toURL().toExternalForm(); - final LocalResource jarLr = - createJarLocalResource(localJarPath); - lrMap.put(utils.getBaseName(jarLr), jarLr); + final LocalResource jarLr = createJarLocalResource(localJarPath); + lrMap.put(DagUtils.getBaseName(jarLr), jarLr); } private String getSha(final Path localFile) throws IOException, IllegalArgumentException { @@ -808,7 +819,7 @@ public class TezSessionState { } public List<LocalResource> getLocalizedResources() { - return new ArrayList<>(localizedResources); + return new ArrayList<>(resources.localizedResources); } public String getUser() { @@ -849,10 +860,9 @@ public class TezSessionState { TezSessionPoolManager.getInstance().returnSession(this); } - public TezSessionState reopen( - Configuration conf, String[] inputOutputJars) throws Exception { + public TezSessionState reopen() throws Exception { // By default, TezSessionPoolManager handles this for both pool and non-pool session. - return TezSessionPoolManager.getInstance().reopen(this, conf, inputOutputJars); + return TezSessionPoolManager.getInstance().reopen(this); } public void destroy() throws Exception { @@ -875,4 +885,28 @@ public class TezSessionState { public KillQuery getKillQuery() { return killQuery; } + + public HiveResources extractHiveResources() { + HiveResources result = resources; + resources = null; + return result; + } + + public Path replaceHiveResources(HiveResources resources, boolean isAsync) { + Path dir = null; + if (this.resources != null) { + dir = this.resources.dagResourcesDir; + if (!isAsync) { + try { + dir.getFileSystem(conf).delete(dir, true); + } catch (Exception ex) { + LOG.error("Failed to delete the old resources directory " + + dir + "; ignoring " + ex.getLocalizedMessage()); + } + dir = null; + } + } + this.resources = resources; + return dir; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/89dbf4e9/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index 8795cfc..27799a8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -18,8 +18,8 @@ package org.apache.hadoop.hive.ql.exec.tez; +import org.apache.hive.common.util.Ref; import org.apache.hadoop.hive.ql.exec.tez.UserPoolMapping.MappingInput; - import java.io.IOException; import java.util.Arrays; import java.util.Collection; @@ -31,9 +31,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; - import javax.annotation.Nullable; - import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -67,7 +65,6 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.LocalResource; -import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.tez.client.CallerContext; import org.apache.tez.client.TezClient; import org.apache.tez.common.counters.CounterGroup; @@ -87,7 +84,6 @@ import org.apache.tez.dag.api.client.DAGStatus; import org.apache.tez.dag.api.client.StatusGetOpts; import org.apache.tez.dag.api.client.VertexStatus; import org.json.JSONObject; - import com.google.common.annotations.VisibleForTesting; /** @@ -133,7 +129,7 @@ public class TezTask extends Task<TezWork> { int rc = 1; boolean cleanContext = false; Context ctx = null; - TezSessionState session = null; + Ref<TezSessionState> sessionRef = Ref.from(null); try { // Get or create Context object. If we create it we have to clean it later as well. @@ -147,15 +143,15 @@ public class TezTask extends Task<TezWork> { WmContext wmContext = new WmContext(System.currentTimeMillis(), queryId); ctx.setWmContext(wmContext); } - // Need to remove this static hack. But this is the way currently to get a session. SessionState ss = SessionState.get(); // Note: given that we return pool sessions to the pool in the finally block below, and that // we need to set the global to null to do that, this "reuse" may be pointless. - session = ss.getTezSession(); + TezSessionState session = sessionRef.value = ss.getTezSession(); if (session != null && !session.isOpen()) { LOG.warn("The session: " + session + " has not been opened"); } + // We only need a username for UGI to use for groups; getGroups will fetch the groups // based on Hadoop configuration, as documented at // https://hadoop.apache.org/docs/r2.8.0/hadoop-project-dist/hadoop-common/GroupsMapping.html @@ -163,57 +159,51 @@ public class TezTask extends Task<TezWork> { MappingInput mi = (userName == null) ? new MappingInput("anonymous", null) : new MappingInput(ss.getUserName(), UserGroupInformation.createRemoteUser(ss.getUserName()).getGroups()); + WmContext wmContext = ctx.getWmContext(); - session = WorkloadManagerFederation.getSession(session, conf, mi, getWork().getLlapMode(), wmContext); + // jobConf will hold all the configuration for hadoop, tez, and hive + JobConf jobConf = utils.createConfiguration(conf); + // Get all user jars from work (e.g. input format stuff). + String[] allNonConfFiles = work.configureJobConfAndExtractJars(jobConf); + // DAG scratch dir. We get a session from the pool so it may be different from Tez one. + // TODO: we could perhaps reuse the same directory for HiveResources? + Path scratchDir = utils.createTezDir(ctx.getMRScratchDir(), conf); + CallerContext callerContext = CallerContext.create( + "HIVE", queryPlan.getQueryId(), "HIVE_QUERY_ID", queryPlan.getQueryStr()); + + session = sessionRef.value = WorkloadManagerFederation.getSession( + sessionRef.value, conf, mi, getWork().getLlapMode(), wmContext); - LOG.info("Subscribed to counters: {} for queryId: {}", wmContext.getSubscribedCounters(), wmContext.getQueryId()); - ss.setTezSession(session); try { - // jobConf will hold all the configuration for hadoop, tez, and hive - JobConf jobConf = utils.createConfiguration(conf); - - // Get all user jars from work (e.g. input format stuff). - String[] inputOutputJars = work.configureJobConfAndExtractJars(jobConf); - - // we will localize all the files (jars, plans, hashtables) to the - // scratch dir. let's create this and tmp first. - Path scratchDir = ctx.getMRScratchDir(); - - // create the tez tmp dir - scratchDir = utils.createTezDir(scratchDir, conf); + ss.setTezSession(session); + LOG.info("Subscribed to counters: {} for queryId: {}", wmContext.getSubscribedCounters(), + wmContext.getQueryId()); - // This is used to compare global and vertex resources. Global resources are originally - // derived from session conf via localizeTempFilesFromConf. So, use that here. - Configuration sessionConf = (session.getConf() != null) ? session.getConf() : conf; - Map<String,LocalResource> inputOutputLocalResources = - getExtraLocalResources(jobConf, scratchDir, inputOutputJars, sessionConf); + // Ensure the session is open and has the necessary local resources. + // This would refresh any conf resources and also local resources. + ensureSessionHasResources(session, allNonConfFiles); - // Ensure the session is open and has the necessary local resources - updateSession(session, jobConf, scratchDir, inputOutputJars, inputOutputLocalResources); + // This is a combination of the jar stuff from conf, and not from conf. + List<LocalResource> allNonAppResources = session.getLocalizedResources(); + logResources(allNonAppResources); - List<LocalResource> additionalLr = session.getLocalizedResources(); - logResources(additionalLr); - - // unless already installed on all the cluster nodes, we'll have to - // localize hive-exec.jar as well. - LocalResource appJarLr = session.getAppJarLr(); + Map<String, LocalResource> allResources = DagUtils.createTezLrMap( + session.getAppJarLr(), allNonAppResources); // next we translate the TezWork to a Tez DAG - DAG dag = build(jobConf, work, scratchDir, appJarLr, additionalLr, ctx); - CallerContext callerContext = CallerContext.create( - "HIVE", queryPlan.getQueryId(), - "HIVE_QUERY_ID", queryPlan.getQueryStr()); + DAG dag = build(jobConf, work, scratchDir, ctx, allResources); dag.setCallerContext(callerContext); - // Add the extra resources to the dag - addExtraResourcesToDag(session, dag, inputOutputJars, inputOutputLocalResources); + // Note: we no longer call addTaskLocalFiles because all the resources are correctly + // updated in the session resource lists now, and thus added to vertices. + // If something breaks, dag.addTaskLocalFiles might need to be called here. // Check isShutdown opportunistically; it's never unset. if (this.isShutdown) { throw new HiveException("Operation cancelled"); } - DAGClient dagClient = submit(jobConf, dag, scratchDir, appJarLr, session, - additionalLr, inputOutputJars, inputOutputLocalResources); + DAGClient dagClient = submit(jobConf, dag, sessionRef); + session = sessionRef.value; boolean wasShutdown = false; synchronized (dagClientLock) { assert this.dagClient == null; @@ -251,7 +241,9 @@ public class TezTask extends Task<TezWork> { // We return this to the pool even if it's unusable; reopen is supposed to handle this. wmContext = ctx.getWmContext(); try { - session.returnToSessionManager(); + if (sessionRef.value != null) { + sessionRef.value.returnToSessionManager(); + } } catch (Exception e) { LOG.error("Failed to return session: {} to pool", session, e); throw e; @@ -340,61 +332,23 @@ public class TezTask extends Task<TezWork> { } /** - * Converted the list of jars into local resources - */ - Map<String,LocalResource> getExtraLocalResources(JobConf jobConf, Path scratchDir, - String[] inputOutputJars, Configuration sessionConf) throws Exception { - final Map<String,LocalResource> resources = new HashMap<String,LocalResource>(); - // Skip the files already in session local resources... - final List<LocalResource> localResources = utils.localizeTempFiles(scratchDir.toString(), - jobConf, inputOutputJars, DagUtils.getTempFilesFromConf(sessionConf)); - if (null != localResources) { - for (LocalResource lr : localResources) { - resources.put(utils.getBaseName(lr), lr); - } - } - return resources; - } - - /** * Ensures that the Tez Session is open and the AM has all necessary jars configured. */ - void updateSession(TezSessionState session, - JobConf jobConf, Path scratchDir, String[] inputOutputJars, - Map<String,LocalResource> extraResources) throws Exception { - final boolean missingLocalResources = !session - .hasResources(inputOutputJars); - + @VisibleForTesting + void ensureSessionHasResources( + TezSessionState session, String[] nonConfResources) throws Exception { TezClient client = session.getSession(); // TODO null can also mean that this operation was interrupted. Should we really try to re-create the session in that case ? if (client == null) { + // Note: the only sane case where this can happen is the non-pool one. We should get rid + // of it, in non-pool case perf doesn't matter so we might as well open at get time + // and then call update like we do in the else. // Can happen if the user sets the tez flag after the session was established. LOG.info("Tez session hasn't been created yet. Opening session"); - session.open(inputOutputJars); + session.open(nonConfResources); } else { LOG.info("Session is already open"); - - // Ensure the open session has the necessary resources (StorageHandler) - if (missingLocalResources) { - LOG.info("Tez session missing resources," + - " adding additional necessary resources"); - client.addAppMasterLocalFiles(extraResources); - } - - session.refreshLocalResourcesFromConf(conf); - } - } - - /** - * Adds any necessary resources that must be localized in each vertex to the DAG. - */ - void addExtraResourcesToDag(TezSessionState session, DAG dag, - String[] inputOutputJars, - Map<String,LocalResource> inputOutputLocalResources) throws Exception { - if (!session.hasResources(inputOutputJars)) { - if (null != inputOutputLocalResources) { - dag.addTaskLocalFiles(inputOutputLocalResources); - } + session.ensureLocalResources(conf, nonConfResources); } } @@ -406,9 +360,8 @@ public class TezTask extends Task<TezWork> { } } - DAG build(JobConf conf, TezWork work, Path scratchDir, - LocalResource appJarLr, List<LocalResource> additionalLr, Context ctx) - throws Exception { + DAG build(JobConf conf, TezWork work, Path scratchDir, Context ctx, + Map<String, LocalResource> vertexResources) throws Exception { perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_BUILD_DAG); @@ -426,7 +379,7 @@ public class TezTask extends Task<TezWork> { DAG dag = DAG.create(dagName); // set some info for the query - JSONObject json = new JSONObject(new LinkedHashMap()).put("context", "Hive") + JSONObject json = new JSONObject(new LinkedHashMap<>()).put("context", "Hive") .put("description", ctx.getCmd()); String dagInfo = json.toString(); @@ -474,7 +427,6 @@ public class TezTask extends Task<TezWork> { // For a vertex group, all Outputs use the same Key-class, Val-class and partitioner. // Pick any one source vertex to figure out the Edge configuration. - // now hook up the children for (BaseWork v: children) { @@ -488,9 +440,8 @@ public class TezTask extends Task<TezWork> { // Regular vertices JobConf wxConf = utils.initializeVertexConf(conf, ctx, w); checkOutputSpec(w, wxConf); - Vertex wx = - utils.createVertex(wxConf, w, scratchDir, appJarLr, additionalLr, fs, ctx, !isFinal, - work, work.getVertexType(w)); + Vertex wx = utils.createVertex(wxConf, w, scratchDir, fs, ctx, !isFinal, + work, work.getVertexType(w), vertexResources); if (w.getReservedMemoryMB() > 0) { // If reversedMemoryMB is set, make memory allocation fraction adjustment as needed double frac = DagUtils.adjustMemoryReserveFraction(w.getReservedMemoryMB(), super.conf); @@ -548,38 +499,28 @@ public class TezTask extends Task<TezWork> { dag.setAccessControls(ac); } - DAGClient submit(JobConf conf, DAG dag, Path scratchDir, - LocalResource appJarLr, TezSessionState sessionState, - List<LocalResource> additionalLr, String[] inputOutputJars, - Map<String,LocalResource> inputOutputLocalResources) - throws Exception { + private TezSessionState getNewTezSessionOnError( + TezSessionState oldSession) throws Exception { + // Note: we don't pass the config to reopen. If the session was already open, it would + // have kept running with its current config - preserve that behavior. + TezSessionState newSession = oldSession.reopen(); + console.printInfo("Session re-established."); + return newSession; + } + + DAGClient submit(JobConf conf, DAG dag, Ref<TezSessionState> sessionStateRef) throws Exception { perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_SUBMIT_DAG); DAGClient dagClient = null; - - Map<String, LocalResource> resourceMap = new HashMap<String, LocalResource>(); - if (additionalLr != null) { - for (LocalResource lr: additionalLr) { - if (lr.getType() == LocalResourceType.FILE) { - // TEZ AM will only localize FILE (no script operators in the AM) - resourceMap.put(utils.getBaseName(lr), lr); - } - } - } - + TezSessionState sessionState = sessionStateRef.value; try { try { // ready to start execution on the cluster - sessionState.getSession().addAppMasterLocalFiles(resourceMap); dagClient = sessionState.getSession().submitDAG(dag); } catch (SessionNotRunning nr) { console.printInfo("Tez session was closed. Reopening..."); - - // close the old one, but keep the tmp files around - // conf is passed in only for the case when session conf is null (tests and legacy paths?) - sessionState = sessionState.reopen(conf, inputOutputJars); + sessionStateRef.value = sessionState = getNewTezSessionOnError(sessionState); console.printInfo("Session re-established."); - dagClient = sessionState.getSession().submitDAG(dag); } } catch (Exception e) { @@ -587,14 +528,12 @@ public class TezTask extends Task<TezWork> { try { console.printInfo("Dag submit failed due to " + e.getMessage() + " stack trace: " + Arrays.toString(e.getStackTrace()) + " retrying..."); - // TODO: this is temporary, need to refactor how reopen is invoked. - WmContext oldCtx = sessionState.getWmContext(); - sessionState = sessionState.reopen(conf, inputOutputJars); - sessionState.setWmContext(oldCtx); + sessionStateRef.value = sessionState = getNewTezSessionOnError(sessionState); dagClient = sessionState.getSession().submitDAG(dag); } catch (Exception retryException) { // we failed to submit after retrying. Destroy session and bail. sessionState.destroy(); + sessionStateRef.value = null; throw retryException; } } http://git-wip-us.apache.org/repos/asf/hive/blob/89dbf4e9/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java index dbdbbf2..1f4843d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java @@ -17,6 +17,14 @@ */ package org.apache.hadoop.hive.ql.exec.tez; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -39,9 +47,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; - import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.tezplugins.LlapTaskSchedulerService; @@ -50,6 +57,7 @@ import org.apache.hadoop.hive.metastore.api.WMPool; import org.apache.hadoop.hive.metastore.api.WMPoolTrigger; import org.apache.hadoop.hive.metastore.api.WMTrigger; import org.apache.hadoop.hive.ql.exec.tez.AmPluginNode.AmPluginInfo; +import org.apache.hadoop.hive.ql.exec.tez.TezSessionState.HiveResources; import org.apache.hadoop.hive.ql.exec.tez.UserPoolMapping.MappingInput; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.session.KillQuery; @@ -67,15 +75,6 @@ import org.codehaus.jackson.map.SerializationConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; -import com.google.common.util.concurrent.ThreadFactoryBuilder; - /** Workload management entry point for HS2. * Note on how this class operates. @@ -342,6 +341,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida private List<WmTezSession> toRestartInUse = new LinkedList<>(), toDestroyNoRestart = new LinkedList<>(); private Map<WmTezSession, KillQueryContext> toKillQuery = new IdentityHashMap<>(); + private List<Path> pathsToDelete = Lists.newArrayList(); } private void runWmThread() { @@ -440,7 +440,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida try { WmEvent wmEvent = new WmEvent(WmEvent.EventType.RESTART); // Note: sessions in toRestart are always in use, so they cannot expire in parallel. - tezAmPool.replaceSession(toRestart, false, null); + tezAmPool.replaceSession(toRestart); wmEvent.endEvent(toRestart); } catch (Exception ex) { LOG.error("Failed to restart an old session; ignoring", ex); @@ -463,6 +463,19 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida }); } context.toDestroyNoRestart.clear(); + + // 4. Delete unneeded directories that were replaced by other ones via reopen. + for (final Path path : context.pathsToDelete) { + LOG.info("Deleting {}", path); + workPool.submit(() -> { + try { + path.getFileSystem(conf).delete(path, true); + } catch (Exception ex) { + LOG.error("Failed to delete an old path; ignoring " + ex.getMessage()); + } + }); + } + context.pathsToDelete.clear(); } /** @@ -654,7 +667,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida if (LOG.isDebugEnabled()) { LOG.info("Processing changes for pool " + poolName + ": " + pools.get(poolName)); } - processPoolChangesOnMasterThread(poolName, hasRequeues); + processPoolChangesOnMasterThread(poolName, hasRequeues, syncWork); } @@ -852,7 +865,8 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida case OK: // If pool didn't exist, checkAndRemoveSessionFromItsPool wouldn't have returned OK. PoolState pool = pools.get(poolName); - SessionInitContext sw = new SessionInitContext(future, poolName, session.getQueryId(), session.getWmContext()); + SessionInitContext sw = new SessionInitContext(future, poolName, session.getQueryId(), + session.getWmContext(), session.extractHiveResources()); // We have just removed the session from the same pool, so don't check concurrency here. pool.initializingSessions.add(sw); ListenableFuture<WmTezSession> getFuture = tezAmPool.getSessionAsync(); @@ -953,7 +967,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida state = new PoolState(fullName, qp, fraction); } else { // This will also take care of the queries if query parallelism changed. - state.update(qp, fraction, syncWork.toKillQuery, e); + state.update(qp, fraction, syncWork, e); poolsToRedistribute.add(fullName); } state.setTriggers(new LinkedList<Trigger>()); @@ -988,7 +1002,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida if (oldPools != null && !oldPools.isEmpty()) { // Looks like some pools were removed; kill running queries, re-queue the queued ones. for (PoolState oldPool : oldPools.values()) { - oldPool.destroy(syncWork.toKillQuery, e.getRequests, e.toReuse); + oldPool.destroy(syncWork, e.getRequests, e.toReuse); } } @@ -1027,7 +1041,6 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida return deltaSessions + toTransfer; } - @SuppressWarnings("unchecked") private void failOnFutureFailure(ListenableFuture<?> future) { Futures.addCallback(future, FATAL_ERROR_CALLBACK); } @@ -1088,7 +1101,8 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida } - private void processPoolChangesOnMasterThread(String poolName, boolean hasRequeues) throws Exception { + private void processPoolChangesOnMasterThread( + String poolName, boolean hasRequeues, WmThreadSyncWork syncWork) throws Exception { PoolState pool = pools.get(poolName); if (pool == null) return; // Might be from before the new resource plan. @@ -1109,15 +1123,15 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida // Note that in theory, we are guaranteed to have a session waiting for us here, but // the expiration, failures, etc. may cause one to be missing pending restart. // See SessionInitContext javadoc. - SessionInitContext sw = new SessionInitContext(queueReq.future, poolName, queueReq.queryId, - queueReq.wmContext); + SessionInitContext sw = new SessionInitContext( + queueReq.future, poolName, queueReq.queryId, queueReq.wmContext, null); ListenableFuture<WmTezSession> getFuture = tezAmPool.getSessionAsync(); Futures.addCallback(getFuture, sw); // It is possible that all the async methods returned on the same thread because the // session with registry data and stuff was available in the pool. // If this happens, we'll take the session out here and "cancel" the init so we skip // processing the message that the successful init has queued for us. - boolean isDone = sw.extractSessionAndCancelIfDone(pool.sessions); + boolean isDone = sw.extractSessionAndCancelIfDone(pool.sessions, syncWork.pathsToDelete); if (!isDone) { pool.initializingSessions.add(sw); } @@ -1458,22 +1472,14 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida @Override - public TezSessionState reopen(TezSessionState session, Configuration conf, - String[] additionalFiles) throws Exception { + public TezSessionState reopen(TezSessionState session) throws Exception { WmTezSession wmTezSession = ensureOwnedSession(session); HiveConf sessionConf = wmTezSession.getConf(); if (sessionConf == null) { + // TODO: can this ever happen? LOG.warn("Session configuration is null for " + wmTezSession); sessionConf = new HiveConf(conf, WorkloadManager.class); } - // TODO: ideally, we should handle reopen the same way no matter what. However, the cases - // with additional files will have to wait until HIVE-17827 is unfucked, because it's - // difficult to determine how the additionalFiles are to be propagated/reused between - // two sessions. Once the update logic is encapsulated in the session we can remove this. - if (additionalFiles != null && additionalFiles.length > 0) { - TezSessionPoolManager.reopenInternal(session, additionalFiles); - return session; - } SettableFuture<WmTezSession> future = SettableFuture.create(); currentLock.lock(); @@ -1493,7 +1499,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida public void closeAndReopenExpiredSession(TezSessionPoolSession session) throws Exception { // By definition, this session is not in use and can no longer be in use, so it only // affects the session pool. We can handle this inline. - tezAmPool.replaceSession(ensureOwnedSession(session), false, null); + tezAmPool.replaceSession(ensureOwnedSession(session)); } // ======= VARIOUS UTILITY METHOD @@ -1637,7 +1643,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida } public void update(int queryParallelism, double fraction, - Map<WmTezSession, KillQueryContext> toKill, EventState e) { + WmThreadSyncWork syncWork, EventState e) { this.finalFraction = this.finalFractionRemaining = fraction; this.queryParallelism = queryParallelism; // TODO: two possible improvements @@ -1646,7 +1652,8 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida // If we could somehow restart queries we could instead put them at the front // of the queue (esp. in conjunction with (1)) and rerun them. if (queryParallelism < getTotalActiveSessions()) { - extractAllSessionsToKill("The query pool was resized by administrator", e.toReuse, toKill); + extractAllSessionsToKill("The query pool was resized by administrator", + e.toReuse, syncWork); } // We will requeue, and not kill, the queries that are not running yet. // Insert them all before the get requests from this iteration. @@ -1656,9 +1663,9 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida } } - public void destroy(Map<WmTezSession, KillQueryContext> toKill, + public void destroy(WmThreadSyncWork syncWork, LinkedList<GetRequest> globalQueue, IdentityHashMap<WmTezSession, GetRequest> toReuse) { - extractAllSessionsToKill("The query pool was removed by administrator", toReuse, toKill); + extractAllSessionsToKill("The query pool was removed by administrator", toReuse, syncWork); // All the pending get requests should just be requeued elsewhere. // Note that we never queue session reuse so sessionToReuse would be null. globalQueue.addAll(0, queue); @@ -1694,19 +1701,22 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida private void extractAllSessionsToKill(String killReason, IdentityHashMap<WmTezSession, GetRequest> toReuse, - Map<WmTezSession, KillQueryContext> toKill) { + WmThreadSyncWork syncWork) { for (WmTezSession sessionToKill : sessions) { - resetRemovedSessionToKill(toKill, new KillQueryContext(sessionToKill, killReason), toReuse); + resetRemovedSessionToKill(syncWork.toKillQuery, + new KillQueryContext(sessionToKill, killReason), toReuse); } sessions.clear(); for (SessionInitContext initCtx : initializingSessions) { // It is possible that the background init thread has finished in parallel, queued // the message for us but also returned the session to the user. - WmTezSession sessionToKill = initCtx.cancelAndExtractSessionIfDone(killReason); + WmTezSession sessionToKill = initCtx.cancelAndExtractSessionIfDone( + killReason, syncWork.pathsToDelete); if (sessionToKill == null) { continue; // Async op in progress; the callback will take care of this. } - resetRemovedSessionToKill(toKill, new KillQueryContext(sessionToKill, killReason), toReuse); + resetRemovedSessionToKill(syncWork.toKillQuery, + new KillQueryContext(sessionToKill, killReason), toReuse); } initializingSessions.clear(); } @@ -1740,14 +1750,18 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida private SettableFuture<WmTezSession> future; private SessionInitState state; private String cancelReason; + private HiveResources prelocalizedResources; + private Path pathToDelete; private WmContext wmContext; - public SessionInitContext(SettableFuture<WmTezSession> future, String poolName, String queryId, - final WmContext wmContext) { + public SessionInitContext(SettableFuture<WmTezSession> future, + String poolName, String queryId, WmContext wmContext, + HiveResources prelocalizedResources) { this.state = SessionInitState.GETTING; this.future = future; this.poolName = poolName; this.queryId = queryId; + this.prelocalizedResources = prelocalizedResources; this.wmContext = wmContext; } @@ -1765,7 +1779,12 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida session.setPoolName(poolName); session.setQueueName(yarnQueue); session.setQueryId(queryId); - session.setWmContext(wmContext); + if (prelocalizedResources != null) { + pathToDelete = session.replaceHiveResources(prelocalizedResources, true); + } + if (wmContext != null) { + session.setWmContext(wmContext); + } this.session = session; this.state = SessionInitState.WAITING_FOR_REGISTRY; break; @@ -1855,7 +1874,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida session.setQueryId(null); // We can just restart the session if we have received one. try { - tezAmPool.replaceSession(session, false, null); + tezAmPool.replaceSession(session); } catch (Exception e) { LOG.error("Failed to restart a failed session", e); } @@ -1863,7 +1882,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida } /** Cancel the async operation (even if it's done), and return the session if done. */ - public WmTezSession cancelAndExtractSessionIfDone(String cancelReason) { + public WmTezSession cancelAndExtractSessionIfDone(String cancelReason, List<Path> toDelete) { lock.lock(); try { SessionInitState state = this.state; @@ -1872,6 +1891,9 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida if (state == SessionInitState.DONE) { WmTezSession result = this.session; this.session = null; + if (pathToDelete != null) { + toDelete.add(pathToDelete); + } return result; } else { // In the states where a background operation is in progress, wait for the callback. @@ -1887,11 +1909,15 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida } /** Extracts the session and cancel the operation, both only if done. */ - public boolean extractSessionAndCancelIfDone(List<WmTezSession> results) { + public boolean extractSessionAndCancelIfDone( + List<WmTezSession> results, List<Path> toDelete) { lock.lock(); try { if (state != SessionInitState.DONE) return false; this.state = SessionInitState.CANCELED; + if (pathToDelete != null) { + toDelete.add(pathToDelete); + } if (this.session != null) { results.add(this.session); } // Otherwise we have failed; the callback has taken care of the failure. http://git-wip-us.apache.org/repos/asf/hive/blob/89dbf4e9/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java index 9726af1..5ade1f3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java @@ -92,7 +92,7 @@ public class TezJobMonitor { try { // TODO: why does this only kill non-default sessions? // Nothing for workload management since that only deals with default ones. - TezSessionPoolManager.getInstance().closeNonDefaultSessions(false); + TezSessionPoolManager.getInstance().closeNonDefaultSessions(); } catch (Exception e) { // ignore } http://git-wip-us.apache.org/repos/asf/hive/blob/89dbf4e9/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java index 4148a8a..d6ae171 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java @@ -356,9 +356,8 @@ public class GenericUDTFGetSplits extends GenericUDTF { // Update the queryId to use the generated applicationId. See comment below about // why this is done. HiveConf.setVar(wxConf, HiveConf.ConfVars.HIVEQUERYID, applicationId.toString()); - Vertex wx = utils.createVertex(wxConf, mapWork, scratchDir, appJarLr, - new ArrayList<LocalResource>(), fs, ctx, false, work, - work.getVertexType(mapWork)); + Vertex wx = utils.createVertex(wxConf, mapWork, scratchDir, fs, ctx, false, work, + work.getVertexType(mapWork), DagUtils.createTezLrMap(appJarLr, null)); String vertexName = wx.getName(); dag.addVertex(wx); utils.addCredentials(mapWork, dag); http://git-wip-us.apache.org/repos/asf/hive/blob/89dbf4e9/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java index 5248454..0a47cda 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java @@ -20,16 +20,12 @@ package org.apache.hadoop.hive.ql.exec.tez; import com.google.common.util.concurrent.Futures; - import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import java.io.IOException; -import java.util.Collection; import java.util.concurrent.ScheduledExecutorService; import javax.security.auth.login.LoginException; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.security.UserGroupInformation; @@ -83,8 +79,7 @@ public class SampleTezSessionState extends WmTezSession { } @Override - public void open(Collection<String> additionalFiles, Path scratchDir) - throws LoginException, IOException { + public void open(HiveResources resources) throws LoginException, IOException { open(); } http://git-wip-us.apache.org/repos/asf/hive/blob/89dbf4e9/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java index 829ea8c..8fbe9a7 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java @@ -20,20 +20,16 @@ package org.apache.hadoop.hive.ql.exec.tez; import static org.junit.Assert.*; -import java.util.HashSet; -import java.util.Set; - import java.util.ArrayList; import java.util.List; import java.util.Random; - +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; public class TestTezSessionPool { @@ -190,22 +186,22 @@ public class TestTezSessionPool { Mockito.when(session.isDefault()).thenReturn(false); Mockito.when(session.getConf()).thenReturn(conf); - poolManager.reopen(session, conf, null); + poolManager.reopen(session); - Mockito.verify(session).close(true); - Mockito.verify(session).open(new HashSet<String>(), null); + Mockito.verify(session).close(false); + Mockito.verify(session).open(Mockito.<TezSessionState.HiveResources>any()); // mocked session starts with default queue assertEquals("default", session.getQueueName()); // user explicitly specified queue name conf.set("tez.queue.name", "tezq1"); - poolManager.reopen(session, conf, null); + poolManager.reopen(session); assertEquals("tezq1", poolManager.getSession(null, conf, false, false).getQueueName()); // user unsets queue name, will fallback to default session queue conf.unset("tez.queue.name"); - poolManager.reopen(session, conf, null); + poolManager.reopen(session); assertEquals("default", poolManager.getSession(null, conf, false, false).getQueueName()); // session.open will unset the queue name from conf but Mockito intercepts the open call @@ -213,17 +209,17 @@ public class TestTezSessionPool { conf.unset("tez.queue.name"); // change session's default queue to tezq1 and rerun test sequence Mockito.when(session.getQueueName()).thenReturn("tezq1"); - poolManager.reopen(session, conf, null); + poolManager.reopen(session); assertEquals("tezq1", poolManager.getSession(null, conf, false, false).getQueueName()); // user sets default queue now conf.set("tez.queue.name", "default"); - poolManager.reopen(session, conf, null); + poolManager.reopen(session); assertEquals("default", poolManager.getSession(null, conf, false, false).getQueueName()); // user does not specify queue so use session default conf.unset("tez.queue.name"); - poolManager.reopen(session, conf, null); + poolManager.reopen(session); assertEquals("tezq1", poolManager.getSession(null, conf, false, false).getQueueName()); } catch (Exception e) { e.printStackTrace(); @@ -328,10 +324,10 @@ public class TestTezSessionPool { Mockito.when(session.isDefault()).thenReturn(false); Mockito.when(session.getConf()).thenReturn(conf); - poolManager.reopen(session, conf, null); + poolManager.reopen(session); - Mockito.verify(session).close(true); - Mockito.verify(session).open(new HashSet<String>(), null); + Mockito.verify(session).close(false); + Mockito.verify(session).open(Mockito.<TezSessionState.HiveResources>any()); } @Test http://git-wip-us.apache.org/repos/asf/hive/blob/89dbf4e9/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java index 47aa936..44d2b66 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java @@ -23,16 +23,16 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.*; import static org.mockito.Mockito.*; +import org.apache.hadoop.yarn.api.records.URL; +import org.apache.hive.common.util.Ref; import java.io.IOException; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.UUID; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -96,8 +96,8 @@ public class TestTezTask { when(utils.getTezDir(any(Path.class))).thenReturn(path); when( utils.createVertex(any(JobConf.class), any(BaseWork.class), any(Path.class), - any(LocalResource.class), any(List.class), any(FileSystem.class), any(Context.class), - anyBoolean(), any(TezWork.class), any(VertexType.class))).thenAnswer( + any(FileSystem.class), any(Context.class), + anyBoolean(), any(TezWork.class), any(VertexType.class), any(Map.class))).thenAnswer( new Answer<Vertex>() { @Override @@ -163,7 +163,7 @@ public class TestTezTask { task.setQueryPlan(mockQueryPlan); conf = new JobConf(); - appLr = mock(LocalResource.class); + appLr = createResource("foo.jar"); HiveConf hiveConf = new HiveConf(); hiveConf @@ -173,8 +173,7 @@ public class TestTezTask { session = mock(TezClient.class); sessionState = mock(TezSessionState.class); when(sessionState.getSession()).thenReturn(session); - when(sessionState.reopen(any(Configuration.class), any(String[].class))) - .thenReturn(sessionState); + when(sessionState.reopen()).thenReturn(sessionState); when(session.submitDAG(any(DAG.class))) .thenThrow(new SessionNotRunning("")) .thenReturn(mock(DAGClient.class)); @@ -192,7 +191,8 @@ public class TestTezTask { @Test public void testBuildDag() throws IllegalArgumentException, IOException, Exception { - DAG dag = task.build(conf, work, path, appLr, null, new Context(conf)); + DAG dag = task.build(conf, work, path, new Context(conf), + DagUtils.createTezLrMap(appLr, null)); for (BaseWork w: work.getAllWork()) { Vertex v = dag.getVertex(w.getName()); assertNotNull(v); @@ -212,17 +212,17 @@ public class TestTezTask { @Test public void testEmptyWork() throws IllegalArgumentException, IOException, Exception { - DAG dag = task.build(conf, new TezWork("", null), path, appLr, null, new Context(conf)); + DAG dag = task.build(conf, new TezWork("", null), path, new Context(conf), + DagUtils.createTezLrMap(appLr, null)); assertEquals(dag.getVertices().size(), 0); } @Test public void testSubmit() throws Exception { DAG dag = DAG.create("test"); - task.submit(conf, dag, path, appLr, sessionState, Collections.<LocalResource> emptyList(), - new String[0], Collections.<String,LocalResource> emptyMap()); + task.submit(conf, dag, Ref.from(sessionState)); // validate close/reopen - verify(sessionState, times(1)).reopen(any(Configuration.class), any(String[].class)); + verify(sessionState, times(1)).reopen(); verify(session, times(2)).submitDAG(any(DAG.class)); } @@ -235,53 +235,22 @@ public class TestTezTask { @Test public void testExistingSessionGetsStorageHandlerResources() throws Exception { final String[] inputOutputJars = new String[] {"file:///tmp/foo.jar"}; - LocalResource res = mock(LocalResource.class); - final List<LocalResource> resources = Collections.singletonList(res); - final Map<String,LocalResource> resMap = new HashMap<String,LocalResource>(); - resMap.put("foo.jar", res); - - when(utils.localizeTempFiles(path.toString(), conf, inputOutputJars, null)) - .thenReturn(resources); - when(utils.getBaseName(res)).thenReturn("foo.jar"); - when(sessionState.isOpen()).thenReturn(true); - when(sessionState.isOpening()).thenReturn(false); - when(sessionState.hasResources(inputOutputJars)).thenReturn(false); - task.updateSession(sessionState, conf, path, inputOutputJars, resMap); - verify(session).addAppMasterLocalFiles(resMap); - } - - @Test - public void testExtraResourcesAddedToDag() throws Exception { - final String[] inputOutputJars = new String[] {"file:///tmp/foo.jar"}; - LocalResource res = mock(LocalResource.class); + LocalResource res = createResource(inputOutputJars[0]); final List<LocalResource> resources = Collections.singletonList(res); - final Map<String,LocalResource> resMap = new HashMap<String,LocalResource>(); - resMap.put("foo.jar", res); - DAG dag = mock(DAG.class); - when(utils.localizeTempFiles(path.toString(), conf, inputOutputJars, null)) - .thenReturn(resources); - when(utils.getBaseName(res)).thenReturn("foo.jar"); + when(utils.localizeTempFiles(anyString(), any(Configuration.class), eq(inputOutputJars), + any(String[].class))).thenReturn(resources); when(sessionState.isOpen()).thenReturn(true); when(sessionState.isOpening()).thenReturn(false); - when(sessionState.hasResources(inputOutputJars)).thenReturn(false); - task.addExtraResourcesToDag(sessionState, dag, inputOutputJars, resMap); - verify(dag).addTaskLocalFiles(resMap); + task.ensureSessionHasResources(sessionState, inputOutputJars); + // TODO: ideally we should have a test for session itself. + verify(sessionState).ensureLocalResources(any(Configuration.class), eq(inputOutputJars)); } - @Test - public void testGetExtraLocalResources() throws Exception { - final String[] inputOutputJars = new String[] {"file:///tmp/foo.jar"}; + private static LocalResource createResource(String url) { LocalResource res = mock(LocalResource.class); - final List<LocalResource> resources = Collections.singletonList(res); - final Map<String,LocalResource> resMap = new HashMap<String,LocalResource>(); - resMap.put("foo.jar", res); - - when(utils.localizeTempFiles(eq(path.toString()), eq(conf), eq(inputOutputJars), - Mockito.<String[]>any())).thenReturn(resources); - when(utils.getBaseName(res)).thenReturn("foo.jar"); - - assertEquals(resMap, task.getExtraLocalResources(conf, path, inputOutputJars, null)); + when(res.getResource()).thenReturn(URL.fromPath(new Path(url))); + return res; } @Test http://git-wip-us.apache.org/repos/asf/hive/blob/89dbf4e9/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java index c58e450..fc8f66a 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java @@ -42,7 +42,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicReference; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan; @@ -212,9 +211,8 @@ public class TestWorkloadManager { } @Override - public TezSessionState reopen( - TezSessionState session, Configuration conf, String[] additionalFiles) throws Exception { - session = super.reopen(session, conf, additionalFiles); + public TezSessionState reopen(TezSessionState session) throws Exception { + session = super.reopen(session); ensureWm(); return session; } @@ -274,7 +272,7 @@ public class TestWorkloadManager { null, new MappingInput("user", null), conf, null); assertEquals(1.0, session.getClusterFraction(), EPSILON); qam.assertWasCalledAndReset(); - WmTezSession session2 = (WmTezSession) session.reopen(conf, null); + WmTezSession session2 = (WmTezSession) session.reopen(); assertNotSame(session, session2); wm.addTestEvent().get(); assertEquals(session2.toString(), 1.0, session2.getClusterFraction(), EPSILON); @@ -682,7 +680,7 @@ public class TestWorkloadManager { waitForThreadToBlock(cdl1, t1); checkError(error); // Replacing it directly in the pool should unblock get. - pool.replaceSession(oob, false, null); + pool.replaceSession(oob); t1.join(); assertNotNull(sessionA1.get()); assertEquals("A", sessionA1.get().getPoolName());
