HIVE-11969 : start Tez session in background when starting CLI (Sergey Shelukhin, reviewed by Gopal V)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7f9023ea Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7f9023ea Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7f9023ea Branch: refs/heads/llap Commit: 7f9023ea0323821626f17e30d04b5acffb1d3048 Parents: 556877c Author: Sergey Shelukhin <[email protected]> Authored: Wed Oct 7 13:55:36 2015 -0700 Committer: Sergey Shelukhin <[email protected]> Committed: Wed Oct 7 13:55:36 2015 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/cli/CliDriver.java | 9 +- .../org/apache/hadoop/hive/conf/HiveConf.java | 3 + .../hive/ql/exec/tez/TezSessionState.java | 204 +++++++++++++++---- .../apache/hadoop/hive/ql/exec/tez/TezTask.java | 6 +- .../hadoop/hive/ql/session/SessionState.java | 52 +++-- .../hadoop/hive/ql/exec/tez/TestTezTask.java | 2 + 6 files changed, 222 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/7f9023ea/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java ---------------------------------------------------------------------- diff --git a/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java b/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java index 4b52578..3a80f99 100644 --- a/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java +++ b/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java @@ -64,6 +64,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveVariableSource; import org.apache.hadoop.hive.conf.Validator; import org.apache.hadoop.hive.conf.VariableSubstitution; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.ql.CommandNeedRetryException; import org.apache.hadoop.hive.ql.Driver; @@ -690,7 +691,13 @@ public class CliDriver { }).substitute(conf, prompt); prompt2 = spacesForString(prompt); - SessionState.start(ss); + if (HiveConf.getBoolVar(conf, ConfVars.HIVE_CLI_TEZ_SESSION_ASYNC)) { + // Start the session in a fire-and-forget manner. When the asynchronously initialized parts of + // the session are needed, the corresponding getters and other methods will wait as needed. + SessionState.beginStart(ss, console); + } else { + SessionState.start(ss); + } // execute cli driver work try { http://git-wip-us.apache.org/repos/asf/hive/blob/7f9023ea/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 54a529e..bf48f69 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1712,6 +1712,9 @@ public class HiveConf extends Configuration { HIVE_CLI_PRINT_HEADER("hive.cli.print.header", false, "Whether to print the names of the columns in query output."), + HIVE_CLI_TEZ_SESSION_ASYNC("hive.cli.tez.session.async", true, "Whether to start Tez\n" + + "session in background when running CLI with Tez, allowing CLI to be available earlier."), + HIVE_ERROR_ON_EMPTY_PARTITION("hive.error.on.empty.partition", false, "Whether to throw an exception if dynamic partition insert generates empty results."), http://git-wip-us.apache.org/repos/asf/hive/blob/7f9023ea/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java index 568ebbe..6ed6421 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java @@ -23,14 +23,19 @@ import java.io.IOException; import java.io.InputStream; import java.net.URISyntaxException; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import javax.security.auth.login.LoginException; @@ -46,7 +51,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.LocalResource; @@ -70,6 +75,9 @@ public class TezSessionState { private Path tezScratchDir; private LocalResource appJarLr; private TezClient session; + private Future<TezClient> sessionFuture; + /** Console used for user feedback during async session opening. */ + private LogHelper console; private String sessionId; private final DagUtils utils; private String queueName; @@ -97,13 +105,40 @@ public class TezSessionState { this.sessionId = sessionId; } - /** - * Returns whether a session has been established - */ + public boolean isOpening() { + if (session != null || sessionFuture == null) return false; + try { + session = sessionFuture.get(0, TimeUnit.NANOSECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return false; + } catch (ExecutionException e) { + throw new RuntimeException(e); + } catch (CancellationException e) { + return false; + } catch (TimeoutException e) { + return true; + } + return false; + } + public boolean isOpen() { - return session != null; + if (session != null) return true; + if (sessionFuture == null) return false; + try { + session = sessionFuture.get(0, TimeUnit.NANOSECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return false; + } catch (ExecutionException e) { + throw new RuntimeException(e); + } catch (TimeoutException | CancellationException e) { + return false; + } + return true; } + /** * Get all open sessions. Only used to clean up at shutdown. * @return List<TezSessionState> @@ -124,9 +159,21 @@ public class TezSessionState { * @throws URISyntaxException * @throws LoginException * @throws TezException + * @throws InterruptedException */ public void open(HiveConf conf, String[] additionalFiles) throws IOException, LoginException, IllegalArgumentException, URISyntaxException, TezException { + openInternal(conf, additionalFiles, false, null); + } + + public void beginOpen(HiveConf conf, String[] additionalFiles, LogHelper console) + throws IOException, LoginException, IllegalArgumentException, URISyntaxException, TezException { + openInternal(conf, additionalFiles, true, console); + } + + private void openInternal( + final HiveConf conf, String[] additionalFiles, boolean isAsync, LogHelper console) + throws IOException, LoginException, IllegalArgumentException, URISyntaxException, TezException { this.conf = conf; this.queueName = conf.get("tez.queue.name"); this.doAsEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS); @@ -152,7 +199,7 @@ public class TezSessionState { appJarLr = createJarLocalResource(utils.getExecJarPathLocal()); // configuration for the application master - Map<String, LocalResource> commonLocalResources = new HashMap<String, LocalResource>(); + final Map<String, LocalResource> commonLocalResources = new HashMap<String, LocalResource>(); commonLocalResources.put(utils.getBaseName(appJarLr), appJarLr); for (LocalResource lr : localizedResources) { commonLocalResources.put(utils.getBaseName(lr), lr); @@ -164,7 +211,7 @@ public class TezSessionState { // and finally we're ready to create and start the session // generate basic tez config - TezConfiguration tezConfig = new TezConfiguration(conf); + final TezConfiguration tezConfig = new TezConfiguration(conf); tezConfig.set(TezConfiguration.TEZ_AM_STAGING_DIR, tezScratchDir.toUri().toString()); Utilities.stripHivePasswordDetails(tezConfig); @@ -176,37 +223,85 @@ public class TezSessionState { tezConfig.setInt(TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS, n); } - session = TezClient.create("HIVE-" + sessionId, tezConfig, true, + final TezClient session = TezClient.create("HIVE-" + sessionId, tezConfig, true, commonLocalResources, null); LOG.info("Opening new Tez Session (id: " + sessionId + ", scratch dir: " + tezScratchDir + ")"); TezJobMonitor.initShutdownHook(); - session.start(); + if (!isAsync) { + startSessionAndContainers(session, conf, commonLocalResources, tezConfig, false); + this.session = session; + } else { + FutureTask<TezClient> sessionFuture = new FutureTask<>(new Callable<TezClient>() { + @Override + public TezClient call() throws Exception { + return startSessionAndContainers(session, conf, commonLocalResources, tezConfig, true); + } + }); + new Thread(sessionFuture, "Tez session start thread").start(); + // We assume here nobody will try to get session before open() returns. + this.console = console; + this.sessionFuture = sessionFuture; + } + } - if (HiveConf.getBoolVar(conf, ConfVars.HIVE_PREWARM_ENABLED)) { - int n = HiveConf.getIntVar(conf, ConfVars.HIVE_PREWARM_NUM_CONTAINERS); - LOG.info("Prewarming " + n + " containers (id: " + sessionId - + ", scratch dir: " + tezScratchDir + ")"); - PreWarmVertex prewarmVertex = utils.createPreWarmVertex(tezConfig, n, - commonLocalResources); - try { - session.preWarm(prewarmVertex); - } catch (IOException ie) { - if (ie.getMessage().contains("Interrupted while waiting")) { - if (LOG.isDebugEnabled()) { - LOG.debug("Hive Prewarm threw an exception ", ie); + private TezClient startSessionAndContainers(TezClient session, HiveConf conf, + Map<String, LocalResource> commonLocalResources, TezConfiguration tezConfig, + boolean isOnThread) throws TezException, IOException { + session.start(); + boolean isSuccessful = false; + try { + if (HiveConf.getBoolVar(conf, ConfVars.HIVE_PREWARM_ENABLED)) { + int n = HiveConf.getIntVar(conf, ConfVars.HIVE_PREWARM_NUM_CONTAINERS); + LOG.info("Prewarming " + n + " containers (id: " + sessionId + + ", scratch dir: " + tezScratchDir + ")"); + PreWarmVertex prewarmVertex = utils.createPreWarmVertex( + tezConfig, n, commonLocalResources); + try { + session.preWarm(prewarmVertex); + } catch (IOException ie) { + if (!isOnThread && ie.getMessage().contains("Interrupted while waiting")) { + if (LOG.isDebugEnabled()) { + LOG.debug("Hive Prewarm threw an exception ", ie); + } + } else { + throw ie; } - } else { - throw ie; } } + try { + session.waitTillReady(); + } catch (InterruptedException ie) { + if (isOnThread) throw new IOException(ie); + //ignore + } + isSuccessful = true; + return session; + } finally { + if (isOnThread && !isSuccessful) { + closeAndIgnoreExceptions(session); + } } + } + + private static void closeAndIgnoreExceptions(TezClient session) { + try { + session.stop(); + } catch (SessionNotRunning nr) { + // Ignore. + } catch (IOException | TezException ex) { + LOG.info("Failed to close Tez session after failure to initialize: " + ex.getMessage()); + } + } + + public void endOpen() throws InterruptedException, CancellationException { + if (this.session != null || this.sessionFuture == null) return; try { - session.waitTillReady(); - } catch(InterruptedException ie) { - //ignore + this.session = this.sessionFuture.get(); + } catch (ExecutionException e) { + throw new RuntimeException(e); } } @@ -250,21 +345,32 @@ public class TezSessionState { * @throws Exception */ public void close(boolean keepTmpDir) throws Exception { - if (!isOpen()) { - return; - } - - LOG.info("Closing Tez Session"); - try { - session.stop(); - } catch (SessionNotRunning nr) { - // ignore + if (session != null) { + LOG.info("Closing Tez Session"); + closeClient(session); + } else if (sessionFuture != null) { + sessionFuture.cancel(true); + TezClient asyncSession = null; + try { + asyncSession = sessionFuture.get(); // In case it was done and noone looked at it. + } catch (ExecutionException | CancellationException e) { + // ignore + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + // ignore + } + if (asyncSession != null) { + LOG.info("Closing Tez Session"); + closeClient(asyncSession); + } } if (!keepTmpDir) { cleanupScratchDir(); } session = null; + sessionFuture = null; + console = null; tezScratchDir = null; conf = null; appJarLr = null; @@ -272,6 +378,15 @@ public class TezSessionState { localizedResources.clear(); } + private void closeClient(TezClient client) throws TezException, + IOException { + try { + client.stop(); + } catch (SessionNotRunning nr) { + // ignore + } + } + public void cleanupScratchDir () throws IOException { FileSystem fs = tezScratchDir.getFileSystem(conf); fs.delete(tezScratchDir, true); @@ -283,6 +398,21 @@ public class TezSessionState { } public TezClient getSession() { + if (session == null && sessionFuture != null) { + if (!sessionFuture.isDone()) { + console.printInfo("Waiting for Tez session and AM to be ready..."); + } + try { + session = sessionFuture.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return null; + } catch (ExecutionException e) { + throw new RuntimeException(e); + } catch (CancellationException e) { + return null; + } + } return session; } http://git-wip-us.apache.org/repos/asf/hive/blob/7f9023ea/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index 2d740ed..c62e929 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -53,6 +53,7 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.tez.client.TezClient; import org.apache.tez.common.counters.CounterGroup; import org.apache.tez.common.counters.TezCounter; import org.apache.tez.common.counters.TezCounters; @@ -251,7 +252,8 @@ public class TezTask extends Task<TezWork> { final boolean missingLocalResources = !session .hasResources(inputOutputJars); - if (!session.isOpen()) { + TezClient client = session.getSession(); + if (client == null) { // can happen if the user sets the tez flag after the session was // established LOG.info("Tez session hasn't been created yet. Opening session"); @@ -263,7 +265,7 @@ public class TezTask extends Task<TezWork> { if (missingLocalResources) { LOG.info("Tez session missing resources," + " adding additional necessary resources"); - session.getSession().addAppMasterLocalFiles(extraResources); + client.addAppMasterLocalFiles(extraResources); } session.refreshLocalResourcesFromConf(conf); http://git-wip-us.apache.org/repos/asf/hive/blob/7f9023ea/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java index dc8c336..56b0fae 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java @@ -38,6 +38,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CancellationException; import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.io.FileUtils; @@ -85,7 +86,6 @@ import org.apache.hadoop.hive.shims.HadoopShims; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.Shell; import com.google.common.base.Preconditions; @@ -474,6 +474,21 @@ public class SessionState { * when switching from one session to another. */ public static SessionState start(SessionState startSs) { + start(startSs, false, null); + return startSs; + } + + public static void beginStart(SessionState startSs, LogHelper console) { + start(startSs, true, console); + } + + public static void endStart(SessionState startSs) + throws CancellationException, InterruptedException { + if (startSs.tezSessionState == null) return; + startSs.tezSessionState.endOpen(); + } + + private static void start(SessionState startSs, boolean isAsync, LogHelper console) { setCurrentSessionState(startSs); if (startSs.hiveHist == null){ @@ -521,20 +536,31 @@ public class SessionState { throw new RuntimeException(e); } - if (HiveConf.getVar(startSs.getConf(), HiveConf.ConfVars.HIVE_EXECUTION_ENGINE) - .equals("tez") && (startSs.isHiveServerQuery == false)) { - try { - if (startSs.tezSessionState == null) { - startSs.tezSessionState = new TezSessionState(startSs.getSessionId()); - } - if (!startSs.tezSessionState.isOpen()) { - startSs.tezSessionState.open(startSs.conf); // should use conf on session start-up + String engine = HiveConf.getVar(startSs.getConf(), HiveConf.ConfVars.HIVE_EXECUTION_ENGINE); + if (!engine.equals("tez") || startSs.isHiveServerQuery) return; + + try { + if (startSs.tezSessionState == null) { + startSs.tezSessionState = new TezSessionState(startSs.getSessionId()); + } + if (startSs.tezSessionState.isOpen()) { + return; + } + if (startSs.tezSessionState.isOpening()) { + if (!isAsync) { + startSs.tezSessionState.endOpen(); } - } catch (Exception e) { - throw new RuntimeException(e); + return; + } + // Neither open nor opening. + if (!isAsync) { + startSs.tezSessionState.open(startSs.conf); // should use conf on session start-up + } else { + startSs.tezSessionState.beginOpen(startSs.conf, null, console); } + } catch (Exception e) { + throw new RuntimeException(e); } - return startSs; } /** @@ -1572,8 +1598,6 @@ public class SessionState { } } - - public TezSessionState getTezSession() { return tezSessionState; } http://git-wip-us.apache.org/repos/asf/hive/blob/7f9023ea/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java index d004a27..858cca0 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java @@ -236,6 +236,7 @@ public class TestTezTask { .thenReturn(resources); when(utils.getBaseName(res)).thenReturn("foo.jar"); when(sessionState.isOpen()).thenReturn(true); + when(sessionState.isOpening()).thenReturn(false); when(sessionState.hasResources(inputOutputJars)).thenReturn(false); task.updateSession(sessionState, conf, path, inputOutputJars, resMap); verify(session).addAppMasterLocalFiles(resMap); @@ -254,6 +255,7 @@ public class TestTezTask { .thenReturn(resources); when(utils.getBaseName(res)).thenReturn("foo.jar"); when(sessionState.isOpen()).thenReturn(true); + when(sessionState.isOpening()).thenReturn(false); when(sessionState.hasResources(inputOutputJars)).thenReturn(false); task.addExtraResourcesToDag(sessionState, dag, inputOutputJars, resMap); verify(dag).addTaskLocalFiles(resMap);
