HIVE-19777 : NPE in TezSessionState (Sergey Shelukhin, reviewed by Jason Dere)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7c32fce8 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7c32fce8 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7c32fce8 Branch: refs/heads/branch-3 Commit: 7c32fce80694e19bef6395947e4db1d5cb8171e0 Parents: 6c329a2 Author: sergey <ser...@apache.org> Authored: Mon Jun 18 15:15:38 2018 -0700 Committer: sergey <ser...@apache.org> Committed: Mon Jun 18 15:16:04 2018 -0700 ---------------------------------------------------------------------- .../hive/ql/exec/tez/TezSessionState.java | 33 +++++++++++++++++--- 1 file changed, 28 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/7c32fce8/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java index fe139c9..08e65a4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java @@ -180,7 +180,11 @@ public class TezSessionState { return false; } try { - session = sessionFuture.get(0, TimeUnit.NANOSECONDS); + TezClient session = sessionFuture.get(0, TimeUnit.NANOSECONDS); + if (session == null) { + return false; + } + this.session = session; } catch (InterruptedException e) { Thread.currentThread().interrupt(); return false; @@ -202,7 +206,11 @@ public class TezSessionState { return false; } try { - session = sessionFuture.get(0, TimeUnit.NANOSECONDS); + TezClient session = sessionFuture.get(0, TimeUnit.NANOSECONDS); + if (session == null) { + return false; + } + this.session = session; } catch (InterruptedException e) { Thread.currentThread().interrupt(); return false; @@ -363,12 +371,23 @@ public class TezSessionState { FutureTask<TezClient> sessionFuture = new FutureTask<>(new Callable<TezClient>() { @Override public TezClient call() throws Exception { + TezClient result = null; try { - return startSessionAndContainers(session, conf, commonLocalResources, tezConfig, true); + result = startSessionAndContainers( + session, conf, commonLocalResources, tezConfig, true); } catch (Throwable t) { + // The caller has already stopped the session. LOG.error("Failed to start Tez session", t); throw (t instanceof Exception) ? (Exception)t : new Exception(t); } + // Check interrupt at the last moment in case we get cancelled quickly. + // This is not bulletproof but should allow us to close session in most cases. + if (Thread.interrupted()) { + LOG.info("Interrupted while starting Tez session"); + closeAndIgnoreExceptions(result); + return null; + } + return result; } }); new Thread(sessionFuture, "Tez session start thread").start(); @@ -471,7 +490,11 @@ public class TezSessionState { return; } try { - this.session = this.sessionFuture.get(); + TezClient session = this.sessionFuture.get(); + if (session == null) { + throw new RuntimeException("Initialization was interrupted"); + } + this.session = session; } catch (ExecutionException e) { throw new RuntimeException(e); } @@ -645,7 +668,7 @@ public class TezSessionState { appJarLr = null; try { - if (getSession() != null) { + if (session != null) { LOG.info("Closing Tez Session"); closeClient(session); session = null;