Repository: hive
Updated Branches:
  refs/heads/master-tez092 [created] 8151911b4


http://git-wip-us.apache.org/repos/asf/hive/blob/e5448bf0/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
index 5326e35..f575975 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
@@ -63,7 +63,7 @@ import org.apache.hadoop.hive.metastore.api.WMPool;
 import org.apache.hadoop.hive.metastore.api.WMPoolTrigger;
 import org.apache.hadoop.hive.metastore.api.WMTrigger;
 import org.apache.hadoop.hive.ql.exec.tez.AmPluginNode.AmPluginInfo;
-import org.apache.hadoop.hive.ql.exec.tez.TezSessionState.HiveResources;
+import org.apache.hadoop.hive.ql.exec.tez.TezSession.HiveResources;
 import org.apache.hadoop.hive.ql.exec.tez.UserPoolMapping.MappingInput;
 import org.apache.hadoop.hive.ql.exec.tez.WmEvent.EventType;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -96,7 +96,7 @@ import org.slf4j.LoggerFactory;
  * none of that state can be accessed directly - most changes that touch pool 
state, or interact
  * with background operations like init, need to go thru eventstate; see e.g. 
returnAfterUse.
  */
-public class WorkloadManager extends 
TezSessionPoolSession.AbstractTriggerValidator
+public class WorkloadManager extends AbstractTriggerValidator
   implements TezSessionPoolSession.Manager, 
SessionExpirationTracker.RestartImpl,
              WorkloadManagerMxBean {
   private static final Logger LOG = 
LoggerFactory.getLogger(WorkloadManager.class);
@@ -114,6 +114,8 @@ public class WorkloadManager extends 
TezSessionPoolSession.AbstractTriggerValida
   private final int amRegistryTimeoutMs;
   private final boolean allowAnyPool;
   private final MetricsSystem metricsSystem;
+  private final TezExternalSessionsRegistryClient externalSessions;
+
   // Note: it's not clear that we need to track this - unlike PoolManager we 
don't have non-pool
   //       sessions, so the pool itself could internally track the sessions 
it gave out, since
   //       calling close on an unopened session is probably harmless.
@@ -231,6 +233,12 @@ public class WorkloadManager extends 
TezSessionPoolSession.AbstractTriggerValida
       metricsSystem = null;
     }
 
+    if (HiveConf.getBoolVar(conf, 
ConfVars.HIVE_SERVER2_TEZ_USE_EXTERNAL_SESSIONS)) {
+      externalSessions = new TezExternalSessionsRegistryClient(conf);
+    } else {
+      externalSessions = null;
+    }
+
     wmThread = new Thread(() -> runWmThread(), "Workload management master");
     wmThread.setDaemon(true);
     wmThread.start();
@@ -280,9 +288,9 @@ public class WorkloadManager extends 
TezSessionPoolSession.AbstractTriggerValida
   public void stop() throws Exception {
     List<TezSessionPoolSession> sessionsToClose = null;
     synchronized (openSessions) {
-      sessionsToClose = new 
ArrayList<TezSessionPoolSession>(openSessions.keySet());
+      sessionsToClose = new ArrayList<>(openSessions.keySet());
     }
-    for (TezSessionState sessionState : sessionsToClose) {
+    for (TezSessionPoolSession sessionState : sessionsToClose) {
       sessionState.close(false);
     }
     if (expirationTracker != null) {
@@ -309,7 +317,7 @@ public class WorkloadManager extends 
TezSessionPoolSession.AbstractTriggerValida
       String poolName = entry.getKey();
       PoolState poolState = entry.getValue();
       final List<Trigger> triggers = 
Collections.unmodifiableList(poolState.getTriggers());
-      final List<TezSessionState> sessionStates = 
Collections.unmodifiableList(poolState.getSessions());
+      final List<TezSession> sessionStates = 
Collections.unmodifiableList(poolState.getSessions());
       SessionTriggerProvider sessionTriggerProvider = 
perPoolProviders.get(poolName);
       if (sessionTriggerProvider != null) {
         perPoolProviders.get(poolName).setTriggers(triggers);
@@ -431,24 +439,23 @@ public class WorkloadManager extends 
TezSessionPoolSession.AbstractTriggerValida
         //       because we know which exact query we intend to kill. This is 
valid because we
         //       are not expecting query ID to change - we never reuse the 
session for which a
         //       query is being killed until both the kill, and the user, 
return it.
-        String queryId = toKill.getQueryId();
-        KillQuery kq = toKill.getKillQuery();
         try {
-          if (kq != null && queryId != null) {
-            WmEvent wmEvent = new WmEvent(WmEvent.EventType.KILL);
-            LOG.info("Invoking KillQuery for " + queryId + ": " + reason);
-            try {
-              kq.killQuery(queryId, reason, toKill.getConf());
-              addKillQueryResult(toKill, true);
-              killCtx.killSessionFuture.set(true);
-              wmEvent.endEvent(toKill);
-              LOG.debug("Killed " + queryId);
-              return;
-            } catch (HiveException ex) {
-              LOG.error("Failed to kill " + queryId + "; will try to restart 
AM instead" , ex);
-            }
+          boolean wasKilled = false;
+          String queryId = toKill.getQueryId();
+          WmEvent wmEvent = new WmEvent(WmEvent.EventType.KILL);
+          try {
+            wasKilled = toKill.killQuery(reason);
+            addKillQueryResult(toKill, true);
+            killCtx.killSessionFuture.set(true);
+            wmEvent.endEvent(toKill);
+            
+          } catch (HiveException ex) {
+            LOG.error("Failed to kill " + queryId + "; will try to restart AM 
instead" , ex);
+          }
+          if (wasKilled) {
+            LOG.debug("Killed " + queryId);
           } else {
-            LOG.info("Will queue restart for {}; queryId {}, killQuery {}", 
toKill, queryId, kq);
+            LOG.info("Will queue restart for {}; queryId {}", toKill, queryId);
           }
         } finally {
           toKill.setQueryId(null);
@@ -580,7 +587,8 @@ public class WorkloadManager extends 
TezSessionPoolSession.AbstractTriggerValida
         if (!wasReturned) {
           syncWork.toDestroyNoRestart.add(sessionToReturn);
         } else {
-          if (sessionToReturn.getWmContext() != null && 
sessionToReturn.getWmContext().isQueryCompleted()) {
+          WmContext ctx = sessionToReturn.getWmContext();
+          if (ctx != null && ctx.isQueryCompleted()) {
             sessionToReturn.resolveReturnFuture();
           }
           wmEvent.endEvent(sessionToReturn);
@@ -665,7 +673,8 @@ public class WorkloadManager extends 
TezSessionPoolSession.AbstractTriggerValida
         if (!tezAmPool.returnSessionAsync(ctx.session)) {
           syncWork.toDestroyNoRestart.add(ctx.session);
         } else {
-          if (ctx.session.getWmContext() != null && 
ctx.session.getWmContext().isQueryCompleted()) {
+          WmContext wmCtx = ctx.session.getWmContext();
+          if (wmCtx != null && wmCtx.isQueryCompleted()) {
             ctx.session.resolveReturnFuture();
           }
           wmEvent.endEvent(ctx.session);
@@ -1241,7 +1250,8 @@ public class WorkloadManager extends 
TezSessionPoolSession.AbstractTriggerValida
     if (!tezAmPool.returnSessionAsync(session)) {
       syncWork.toDestroyNoRestart.add(session);
     } else {
-      if (session.getWmContext() != null && 
session.getWmContext().isQueryCompleted()) {
+      WmContext wmCtx = session.getWmContext();
+      if (wmCtx != null && wmCtx.isQueryCompleted()) {
         session.resolveReturnFuture();
       }
       wmEvent.endEvent(session);
@@ -1407,11 +1417,11 @@ public class WorkloadManager extends 
TezSessionPoolSession.AbstractTriggerValida
 
   @VisibleForTesting
   public WmTezSession getSession(
-      TezSessionState session, MappingInput input, HiveConf conf) throws 
Exception {
+      TezSession session, MappingInput input, HiveConf conf) throws Exception {
     return getSession(session, input, conf, null);
   }
 
-  public WmTezSession getSession(TezSessionState session, MappingInput input, 
HiveConf conf,
+  public WmTezSession getSession(TezSession session, MappingInput input, 
HiveConf conf,
       final WmContext wmContext) throws Exception {
     WmEvent wmEvent = new WmEvent(WmEvent.EventType.GET);
     // Note: not actually used for pool sessions; verify some things like doAs 
are not set.
@@ -1443,7 +1453,7 @@ public class WorkloadManager extends 
TezSessionPoolSession.AbstractTriggerValida
   }
 
   @Override
-  public void destroy(TezSessionState session) throws Exception {
+  public void destroy(TezSession session) throws Exception {
     WmTezSession wmTezSession = ensureOwnedSession(session);
     resetGlobalTezSession(wmTezSession);
     currentLock.lock();
@@ -1577,7 +1587,7 @@ public class WorkloadManager extends 
TezSessionPoolSession.AbstractTriggerValida
 
 
   @Override
-  public TezSessionState reopen(TezSessionState session) throws Exception {
+  public TezSession reopen(TezSession session) throws Exception {
     WmTezSession wmTezSession = ensureOwnedSession(session);
     HiveConf sessionConf = wmTezSession.getConf();
     if (sessionConf == null) {
@@ -1615,7 +1625,7 @@ public class WorkloadManager extends 
TezSessionPoolSession.AbstractTriggerValida
     hasChangesCondition.signalAll();
   }
 
-  private WmTezSession checkSessionForReuse(TezSessionState session) throws 
Exception {
+  private WmTezSession checkSessionForReuse(TezSession session) throws 
Exception {
     if (session == null) return null;
     WmTezSession result = null;
     if (session instanceof WmTezSession) {
@@ -1664,10 +1674,16 @@ public class WorkloadManager extends 
TezSessionPoolSession.AbstractTriggerValida
   protected WmTezSession createSessionObject(String sessionId, HiveConf conf) {
     conf = (conf == null) ? new HiveConf(this.conf) : conf;
     conf.set(LlapTaskSchedulerService.LLAP_PLUGIN_ENDPOINT_ENABLED, "true");
-    return new WmTezSession(sessionId, this, expirationTracker, conf);
+    TezSessionState base = null;
+    if (externalSessions != null) {
+      base = new TezExternalSessionState(sessionId, conf, externalSessions);
+    } else {
+      base = new TezSessionState(sessionId, conf);
+    }
+    return new WmTezSession(this, expirationTracker, base);
   }
 
-  private WmTezSession ensureOwnedSession(TezSessionState oldSession) {
+  private WmTezSession ensureOwnedSession(TezSession oldSession) {
     if (!(oldSession instanceof WmTezSession) || 
!((WmTezSession)oldSession).isOwnedBy(this)) {
       throw new AssertionError("Not a WM session " + oldSession);
     }
@@ -1689,7 +1705,7 @@ public class WorkloadManager extends 
TezSessionPoolSession.AbstractTriggerValida
     synchronized (openSessions) {
       openSessions.remove(session);
     }
-    tezAmPool.notifyClosed(session);
+    tezAmPool.notifyClosed(ensureOwnedSession(session));
   }
 
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/hive/blob/e5448bf0/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java
index 4b5022a..91ec1bf 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java
@@ -28,7 +28,7 @@ import org.slf4j.LoggerFactory;
 public class WorkloadManagerFederation {
   private static final Logger LOG = 
LoggerFactory.getLogger(WorkloadManagerFederation.class);
 
-  public static TezSessionState getSession(TezSessionState session, HiveConf 
conf,
+  public static TezSession getSession(TezSession session, HiveConf conf,
       MappingInput input, boolean isUnmanagedLlapMode, WmContext wmContext) 
throws Exception {
     Set<String> desiredCounters = new HashSet<>();
     // 1. If WM is not present just go to unmanaged.
@@ -59,8 +59,8 @@ public class WorkloadManagerFederation {
     }
   }
 
-  private static TezSessionState getUnmanagedSession(
-    TezSessionState session, HiveConf conf, Set<String> desiredCounters, 
boolean isWorkLlapNode,
+  private static TezSession getUnmanagedSession(
+    TezSession session, HiveConf conf, Set<String> desiredCounters, boolean 
isWorkLlapNode,
     final WmContext wmContext) throws Exception {
     TezSessionPoolManager pm = TezSessionPoolManager.getInstance();
     session = pm.getSession(session, conf, false, isWorkLlapNode);

http://git-wip-us.apache.org/repos/asf/hive/blob/e5448bf0/ql/src/java/org/apache/hadoop/hive/ql/session/KillQuery.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/KillQuery.java 
b/ql/src/java/org/apache/hadoop/hive/ql/session/KillQuery.java
index 01dc7e2..bbbd457 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/KillQuery.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/KillQuery.java
@@ -22,5 +22,6 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 
 public interface KillQuery {
-  void killQuery(String queryId, String errMsg, HiveConf conf) throws 
HiveException;
+  void killQuery(String queryId, String errMsg, HiveConf conf, boolean isYarn)
+      throws HiveException;
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/e5448bf0/ql/src/java/org/apache/hadoop/hive/ql/session/NullKillQuery.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/NullKillQuery.java 
b/ql/src/java/org/apache/hadoop/hive/ql/session/NullKillQuery.java
index eac2936..78598e6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/NullKillQuery.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/NullKillQuery.java
@@ -23,7 +23,8 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
 
 public class NullKillQuery implements KillQuery {
   @Override
-  public void killQuery(String queryId, String errMsg, HiveConf conf) throws 
HiveException {
+  public void killQuery(String queryId, String errMsg, HiveConf conf, boolean 
isYarn)
+      throws HiveException {
     // Do nothing
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/e5448bf0/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java 
b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index 71e130b..66e0ea9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -73,6 +73,7 @@ import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
 import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
+import org.apache.hadoop.hive.ql.exec.tez.TezSession;
 import org.apache.hadoop.hive.ql.exec.tez.TezSessionState;
 import org.apache.hadoop.hive.ql.history.HiveHistory;
 import org.apache.hadoop.hive.ql.history.HiveHistoryImpl;
@@ -230,7 +231,7 @@ public class SessionState {
 
   private Map<String, List<String>> localMapRedErrors;
 
-  private TezSessionState tezSessionState;
+  private TezSession tezSessionState;
 
   private String currentDatabase;
 
@@ -1877,23 +1878,23 @@ public class SessionState {
     }
   }
 
-  public TezSessionState getTezSession() {
+  public TezSession getTezSession() {
     return tezSessionState;
   }
 
   /** Called from TezTask to attach a TezSession to use to the threadlocal. 
Ugly pattern... */
-  public void setTezSession(TezSessionState session) {
+  public void setTezSession(TezSession session) {
     if (tezSessionState == session) {
       return; // The same object.
     }
     if (tezSessionState != null) {
-      tezSessionState.markFree();
+      tezSessionState.unsetOwnerThread();
       tezSessionState.setKillQuery(null);
       tezSessionState = null;
     }
     tezSessionState = session;
     if (session != null) {
-      session.markInUse();
+      session.setOwnerThread();
       tezSessionState.setKillQuery(getKillQuery());
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/e5448bf0/ql/src/java/org/apache/hadoop/hive/ql/wm/SessionTriggerProvider.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/wm/SessionTriggerProvider.java 
b/ql/src/java/org/apache/hadoop/hive/ql/wm/SessionTriggerProvider.java
index 16106f4..677c741 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/wm/SessionTriggerProvider.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/SessionTriggerProvider.java
@@ -17,21 +17,22 @@ package org.apache.hadoop.hive.ql.wm;
 
 import java.util.List;
 
+import org.apache.hadoop.hive.ql.exec.tez.TezSession;
 import org.apache.hadoop.hive.ql.exec.tez.TezSessionState;
 
 /**
  * Implementation for providing current open sessions and active trigger.
  */
 public class SessionTriggerProvider {
-  private List<TezSessionState> sessions;
+  private List<TezSession> sessions;
   private List<Trigger> triggers;
 
-  public SessionTriggerProvider(List<TezSessionState> openSessions, 
List<Trigger> triggers) {
+  public SessionTriggerProvider(List<TezSession> openSessions, List<Trigger> 
triggers) {
     this.sessions = openSessions;
     this.triggers = triggers;
   }
 
-  public List<TezSessionState> getSessions() {
+  public List<TezSession> getSessions() {
     return sessions;
   }
 
@@ -39,7 +40,7 @@ public class SessionTriggerProvider {
     return triggers;
   }
 
-  public void setSessions(final List<TezSessionState> sessions) {
+  public void setSessions(final List<TezSession> sessions) {
     this.sessions = sessions;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/e5448bf0/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java
----------------------------------------------------------------------
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java 
b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java
index f5ab981..b856e10 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java
@@ -23,9 +23,13 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
+
 import java.io.IOException;
+import java.net.URISyntaxException;
 import java.util.concurrent.ScheduledExecutorService;
+
 import javax.security.auth.login.LoginException;
+
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -48,8 +52,9 @@ public class SampleTezSessionState extends WmTezSession {
 
   public SampleTezSessionState(
       String sessionId, TezSessionPoolSession.Manager parent, HiveConf conf) {
-    super(sessionId, parent, (parent instanceof TezSessionPoolManager)
-        ? ((TezSessionPoolManager)parent).getExpirationTracker() : null, conf);
+    super(parent, (parent instanceof TezSessionPoolManager)
+        ? ((TezSessionPoolManager)parent).getExpirationTracker() : null,
+            new TezSessionState(sessionId, conf));
     this.sessionId = sessionId;
     this.hiveConf = conf;
     waitForAmRegFuture = createDefaultWaitForAmRegistryFuture();
@@ -79,6 +84,12 @@ public class SampleTezSessionState extends WmTezSession {
   }
 
   @Override
+  public void open(boolean isPoolInit) throws IOException, LoginException,
+      URISyntaxException, TezException {
+    open();
+  }
+
+  @Override
   public void open(HiveResources resources) throws LoginException, IOException 
{
     open();
   }
@@ -89,7 +100,7 @@ public class SampleTezSessionState extends WmTezSession {
   }
 
   @Override
-  void close(boolean keepTmpDir) throws TezException, IOException {
+  public void close(boolean keepTmpDir) throws TezException, IOException {
     open = keepTmpDir;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/e5448bf0/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
----------------------------------------------------------------------
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java 
b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
index d5b683f..68b9c3e 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
@@ -67,13 +67,13 @@ public class TestTezSessionPool {
   public void testGetNonDefaultSession() {
     poolManager = new TestTezSessionPoolManager();
     try {
-      TezSessionState sessionState = poolManager.getSession(null, conf, true, 
false);
-      TezSessionState sessionState1 = poolManager.getSession(sessionState, 
conf, true, false);
+      TezSession sessionState = poolManager.getSession(null, conf, true, 
false);
+      TezSession sessionState1 = poolManager.getSession(sessionState, conf, 
true, false);
       if (sessionState1 != sessionState) {
         fail();
       }
       conf.set("tez.queue.name", "nondefault");
-      TezSessionState sessionState2 = poolManager.getSession(sessionState, 
conf, true, false);
+      TezSession sessionState2 = poolManager.getSession(sessionState, conf, 
true, false);
       if (sessionState2 == sessionState) {
         fail();
       }
@@ -97,7 +97,7 @@ public class TestTezSessionPool {
       // this is now a LIFO operation
 
       // draw 1 and replace
-      TezSessionState sessionState = poolManager.getSession(null, conf, true, 
false);
+      TezSession sessionState = poolManager.getSession(null, conf, true, 
false);
       assertEquals("a", sessionState.getQueueName());
       poolManager.returnSession(sessionState);
 
@@ -108,13 +108,13 @@ public class TestTezSessionPool {
       // [a,b,c,a,b,c]
 
       // draw 2 and return in order - further run should return last returned
-      TezSessionState first = poolManager.getSession(null, conf, true, false);
-      TezSessionState second = poolManager.getSession(null, conf, true, false);
+      TezSession first = poolManager.getSession(null, conf, true, false);
+      TezSession second = poolManager.getSession(null, conf, true, false);
       assertEquals("a", first.getQueueName());
       assertEquals("b", second.getQueueName());
       poolManager.returnSession(first);
       poolManager.returnSession(second);
-      TezSessionState third = poolManager.getSession(null, conf, true, false);
+      TezSession third = poolManager.getSession(null, conf, true, false);
       assertEquals("b", third.getQueueName());
       poolManager.returnSession(third);
 
@@ -157,7 +157,7 @@ public class TestTezSessionPool {
       poolManager = new TestTezSessionPoolManager();
       poolManager.setupPool(conf);
       poolManager.startPool(conf, null);
-      TezSessionState[] sessions = new TezSessionState[12];
+      TezSession[] sessions = new TezSession[12];
       int[] queueCounts = new int[3];
       for (int i = 0; i < sessions.length; ++i) {
         sessions[i] = poolManager.getSession(null, conf, true, false);
@@ -184,7 +184,7 @@ public class TestTezSessionPool {
       conf.setIntVar(ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE, 1);
 
       poolManager = new TestTezSessionPoolManager();
-      TezSessionState session = Mockito.mock(TezSessionState.class);
+      TezSession session = Mockito.mock(TezSession.class);
       Mockito.when(session.getQueueName()).thenReturn("default");
       Mockito.when(session.isDefault()).thenReturn(false);
       Mockito.when(session.getConf()).thenReturn(conf);
@@ -192,7 +192,7 @@ public class TestTezSessionPool {
       poolManager.reopen(session);
 
       Mockito.verify(session).close(false);
-      
Mockito.verify(session).open(Mockito.<TezSessionState.HiveResources>any());
+      Mockito.verify(session).open(Mockito.<TezSession.HiveResources>any());
 
       // mocked session starts with default queue
       assertEquals("default", session.getQueueName());
@@ -278,7 +278,7 @@ public class TestTezSessionPool {
           tmpConf.set("tez.queue.name", "");
         }
 
-        TezSessionState session = poolManager.getSession(null, tmpConf, true, 
llap);
+        TezSession session = poolManager.getSession(null, tmpConf, true, llap);
         Thread.sleep((random.nextInt(9) % 10) * 1000);
         session.setLegacyLlapMode(llap);
         poolManager.returnSession(session);
@@ -323,20 +323,20 @@ public class TestTezSessionPool {
   @Test
   public void testCloseAndOpenDefault() throws Exception {
     poolManager = new TestTezSessionPoolManager();
-    TezSessionState session = Mockito.mock(TezSessionState.class);
+    TezSession session = Mockito.mock(TezSession.class);
     Mockito.when(session.isDefault()).thenReturn(false);
     Mockito.when(session.getConf()).thenReturn(conf);
 
     poolManager.reopen(session);
 
     Mockito.verify(session).close(false);
-    Mockito.verify(session).open(Mockito.<TezSessionState.HiveResources>any());
+    Mockito.verify(session).open(Mockito.<TezSession.HiveResources>any());
   }
 
   @Test
   public void testSessionDestroy() throws Exception {
     poolManager = new TestTezSessionPoolManager();
-    TezSessionState session = Mockito.mock(TezSessionState.class);
+    TezSession session = Mockito.mock(TezSession.class);
     Mockito.when(session.isDefault()).thenReturn(false);
 
     poolManager.destroy(session);

http://git-wip-us.apache.org/repos/asf/hive/blob/e5448bf0/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java 
b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
index b67aec3..a8eb3b1 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
@@ -172,7 +172,7 @@ public class TestTezTask {
     SessionState.start(hiveConf);
     session = mock(TezClient.class);
     sessionState = mock(TezSessionState.class);
-    when(sessionState.getSession()).thenReturn(session);
+    when(sessionState.getTezClient()).thenReturn(session);
     when(sessionState.reopen()).thenReturn(sessionState);
     when(session.submitDAG(any(DAG.class)))
       .thenThrow(new SessionNotRunning(""))

http://git-wip-us.apache.org/repos/asf/hive/blob/e5448bf0/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
----------------------------------------------------------------------
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java 
b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
index 4659ecb..45366ec 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
@@ -62,7 +62,6 @@ import org.slf4j.LoggerFactory;
 
 @RunWith(RetryTestRunner.class)
 public class TestWorkloadManager {
-  @SuppressWarnings("unused")
   private static final Logger LOG = 
LoggerFactory.getLogger(TestWorkloadManager.class);
 
   private final class GetSessionRunnable implements Runnable {
@@ -227,7 +226,7 @@ public class TestWorkloadManager {
 
     @Override
     public WmTezSession getSession(
-      TezSessionState session, MappingInput input, HiveConf conf,
+      TezSession session, MappingInput input, HiveConf conf,
       final WmContext wmContext) throws Exception {
       // We want to wait for the iteration to finish and set the cluster 
fraction.
       WmTezSession state = super.getSession(session, input, conf, null);
@@ -236,7 +235,7 @@ public class TestWorkloadManager {
     }
 
     @Override
-    public void destroy(TezSessionState session) throws Exception {
+    public void destroy(TezSession session) throws Exception {
       super.destroy(session);
       ensureWm();
     }
@@ -252,7 +251,7 @@ public class TestWorkloadManager {
     }
 
     @Override
-    public TezSessionState reopen(TezSessionState session) throws Exception {
+    public TezSession reopen(TezSession session) throws Exception {
       session = super.reopen(session);
       ensureWm();
       return session;
@@ -269,10 +268,10 @@ public class TestWorkloadManager {
     MockQam qam = new MockQam();
     WorkloadManager wm = new WorkloadManagerForTest("test", conf, 1, qam);
     wm.start();
-    TezSessionState nonPool = mock(TezSessionState.class);
+    TezSession nonPool = mock(TezSession.class);
     when(nonPool.getConf()).thenReturn(conf);
     doNothing().when(nonPool).close(anyBoolean());
-    TezSessionState session = wm.getSession(nonPool, mappingInput("user"), 
conf);
+    TezSession session = wm.getSession(nonPool, mappingInput("user"), conf);
     verify(nonPool).close(anyBoolean());
     assertNotSame(nonPool, session);
     session.returnToSessionManager();
@@ -282,7 +281,7 @@ public class TestWorkloadManager {
     session = wm.getSession(diffPool, mappingInput("user"), conf);
     verify(diffPool).returnToSessionManager();
     assertNotSame(diffPool, session);
-    TezSessionState session2 = wm.getSession(session, mappingInput("user"), 
conf);
+    TezSession session2 = wm.getSession(session, mappingInput("user"), conf);
     assertSame(session, session2);
   }
 
@@ -294,7 +293,7 @@ public class TestWorkloadManager {
     wm.start();
     // The queue should be ignored.
     conf.set(TezConfiguration.TEZ_QUEUE_NAME, "test2");
-    TezSessionState session = wm.getSession(null, mappingInput("user"), conf);
+    TezSession session = wm.getSession(null, mappingInput("user"), conf);
     assertEquals("test", session.getQueueName());
     assertEquals("test", conf.get(TezConfiguration.TEZ_QUEUE_NAME));
     session.setQueueName("test2");
@@ -415,7 +414,7 @@ public class TestWorkloadManager {
     verifyMapping(wm, conf, mappingInput("zzz", groups("g0", "g1"), "g1"), 
"g1");
     // Explicit pool specification - invalid - there's no mapping that matches.
     try {
-      TezSessionState r = wm.getSession(
+      TezSession r = wm.getSession(
         null, mappingInput("u0", groups("g0", "g1"), "u2"), conf);
       fail("Expected failure, but got " + r);
     } catch (Exception ex) {
@@ -428,7 +427,7 @@ public class TestWorkloadManager {
     verifyMapping(wm, conf, mappingInput("u0", groups("g0", "g1"), "u2"), 
"u2");
     // The mapping that doesn't exist still shouldn't work.
     try {
-      TezSessionState r = wm.getSession(
+      TezSession r = wm.getSession(
         null, mappingInput("u0", groups("g0", "g1"), "zzz"), conf);
       fail("Expected failure, but got " + r);
     } catch (Exception ex) {
@@ -824,7 +823,7 @@ public class TestWorkloadManager {
     assertEquals(0, tezAmPool.getCurrentSize());
 
     try {
-      TezSessionState r =  wm.getSession(null, mappingInput("A", null), conf, 
null);
+      TezSession r =  wm.getSession(null, mappingInput("A", null), conf, null);
       fail("Expected an error but got " + r);
     } catch (WorkloadManager.NoPoolMappingException ex) {
       // Ignore, this particular error is expected.
@@ -1165,7 +1164,7 @@ public class TestWorkloadManager {
     SettableFuture<Boolean> failedWait = SettableFuture.create();
     failedWait.setException(new Exception("foo"));
     theOnlySession.setWaitForAmRegistryFuture(failedWait);
-    TezSessionState retriedSession = wm.getSession(null, mappingInput("A"), 
conf);
+    TezSession retriedSession = wm.getSession(null, mappingInput("A"), conf);
     assertNotNull(retriedSession);
     assertNotSame(theOnlySession, retriedSession); // Should have been 
replaced.
     retriedSession.returnToSessionManager();
@@ -1175,7 +1174,7 @@ public class TestWorkloadManager {
     theOnlySession.setWaitForAmRegistryFuture(failedWait);
     wm.setNextWaitForAmRegistryFuture(failedWait); // Fail the retry.
     try {
-      TezSessionState r = wm.getSession(null, mappingInput("A"), conf);
+      TezSession r = wm.getSession(null, mappingInput("A"), conf);
       fail("Expected an error but got " + r);
     } catch (Exception ex) {
       // Expected.

http://git-wip-us.apache.org/repos/asf/hive/blob/e5448bf0/service/src/java/org/apache/hive/service/server/KillQueryImpl.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/server/KillQueryImpl.java 
b/service/src/java/org/apache/hive/service/server/KillQueryImpl.java
index 490a04d..2c23253 100644
--- a/service/src/java/org/apache/hive/service/server/KillQueryImpl.java
+++ b/service/src/java/org/apache/hive/service/server/KillQueryImpl.java
@@ -96,7 +96,8 @@ public class KillQueryImpl implements KillQuery {
   }
 
   @Override
-  public void killQuery(String queryId, String errMsg, HiveConf conf) throws 
HiveException {
+  public void killQuery(
+      String queryId, String errMsg, HiveConf conf, boolean isYarn) throws 
HiveException {
     try {
       String queryTag = null;
 
@@ -120,8 +121,10 @@ public class KillQueryImpl implements KillQuery {
         queryTag = queryId;
       }
 
-      LOG.info("Killing yarn jobs for query id : " + queryId + " using tag :" 
+ queryTag);
-      killChildYarnJobs(conf, queryTag);
+      if (isYarn) {
+        LOG.info("Killing yarn jobs for query id : " + queryId + " using tag 
:" + queryTag);
+        killChildYarnJobs(conf, queryTag);
+      }
 
       if (operation != null) {
         OperationHandle handle = operation.getHandle();

Reply via email to