Repository: hive Updated Branches: refs/heads/master-tez092 [created] 8151911b4
http://git-wip-us.apache.org/repos/asf/hive/blob/e5448bf0/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java index 5326e35..f575975 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java @@ -63,7 +63,7 @@ import org.apache.hadoop.hive.metastore.api.WMPool; import org.apache.hadoop.hive.metastore.api.WMPoolTrigger; import org.apache.hadoop.hive.metastore.api.WMTrigger; import org.apache.hadoop.hive.ql.exec.tez.AmPluginNode.AmPluginInfo; -import org.apache.hadoop.hive.ql.exec.tez.TezSessionState.HiveResources; +import org.apache.hadoop.hive.ql.exec.tez.TezSession.HiveResources; import org.apache.hadoop.hive.ql.exec.tez.UserPoolMapping.MappingInput; import org.apache.hadoop.hive.ql.exec.tez.WmEvent.EventType; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -96,7 +96,7 @@ import org.slf4j.LoggerFactory; * none of that state can be accessed directly - most changes that touch pool state, or interact * with background operations like init, need to go thru eventstate; see e.g. returnAfterUse. */ -public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValidator +public class WorkloadManager extends AbstractTriggerValidator implements TezSessionPoolSession.Manager, SessionExpirationTracker.RestartImpl, WorkloadManagerMxBean { private static final Logger LOG = LoggerFactory.getLogger(WorkloadManager.class); @@ -114,6 +114,8 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida private final int amRegistryTimeoutMs; private final boolean allowAnyPool; private final MetricsSystem metricsSystem; + private final TezExternalSessionsRegistryClient externalSessions; + // Note: it's not clear that we need to track this - unlike PoolManager we don't have non-pool // sessions, so the pool itself could internally track the ses sions it gave out, since // calling close on an unopened session is probably harmless. @@ -231,6 +233,12 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida metricsSystem = null; } + if (HiveConf.getBoolVar(conf, ConfVars.HIVE_SERVER2_TEZ_USE_EXTERNAL_SESSIONS)) { + externalSessions = new TezExternalSessionsRegistryClient(conf); + } else { + externalSessions = null; + } + wmThread = new Thread(() -> runWmThread(), "Workload management master"); wmThread.setDaemon(true); wmThread.start(); @@ -280,9 +288,9 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida public void stop() throws Exception { List<TezSessionPoolSession> sessionsToClose = null; synchronized (openSessions) { - sessionsToClose = new ArrayList<TezSessionPoolSession>(openSessions.keySet()); + sessionsToClose = new ArrayList<>(openSessions.keySet()); } - for (TezSessionState sessionState : sessionsToClose) { + for (TezSessionPoolSession sessionState : sessionsToClose) { sessionState.close(false); } if (expirationTracker != null) { @@ -309,7 +317,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida String poolName = entry.getKey(); PoolState poolState = entry.getValue(); final List<Trigger> triggers = Collections.unmodifiableList(poolState.getTriggers()); - final List<TezSessionState> sessionStates = Collections.unmodifiableList(poolState.getSessions()); + final List<TezSession> sessionStates = Collections.unmodifiableList(poolState.getSessions()); SessionTriggerProvider sessionTriggerProvider = perPoolProviders.get(poolName); if (sessionTriggerProvider != null) { perPoolProviders.get(poolName).setTriggers(triggers); @@ -431,24 +439,23 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida // because we know which exact query we intend to kill. This is valid because we // are not expecting query ID to change - we never reuse the session for which a // query is being killed until both the kill, and the user, return it. - String queryId = toKill.getQueryId(); - KillQuery kq = toKill.getKillQuery(); try { - if (kq != null && queryId != null) { - WmEvent wmEvent = new WmEvent(WmEvent.EventType.KILL); - LOG.info("Invoking KillQuery for " + queryId + ": " + reason); - try { - kq.killQuery(queryId, reason, toKill.getConf()); - addKillQueryResult(toKill, true); - killCtx.killSessionFuture.set(true); - wmEvent.endEvent(toKill); - LOG.debug("Killed " + queryId); - return; - } catch (HiveException ex) { - LOG.error("Failed to kill " + queryId + "; will try to restart AM instead" , ex); - } + boolean wasKilled = false; + String queryId = toKill.getQueryId(); + WmEvent wmEvent = new WmEvent(WmEvent.EventType.KILL); + try { + wasKilled = toKill.killQuery(reason); + addKillQueryResult(toKill, true); + killCtx.killSessionFuture.set(true); + wmEvent.endEvent(toKill); + + } catch (HiveException ex) { + LOG.error("Failed to kill " + queryId + "; will try to restart AM instead" , ex); + } + if (wasKilled) { + LOG.debug("Killed " + queryId); } else { - LOG.info("Will queue restart for {}; queryId {}, killQuery {}", toKill, queryId, kq); + LOG.info("Will queue restart for {}; queryId {}", toKill, queryId); } } finally { toKill.setQueryId(null); @@ -580,7 +587,8 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida if (!wasReturned) { syncWork.toDestroyNoRestart.add(sessionToReturn); } else { - if (sessionToReturn.getWmContext() != null && sessionToReturn.getWmContext().isQueryCompleted()) { + WmContext ctx = sessionToReturn.getWmContext(); + if (ctx != null && ctx.isQueryCompleted()) { sessionToReturn.resolveReturnFuture(); } wmEvent.endEvent(sessionToReturn); @@ -665,7 +673,8 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida if (!tezAmPool.returnSessionAsync(ctx.session)) { syncWork.toDestroyNoRestart.add(ctx.session); } else { - if (ctx.session.getWmContext() != null && ctx.session.getWmContext().isQueryCompleted()) { + WmContext wmCtx = ctx.session.getWmContext(); + if (wmCtx != null && wmCtx.isQueryCompleted()) { ctx.session.resolveReturnFuture(); } wmEvent.endEvent(ctx.session); @@ -1241,7 +1250,8 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida if (!tezAmPool.returnSessionAsync(session)) { syncWork.toDestroyNoRestart.add(session); } else { - if (session.getWmContext() != null && session.getWmContext().isQueryCompleted()) { + WmContext wmCtx = session.getWmContext(); + if (wmCtx != null && wmCtx.isQueryCompleted()) { session.resolveReturnFuture(); } wmEvent.endEvent(session); @@ -1407,11 +1417,11 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida @VisibleForTesting public WmTezSession getSession( - TezSessionState session, MappingInput input, HiveConf conf) throws Exception { + TezSession session, MappingInput input, HiveConf conf) throws Exception { return getSession(session, input, conf, null); } - public WmTezSession getSession(TezSessionState session, MappingInput input, HiveConf conf, + public WmTezSession getSession(TezSession session, MappingInput input, HiveConf conf, final WmContext wmContext) throws Exception { WmEvent wmEvent = new WmEvent(WmEvent.EventType.GET); // Note: not actually used for pool sessions; verify some things like doAs are not set. @@ -1443,7 +1453,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida } @Override - public void destroy(TezSessionState session) throws Exception { + public void destroy(TezSession session) throws Exception { WmTezSession wmTezSession = ensureOwnedSession(session); resetGlobalTezSession(wmTezSession); currentLock.lock(); @@ -1577,7 +1587,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida @Override - public TezSessionState reopen(TezSessionState session) throws Exception { + public TezSession reopen(TezSession session) throws Exception { WmTezSession wmTezSession = ensureOwnedSession(session); HiveConf sessionConf = wmTezSession.getConf(); if (sessionConf == null) { @@ -1615,7 +1625,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida hasChangesCondition.signalAll(); } - private WmTezSession checkSessionForReuse(TezSessionState session) throws Exception { + private WmTezSession checkSessionForReuse(TezSession session) throws Exception { if (session == null) return null; WmTezSession result = null; if (session instanceof WmTezSession) { @@ -1664,10 +1674,16 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida protected WmTezSession createSessionObject(String sessionId, HiveConf conf) { conf = (conf == null) ? new HiveConf(this.conf) : conf; conf.set(LlapTaskSchedulerService.LLAP_PLUGIN_ENDPOINT_ENABLED, "true"); - return new WmTezSession(sessionId, this, expirationTracker, conf); + TezSessionState base = null; + if (externalSessions != null) { + base = new TezExternalSessionState(sessionId, conf, externalSessions); + } else { + base = new TezSessionState(sessionId, conf); + } + return new WmTezSession(this, expirationTracker, base); } - private WmTezSession ensureOwnedSession(TezSessionState oldSession) { + private WmTezSession ensureOwnedSession(TezSession oldSession) { if (!(oldSession instanceof WmTezSession) || !((WmTezSession)oldSession).isOwnedBy(this)) { throw new AssertionError("Not a WM session " + oldSession); } @@ -1689,7 +1705,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida synchronized (openSessions) { openSessions.remove(session); } - tezAmPool.notifyClosed(session); + tezAmPool.notifyClosed(ensureOwnedSession(session)); } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/hive/blob/e5448bf0/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java index 4b5022a..91ec1bf 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java @@ -28,7 +28,7 @@ import org.slf4j.LoggerFactory; public class WorkloadManagerFederation { private static final Logger LOG = LoggerFactory.getLogger(WorkloadManagerFederation.class); - public static TezSessionState getSession(TezSessionState session, HiveConf conf, + public static TezSession getSession(TezSession session, HiveConf conf, MappingInput input, boolean isUnmanagedLlapMode, WmContext wmContext) throws Exception { Set<String> desiredCounters = new HashSet<>(); // 1. If WM is not present just go to unmanaged. @@ -59,8 +59,8 @@ public class WorkloadManagerFederation { } } - private static TezSessionState getUnmanagedSession( - TezSessionState session, HiveConf conf, Set<String> desiredCounters, boolean isWorkLlapNode, + private static TezSession getUnmanagedSession( + TezSession session, HiveConf conf, Set<String> desiredCounters, boolean isWorkLlapNode, final WmContext wmContext) throws Exception { TezSessionPoolManager pm = TezSessionPoolManager.getInstance(); session = pm.getSession(session, conf, false, isWorkLlapNode); http://git-wip-us.apache.org/repos/asf/hive/blob/e5448bf0/ql/src/java/org/apache/hadoop/hive/ql/session/KillQuery.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/KillQuery.java b/ql/src/java/org/apache/hadoop/hive/ql/session/KillQuery.java index 01dc7e2..bbbd457 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/session/KillQuery.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/KillQuery.java @@ -22,5 +22,6 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.metadata.HiveException; public interface KillQuery { - void killQuery(String queryId, String errMsg, HiveConf conf) throws HiveException; + void killQuery(String queryId, String errMsg, HiveConf conf, boolean isYarn) + throws HiveException; } http://git-wip-us.apache.org/repos/asf/hive/blob/e5448bf0/ql/src/java/org/apache/hadoop/hive/ql/session/NullKillQuery.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/NullKillQuery.java b/ql/src/java/org/apache/hadoop/hive/ql/session/NullKillQuery.java index eac2936..78598e6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/session/NullKillQuery.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/NullKillQuery.java @@ -23,7 +23,8 @@ import org.apache.hadoop.hive.ql.metadata.HiveException; public class NullKillQuery implements KillQuery { @Override - public void killQuery(String queryId, String errMsg, HiveConf conf) throws HiveException { + public void killQuery(String queryId, String errMsg, HiveConf conf, boolean isYarn) + throws HiveException { // Do nothing } } http://git-wip-us.apache.org/repos/asf/hive/blob/e5448bf0/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java index 71e130b..66e0ea9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java @@ -73,6 +73,7 @@ import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession; import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl; import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager; +import org.apache.hadoop.hive.ql.exec.tez.TezSession; import org.apache.hadoop.hive.ql.exec.tez.TezSessionState; import org.apache.hadoop.hive.ql.history.HiveHistory; import org.apache.hadoop.hive.ql.history.HiveHistoryImpl; @@ -230,7 +231,7 @@ public class SessionState { private Map<String, List<String>> localMapRedErrors; - private TezSessionState tezSessionState; + private TezSession tezSessionState; private String currentDatabase; @@ -1877,23 +1878,23 @@ public class SessionState { } } - public TezSessionState getTezSession() { + public TezSession getTezSession() { return tezSessionState; } /** Called from TezTask to attach a TezSession to use to the threadlocal. Ugly pattern... */ - public void setTezSession(TezSessionState session) { + public void setTezSession(TezSession session) { if (tezSessionState == session) { return; // The same object. } if (tezSessionState != null) { - tezSessionState.markFree(); + tezSessionState.unsetOwnerThread(); tezSessionState.setKillQuery(null); tezSessionState = null; } tezSessionState = session; if (session != null) { - session.markInUse(); + session.setOwnerThread(); tezSessionState.setKillQuery(getKillQuery()); } } http://git-wip-us.apache.org/repos/asf/hive/blob/e5448bf0/ql/src/java/org/apache/hadoop/hive/ql/wm/SessionTriggerProvider.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/SessionTriggerProvider.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/SessionTriggerProvider.java index 16106f4..677c741 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/wm/SessionTriggerProvider.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/SessionTriggerProvider.java @@ -17,21 +17,22 @@ package org.apache.hadoop.hive.ql.wm; import java.util.List; +import org.apache.hadoop.hive.ql.exec.tez.TezSession; import org.apache.hadoop.hive.ql.exec.tez.TezSessionState; /** * Implementation for providing current open sessions and active trigger. */ public class SessionTriggerProvider { - private List<TezSessionState> sessions; + private List<TezSession> sessions; private List<Trigger> triggers; - public SessionTriggerProvider(List<TezSessionState> openSessions, List<Trigger> triggers) { + public SessionTriggerProvider(List<TezSession> openSessions, List<Trigger> triggers) { this.sessions = openSessions; this.triggers = triggers; } - public List<TezSessionState> getSessions() { + public List<TezSession> getSessions() { return sessions; } @@ -39,7 +40,7 @@ public class SessionTriggerProvider { return triggers; } - public void setSessions(final List<TezSessionState> sessions) { + public void setSessions(final List<TezSession> sessions) { this.sessions = sessions; } http://git-wip-us.apache.org/repos/asf/hive/blob/e5448bf0/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java index f5ab981..b856e10 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java @@ -23,9 +23,13 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; + import java.io.IOException; +import java.net.URISyntaxException; import java.util.concurrent.ScheduledExecutorService; + import javax.security.auth.login.LoginException; + import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.security.UserGroupInformation; @@ -48,8 +52,9 @@ public class SampleTezSessionState extends WmTezSession { public SampleTezSessionState( String sessionId, TezSessionPoolSession.Manager parent, HiveConf conf) { - super(sessionId, parent, (parent instanceof TezSessionPoolManager) - ? ((TezSessionPoolManager)parent).getExpirationTracker() : null, conf); + super(parent, (parent instanceof TezSessionPoolManager) + ? ((TezSessionPoolManager)parent).getExpirationTracker() : null, + new TezSessionState(sessionId, conf)); this.sessionId = sessionId; this.hiveConf = conf; waitForAmRegFuture = createDefaultWaitForAmRegistryFuture(); @@ -79,6 +84,12 @@ public class SampleTezSessionState extends WmTezSession { } @Override + public void open(boolean isPoolInit) throws IOException, LoginException, + URISyntaxException, TezException { + open(); + } + + @Override public void open(HiveResources resources) throws LoginException, IOException { open(); } @@ -89,7 +100,7 @@ public class SampleTezSessionState extends WmTezSession { } @Override - void close(boolean keepTmpDir) throws TezException, IOException { + public void close(boolean keepTmpDir) throws TezException, IOException { open = keepTmpDir; } http://git-wip-us.apache.org/repos/asf/hive/blob/e5448bf0/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java index d5b683f..68b9c3e 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java @@ -67,13 +67,13 @@ public class TestTezSessionPool { public void testGetNonDefaultSession() { poolManager = new TestTezSessionPoolManager(); try { - TezSessionState sessionState = poolManager.getSession(null, conf, true, false); - TezSessionState sessionState1 = poolManager.getSession(sessionState, conf, true, false); + TezSession sessionState = poolManager.getSession(null, conf, true, false); + TezSession sessionState1 = poolManager.getSession(sessionState, conf, true, false); if (sessionState1 != sessionState) { fail(); } conf.set("tez.queue.name", "nondefault"); - TezSessionState sessionState2 = poolManager.getSession(sessionState, conf, true, false); + TezSession sessionState2 = poolManager.getSession(sessionState, conf, true, false); if (sessionState2 == sessionState) { fail(); } @@ -97,7 +97,7 @@ public class TestTezSessionPool { // this is now a LIFO operation // draw 1 and replace - TezSessionState sessionState = poolManager.getSession(null, conf, true, false); + TezSession sessionState = poolManager.getSession(null, conf, true, false); assertEquals("a", sessionState.getQueueName()); poolManager.returnSession(sessionState); @@ -108,13 +108,13 @@ public class TestTezSessionPool { // [a,b,c,a,b,c] // draw 2 and return in order - further run should return last returned - TezSessionState first = poolManager.getSession(null, conf, true, false); - TezSessionState second = poolManager.getSession(null, conf, true, false); + TezSession first = poolManager.getSession(null, conf, true, false); + TezSession second = poolManager.getSession(null, conf, true, false); assertEquals("a", first.getQueueName()); assertEquals("b", second.getQueueName()); poolManager.returnSession(first); poolManager.returnSession(second); - TezSessionState third = poolManager.getSession(null, conf, true, false); + TezSession third = poolManager.getSession(null, conf, true, false); assertEquals("b", third.getQueueName()); poolManager.returnSession(third); @@ -157,7 +157,7 @@ public class TestTezSessionPool { poolManager = new TestTezSessionPoolManager(); poolManager.setupPool(conf); poolManager.startPool(conf, null); - TezSessionState[] sessions = new TezSessionState[12]; + TezSession[] sessions = new TezSession[12]; int[] queueCounts = new int[3]; for (int i = 0; i < sessions.length; ++i) { sessions[i] = poolManager.getSession(null, conf, true, false); @@ -184,7 +184,7 @@ public class TestTezSessionPool { conf.setIntVar(ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE, 1); poolManager = new TestTezSessionPoolManager(); - TezSessionState session = Mockito.mock(TezSessionState.class); + TezSession session = Mockito.mock(TezSession.class); Mockito.when(session.getQueueName()).thenReturn("default"); Mockito.when(session.isDefault()).thenReturn(false); Mockito.when(session.getConf()).thenReturn(conf); @@ -192,7 +192,7 @@ public class TestTezSessionPool { poolManager.reopen(session); Mockito.verify(session).close(false); - Mockito.verify(session).open(Mockito.<TezSessionState.HiveResources>any()); + Mockito.verify(session).open(Mockito.<TezSession.HiveResources>any()); // mocked session starts with default queue assertEquals("default", session.getQueueName()); @@ -278,7 +278,7 @@ public class TestTezSessionPool { tmpConf.set("tez.queue.name", ""); } - TezSessionState session = poolManager.getSession(null, tmpConf, true, llap); + TezSession session = poolManager.getSession(null, tmpConf, true, llap); Thread.sleep((random.nextInt(9) % 10) * 1000); session.setLegacyLlapMode(llap); poolManager.returnSession(session); @@ -323,20 +323,20 @@ public class TestTezSessionPool { @Test public void testCloseAndOpenDefault() throws Exception { poolManager = new TestTezSessionPoolManager(); - TezSessionState session = Mockito.mock(TezSessionState.class); + TezSession session = Mockito.mock(TezSession.class); Mockito.when(session.isDefault()).thenReturn(false); Mockito.when(session.getConf()).thenReturn(conf); poolManager.reopen(session); Mockito.verify(session).close(false); - Mockito.verify(session).open(Mockito.<TezSessionState.HiveResources>any()); + Mockito.verify(session).open(Mockito.<TezSession.HiveResources>any()); } @Test public void testSessionDestroy() throws Exception { poolManager = new TestTezSessionPoolManager(); - TezSessionState session = Mockito.mock(TezSessionState.class); + TezSession session = Mockito.mock(TezSession.class); Mockito.when(session.isDefault()).thenReturn(false); poolManager.destroy(session); http://git-wip-us.apache.org/repos/asf/hive/blob/e5448bf0/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java index b67aec3..a8eb3b1 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java @@ -172,7 +172,7 @@ public class TestTezTask { SessionState.start(hiveConf); session = mock(TezClient.class); sessionState = mock(TezSessionState.class); - when(sessionState.getSession()).thenReturn(session); + when(sessionState.getTezClient()).thenReturn(session); when(sessionState.reopen()).thenReturn(sessionState); when(session.submitDAG(any(DAG.class))) .thenThrow(new SessionNotRunning("")) http://git-wip-us.apache.org/repos/asf/hive/blob/e5448bf0/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java index 4659ecb..45366ec 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java @@ -62,7 +62,6 @@ import org.slf4j.LoggerFactory; @RunWith(RetryTestRunner.class) public class TestWorkloadManager { - @SuppressWarnings("unused") private static final Logger LOG = LoggerFactory.getLogger(TestWorkloadManager.class); private final class GetSessionRunnable implements Runnable { @@ -227,7 +226,7 @@ public class TestWorkloadManager { @Override public WmTezSession getSession( - TezSessionState session, MappingInput input, HiveConf conf, + TezSession session, MappingInput input, HiveConf conf, final WmContext wmContext) throws Exception { // We want to wait for the iteration to finish and set the cluster fraction. WmTezSession state = super.getSession(session, input, conf, null); @@ -236,7 +235,7 @@ public class TestWorkloadManager { } @Override - public void destroy(TezSessionState session) throws Exception { + public void destroy(TezSession session) throws Exception { super.destroy(session); ensureWm(); } @@ -252,7 +251,7 @@ public class TestWorkloadManager { } @Override - public TezSessionState reopen(TezSessionState session) throws Exception { + public TezSession reopen(TezSession session) throws Exception { session = super.reopen(session); ensureWm(); return session; @@ -269,10 +268,10 @@ public class TestWorkloadManager { MockQam qam = new MockQam(); WorkloadManager wm = new WorkloadManagerForTest("test", conf, 1, qam); wm.start(); - TezSessionState nonPool = mock(TezSessionState.class); + TezSession nonPool = mock(TezSession.class); when(nonPool.getConf()).thenReturn(conf); doNothing().when(nonPool).close(anyBoolean()); - TezSessionState session = wm.getSession(nonPool, mappingInput("user"), conf); + TezSession session = wm.getSession(nonPool, mappingInput("user"), conf); verify(nonPool).close(anyBoolean()); assertNotSame(nonPool, session); session.returnToSessionManager(); @@ -282,7 +281,7 @@ public class TestWorkloadManager { session = wm.getSession(diffPool, mappingInput("user"), conf); verify(diffPool).returnToSessionManager(); assertNotSame(diffPool, session); - TezSessionState session2 = wm.getSession(session, mappingInput("user"), conf); + TezSession session2 = wm.getSession(session, mappingInput("user"), conf); assertSame(session, session2); } @@ -294,7 +293,7 @@ public class TestWorkloadManager { wm.start(); // The queue should be ignored. conf.set(TezConfiguration.TEZ_QUEUE_NAME, "test2"); - TezSessionState session = wm.getSession(null, mappingInput("user"), conf); + TezSession session = wm.getSession(null, mappingInput("user"), conf); assertEquals("test", session.getQueueName()); assertEquals("test", conf.get(TezConfiguration.TEZ_QUEUE_NAME)); session.setQueueName("test2"); @@ -415,7 +414,7 @@ public class TestWorkloadManager { verifyMapping(wm, conf, mappingInput("zzz", groups("g0", "g1"), "g1"), "g1"); // Explicit pool specification - invalid - there's no mapping that matches. try { - TezSessionState r = wm.getSession( + TezSession r = wm.getSession( null, mappingInput("u0", groups("g0", "g1"), "u2"), conf); fail("Expected failure, but got " + r); } catch (Exception ex) { @@ -428,7 +427,7 @@ public class TestWorkloadManager { verifyMapping(wm, conf, mappingInput("u0", groups("g0", "g1"), "u2"), "u2"); // The mapping that doesn't exist still shouldn't work. try { - TezSessionState r = wm.getSession( + TezSession r = wm.getSession( null, mappingInput("u0", groups("g0", "g1"), "zzz"), conf); fail("Expected failure, but got " + r); } catch (Exception ex) { @@ -824,7 +823,7 @@ public class TestWorkloadManager { assertEquals(0, tezAmPool.getCurrentSize()); try { - TezSessionState r = wm.getSession(null, mappingInput("A", null), conf, null); + TezSession r = wm.getSession(null, mappingInput("A", null), conf, null); fail("Expected an error but got " + r); } catch (WorkloadManager.NoPoolMappingException ex) { // Ignore, this particular error is expected. @@ -1165,7 +1164,7 @@ public class TestWorkloadManager { SettableFuture<Boolean> failedWait = SettableFuture.create(); failedWait.setException(new Exception("foo")); theOnlySession.setWaitForAmRegistryFuture(failedWait); - TezSessionState retriedSession = wm.getSession(null, mappingInput("A"), conf); + TezSession retriedSession = wm.getSession(null, mappingInput("A"), conf); assertNotNull(retriedSession); assertNotSame(theOnlySession, retriedSession); // Should have been replaced. retriedSession.returnToSessionManager(); @@ -1175,7 +1174,7 @@ public class TestWorkloadManager { theOnlySession.setWaitForAmRegistryFuture(failedWait); wm.setNextWaitForAmRegistryFuture(failedWait); // Fail the retry. try { - TezSessionState r = wm.getSession(null, mappingInput("A"), conf); + TezSession r = wm.getSession(null, mappingInput("A"), conf); fail("Expected an error but got " + r); } catch (Exception ex) { // Expected. http://git-wip-us.apache.org/repos/asf/hive/blob/e5448bf0/service/src/java/org/apache/hive/service/server/KillQueryImpl.java ---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/service/server/KillQueryImpl.java b/service/src/java/org/apache/hive/service/server/KillQueryImpl.java index 490a04d..2c23253 100644 --- a/service/src/java/org/apache/hive/service/server/KillQueryImpl.java +++ b/service/src/java/org/apache/hive/service/server/KillQueryImpl.java @@ -96,7 +96,8 @@ public class KillQueryImpl implements KillQuery { } @Override - public void killQuery(String queryId, String errMsg, HiveConf conf) throws HiveException { + public void killQuery( + String queryId, String errMsg, HiveConf conf, boolean isYarn) throws HiveException { try { String queryTag = null; @@ -120,8 +121,10 @@ public class KillQueryImpl implements KillQuery { queryTag = queryId; } - LOG.info("Killing yarn jobs for query id : " + queryId + " using tag :" + queryTag); - killChildYarnJobs(conf, queryTag); + if (isYarn) { + LOG.info("Killing yarn jobs for query id : " + queryId + " using tag :" + queryTag); + killChildYarnJobs(conf, queryTag); + } if (operation != null) { OperationHandle handle = operation.getHandle();
