Repository: hive Updated Branches: refs/heads/master f3cb704a5 -> ac24537f2
http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java index 9f72155..538d745 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java @@ -18,9 +18,9 @@ package org.apache.hadoop.hive.ql.exec.tez; -import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.registry.impl.TezAmInstance; -import com.google.common.annotations.VisibleForTesting; +import java.util.HashSet; import java.util.concurrent.Semaphore; import java.util.ArrayList; @@ -32,13 +32,16 @@ import java.util.Set; import org.apache.tez.dag.api.TezConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolSession.OpenSessionTracker; +import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolSession.Manager; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.security.UserGroupInformation; +import com.google.common.annotations.VisibleForTesting; + /** * This class is for managing multiple tez sessions particularly when * HiveServer2 is being used to submit queries. @@ -47,7 +50,7 @@ import org.apache.hadoop.security.UserGroupInformation; * on that queue and assigned to the session state. 
*/ public class TezSessionPoolManager - implements SessionExpirationTracker.RestartImpl, OpenSessionTracker { + implements SessionExpirationTracker.RestartImpl, Manager { private enum CustomQueueAllowed { TRUE, @@ -64,7 +67,7 @@ public class TezSessionPoolManager private int numConcurrentLlapQueries = -1; private CustomQueueAllowed customQueueAllowed = CustomQueueAllowed.TRUE; - private TezSessionPool defaultSessionPool; + private TezSessionPool<TezSessionPoolSession> defaultSessionPool; private SessionExpirationTracker expirationTracker; private RestrictedConfigChecker restrictedConfig; @@ -114,9 +117,8 @@ public class TezSessionPoolManager int numSessions = conf.getIntVar(ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE); int numSessionsTotal = numSessions * (defaultQueueList.length - emptyNames); if (numSessionsTotal > 0) { - // TODO: this can be enabled to test. Will only be used in WM case for now. boolean enableAmRegistry = false; - defaultSessionPool = new TezSessionPool(initConf, numSessionsTotal, enableAmRegistry); + defaultSessionPool = new TezSessionPool<>(initConf, numSessionsTotal, enableAmRegistry); } numConcurrentLlapQueries = conf.getIntVar(ConfVars.HIVE_SERVER2_LLAP_CONCURRENT_QUERIES); @@ -249,12 +251,16 @@ public class TezSessionPoolManager return retTezSessionState; } - public void returnSession(TezSessionState tezSessionState, boolean llap) - throws Exception { + @Override + public void returnAfterUse(TezSessionPoolSession session) throws Exception { + returnSession(session); + } + + void returnSession(TezSessionState tezSessionState) throws Exception { // Ignore the interrupt status while returning the session, but set it back // on the thread in case anything else needs to deal with it. 
boolean isInterrupted = Thread.interrupted(); - + boolean llap = tezSessionState.getLegacyLlapMode(); try { if (isInterrupted) { LOG.info("returnSession invoked with interrupt status set"); @@ -262,6 +268,7 @@ public class TezSessionPoolManager if (llap && (this.numConcurrentLlapQueries > 0)) { llapQueue.release(); } + tezSessionState.setLegacyLlapMode(false); if (tezSessionState.isDefault() && tezSessionState instanceof TezSessionPoolSession) { LOG.info("The session " + tezSessionState.getSessionId() @@ -377,6 +384,7 @@ public class TezSessionPoolManager } if (canWorkWithSameSession(session, conf)) { + session.setLegacyLlapMode(llap); return session; } @@ -384,23 +392,33 @@ public class TezSessionPoolManager closeIfNotDefault(session, false); } - return getSession(conf, doOpen); + session = getSession(conf, doOpen); + session.setLegacyLlapMode(llap); + return session; } /** Reopens the session that was found to not be running. */ - public void reopenSession(TezSessionState sessionState, Configuration conf) throws Exception { + public TezSessionState reopenSession(TezSessionState sessionState, + Configuration conf, String[] additionalFiles) throws Exception { HiveConf sessionConf = sessionState.getConf(); if (sessionState.getQueueName() != null && sessionConf.get(TezConfiguration.TEZ_QUEUE_NAME) == null) { sessionConf.set(TezConfiguration.TEZ_QUEUE_NAME, sessionState.getQueueName()); } Set<String> oldAdditionalFiles = sessionState.getAdditionalFilesNotFromConf(); + if ((oldAdditionalFiles == null || oldAdditionalFiles.isEmpty()) + && (additionalFiles != null)) { + oldAdditionalFiles = new HashSet<>(); + for (String file : additionalFiles) { + oldAdditionalFiles.add(file); + } + } // TODO: close basically resets the object to a bunch of nulls. // We should ideally not reuse the object because it's pointless and error-prone. - // Close the old one, but keep the tmp files around. sessionState.close(true); // TODO: should we reuse scratchDir too? 
sessionState.open(oldAdditionalFiles, null); + return sessionState; } public void closeNonDefaultSessions(boolean keepTmpDir) throws Exception { @@ -422,8 +440,8 @@ public class TezSessionPoolManager LOG.warn("Pool session has a null queue: " + oldSession); } TezSessionPoolSession newSession = createAndInitSession( - queueName, oldSession.isDefault(), oldSession.getConf()); - defaultSessionPool.replaceSession(oldSession, newSession); + queueName, oldSession.isDefault(), oldSession.getConf()); + defaultSessionPool.replaceSession(oldSession, newSession, false, null, null); } /** Called by TezSessionPoolSession when opened. */ @@ -449,4 +467,15 @@ public class TezSessionPoolManager public SessionExpirationTracker getExpirationTracker() { return expirationTracker; } + + @Override + public TezSessionState reopen( + TezSessionPoolSession session, Configuration conf, String[] inputOutputJars) throws Exception { + return reopenSession(session, conf, inputOutputJars); + } + + @Override + public void destroy(TezSessionPoolSession session) throws Exception { + destroySession(session); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java index 8ecdbbf..4488c12 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java @@ -18,6 +18,13 @@ package org.apache.hadoop.hive.ql.exec.tez; +import org.apache.hadoop.hive.registry.impl.TezAmInstance; + +import org.apache.hadoop.security.token.Token; +import org.apache.tez.common.security.JobTokenIdentifier; + +import org.apache.hadoop.conf.Configuration; + import java.io.IOException; import java.net.URISyntaxException; import 
java.util.Collection; @@ -45,21 +52,26 @@ import com.google.common.annotations.VisibleForTesting; class TezSessionPoolSession extends TezSessionState { private static final int STATE_NONE = 0, STATE_IN_USE = 1, STATE_EXPIRED = 2; - interface OpenSessionTracker { + interface Manager { void registerOpenSession(TezSessionPoolSession session); void unregisterOpenSession(TezSessionPoolSession session); + void returnAfterUse(TezSessionPoolSession session) throws Exception; + TezSessionState reopen(TezSessionPoolSession session, Configuration conf, + String[] inputOutputJars) throws Exception; + void destroy(TezSessionPoolSession session) throws Exception; } private final AtomicInteger sessionState = new AtomicInteger(STATE_NONE); private Long expirationNs; - private final OpenSessionTracker parent; + private final Manager parent; private final SessionExpirationTracker expirationTracker; - public TezSessionPoolSession(String sessionId, OpenSessionTracker parent, - SessionExpirationTracker expirationTracker, HiveConf conf) { + + public TezSessionPoolSession(String sessionId, Manager parent, + SessionExpirationTracker tracker, HiveConf conf) { super(sessionId, conf); this.parent = parent; - this.expirationTracker = expirationTracker; + this.expirationTracker = tracker; } void setExpirationNs(long expirationNs) { @@ -71,7 +83,7 @@ class TezSessionPoolSession extends TezSessionState { } @Override - public void close(boolean keepTmpDir) throws Exception { + void close(boolean keepTmpDir) throws Exception { try { super.close(keepTmpDir); } finally { @@ -119,12 +131,7 @@ class TezSessionPoolSession extends TezSessionState { } } - /** - * Notifies the session that it's no longer in use. If the session has expired while in use, - * this method will take care of the expiration. - * @return True if the session was returned, false if it was restarted. - */ - public boolean returnAfterUse() throws Exception { + boolean stopUsing() throws Exception { int finalState = shouldExpire() ? 
STATE_EXPIRED : STATE_NONE; if (!sessionState.compareAndSet(STATE_IN_USE, finalState)) { throw new AssertionError("Unexpected state change; currently " + sessionState.get()); @@ -155,4 +162,28 @@ class TezSessionPoolSession extends TezSessionState { private boolean shouldExpire() { return expirationNs != null && (System.nanoTime() - expirationNs) >= 0; } + + @Override + public void returnToSessionManager() throws Exception { + parent.returnAfterUse(this); + } + + @Override + public TezSessionState reopen( + Configuration conf, String[] inputOutputJars) throws Exception { + return parent.reopen(this, conf, inputOutputJars); + } + + @Override + public void destroy() throws Exception { + parent.destroy(this); + } + + boolean isOwnedBy(Manager parent) { + return this.parent == parent; + } + + void updateFromRegistry(TezAmInstance si) { + // Nothing to do. + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java index 170de21..e5850f9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java @@ -111,6 +111,7 @@ public class TezSessionState { private final Set<String> additionalFilesNotFromConf = new HashSet<String>(); private final Set<LocalResource> localizedResources = new HashSet<LocalResource>(); private boolean doAsEnabled; + private boolean isLegacyLlapMode; /** * Constructor. We do not automatically connect, because we only want to @@ -496,13 +497,14 @@ public class TezSessionState { /** * Close a tez session. Will cleanup any tez/am related resources. After closing a session no - * further DAGs can be executed against it. 
+ * further DAGs can be executed against it. Only called by session management classes; some + * sessions should not simply be closed by users - e.g. pool sessions need to be restarted. * * @param keepTmpDir * whether or not to remove the scratch dir at the same time. * @throws Exception */ - public void close(boolean keepTmpDir) throws Exception { + void close(boolean keepTmpDir) throws Exception { if (session != null) { LOG.info("Closing Tez Session"); closeClient(session); @@ -726,4 +728,27 @@ public class TezSessionState { } while (!ownerThread.compareAndSet(null, newName)); } + void setLegacyLlapMode(boolean value) { + this.isLegacyLlapMode = value; + } + + boolean getLegacyLlapMode() { + return this.isLegacyLlapMode; + } + + public void returnToSessionManager() throws Exception { + // By default, TezSessionPoolManager handles this for both pool and non-pool session. + TezSessionPoolManager.getInstance().returnSession(this); + } + + public TezSessionState reopen( + Configuration conf, String[] inputOutputJars) throws Exception { + // By default, TezSessionPoolManager handles this for both pool and non-pool session. + return TezSessionPoolManager.getInstance().reopenSession(this, conf, inputOutputJars); + } + + public void destroy() throws Exception { + // By default, TezSessionPoolManager handles this for both pool and non-pool session. 
+ TezSessionPoolManager.getInstance().destroySession(this); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index e6e236d..29d6fe6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -143,12 +143,20 @@ public class TezTask extends Task<TezWork> { // Need to remove this static hack. But this is the way currently to get a session. SessionState ss = SessionState.get(); + // Note: given that we return pool sessions to the pool in the finally block below, and that + // we need to set the global to null to do that, this "reuse" may be pointless. session = ss.getTezSession(); if (session != null && !session.isOpen()) { LOG.warn("The session: " + session + " has not been opened"); } - session = TezSessionPoolManager.getInstance().getSession( - session, conf, false, getWork().getLlapMode()); + if (WorkloadManager.isInUse(ss.getConf())) { + // TODO: in future, we may also pass getUserIpAddress. + // Note: for now this will just block to wait for a session based on parallelism. + session = WorkloadManager.getInstance().getSession(session, ss.getUserName(), conf); + } else { + session = TezSessionPoolManager.getInstance().getSession( + session, conf, false, getWork().getLlapMode()); + } ss.setTezSession(session); try { // jobConf will hold all the configuration for hadoop, tez, and hive @@ -230,8 +238,7 @@ public class TezTask extends Task<TezWork> { } finally { // We return this to the pool even if it's unusable; reopen is supposed to handle this. 
try { - TezSessionPoolManager.getInstance() - .returnSession(session, getWork().getLlapMode()); + session.returnToSessionManager(); } catch (Exception e) { LOG.error("Failed to return session: {} to pool", session, e); throw e; @@ -547,7 +554,9 @@ public class TezTask extends Task<TezWork> { } catch (SessionNotRunning nr) { console.printInfo("Tez session was closed. Reopening..."); - TezSessionPoolManager.getInstance().reopenSession(sessionState, conf); + // close the old one, but keep the tmp files around + // conf is passed in only for the case when session conf is null (tests and legacy paths?) + sessionState = sessionState.reopen(conf, inputOutputJars); console.printInfo("Session re-established."); dagClient = sessionState.getSession().submitDAG(dag); @@ -557,11 +566,11 @@ public class TezTask extends Task<TezWork> { try { console.printInfo("Dag submit failed due to " + e.getMessage() + " stack trace: " + Arrays.toString(e.getStackTrace()) + " retrying..."); - TezSessionPoolManager.getInstance().reopenSession(sessionState, conf); + sessionState = sessionState.reopen(conf, inputOutputJars); dagClient = sessionState.getSession().submitDAG(dag); } catch (Exception retryException) { // we failed to submit after retrying. Destroy session and bail. - TezSessionPoolManager.getInstance().destroySession(sessionState); + sessionState.destroy(); throw retryException; } } http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java new file mode 100644 index 0000000..00501ee --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.tez; + +import java.util.concurrent.TimeoutException; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.registry.impl.TezAmInstance; + +public class WmTezSession extends TezSessionPoolSession implements AmPluginNode { + private String poolName; + private double clusterFraction; + + private final Object amPluginInfoLock = new Object(); + private AmPluginInfo amPluginInfo = null; + + + /** The actual state of the guaranteed task, and the update state, for the session. */ + // Note: hypothetically, a generic WM-aware-session should not know about guaranteed tasks. + // We should have another subclass for a WM-aware-session-implemented-using-ducks. + // However, since this is the only type of WM for now, this can live here. + private final static class ActualWmState { + // All accesses synchronized on the object itself. Could be replaced with CAS. 
+ int sending = -1, sent = -1, target = 0; + } + private final ActualWmState actualState = new ActualWmState(); + + public WmTezSession(String sessionId, Manager parent, + SessionExpirationTracker expiration, HiveConf conf) { + super(sessionId, parent, expiration, conf); + } + + @Override + public AmPluginInfo waitForAmPluginInfo(int timeoutMs) + throws InterruptedException, TimeoutException { + synchronized (amPluginInfoLock) { + if (amPluginInfo == null) { + amPluginInfoLock.wait(timeoutMs); + if (amPluginInfo == null) { + throw new TimeoutException("No plugin information for " + getSessionId()); + } + } + return amPluginInfo; + } + } + + @Override + void updateFromRegistry(TezAmInstance si) { + synchronized (amPluginInfoLock) { + this.amPluginInfo = new AmPluginInfo(si.getHost(), si.getPluginPort(), + si.getPluginToken(), si.getPluginTokenJobId()); + amPluginInfoLock.notifyAll(); + } + } + + public AmPluginInfo getAmPluginInfo() { + return amPluginInfo; // Only has final fields, no artifacts from the absence of sync. + } + + void setPoolName(String poolName) { + this.poolName = poolName; + } + + String getPoolName() { + return poolName; + } + + void setClusterFraction(double fraction) { + this.clusterFraction = fraction; + } + + double getClusterFraction() { + return this.clusterFraction; + } + + boolean setSendingGuaranteed(int intAlloc) { + assert intAlloc >= 0; + synchronized (actualState) { + actualState.target = intAlloc; + if (actualState.sending != -1) return false; // The sender will take care of this. + if (actualState.sent == intAlloc) return false; // The value didn't change. + actualState.sending = intAlloc; + return true; + } + } + + int setSentGuaranteed() { + // Only one send can be active at the same time. + synchronized (actualState) { + assert actualState.sending != -1; + actualState.sent = actualState.sending; + actualState.sending = -1; + return (actualState.sent == actualState.target) ? 
-1 : actualState.target; + } + } + + boolean setFailedToSendGuaranteed() { + synchronized (actualState) { + assert actualState.sending != -1; + actualState.sending = -1; + // It's ok to skip a failed message if the target has changed back to the old value. + return (actualState.sent == actualState.target); + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java new file mode 100644 index 0000000..288d705 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java @@ -0,0 +1,372 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.exec.tez; + +import java.util.concurrent.TimeoutException; + +import java.util.concurrent.TimeUnit; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.commons.codec.binary.Base64; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.tez.common.security.JobTokenIdentifier; +import org.apache.tez.common.security.JobTokenSecretManager; +import org.apache.tez.dag.api.TezConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; + + +/** Workload management entry point for HS2. */ +public class WorkloadManager + implements TezSessionPoolSession.Manager, SessionExpirationTracker.RestartImpl { + private static final Logger LOG = LoggerFactory.getLogger(WorkloadManager.class); + // TODO: this is a temporary setting that will go away, so it's not in HiveConf. 
+ public static final String TEST_WM_CONFIG = "hive.test.workload.management"; + + private final HiveConf conf; + private final TezSessionPool<WmTezSession> sessions; + private final SessionExpirationTracker expirationTracker; + private final RestrictedConfigChecker restrictedConfig; + private final QueryAllocationManager allocationManager; + private final String yarnQueue; + // TODO: it's not clear that we need to track this - unlike PoolManager we don't have non-pool + // sessions, so the pool itself could internally track the sessions it gave out, since + // calling close on an unopened session is probably harmless. + private final IdentityHashMap<TezSessionPoolSession, Boolean> openSessions = + new IdentityHashMap<>(); + /** Sessions given out (i.e. between get... and return... calls), separated by Hive pool. */ + private final ReentrantReadWriteLock poolsLock = new ReentrantReadWriteLock(); + private final HashMap<String, PoolState> pools = new HashMap<>(); + private final int amRegistryTimeoutMs; + + private static class PoolState { + // Add stuff here as WM is implemented. + private final Object lock = new Object(); + private final List<WmTezSession> sessions = new ArrayList<>(); + } + + // TODO: this is temporary before HiveServerEnvironment is merged. + private static volatile WorkloadManager INSTANCE; + public static WorkloadManager getInstance() { + WorkloadManager wm = INSTANCE; + assert wm != null; + return wm; + } + + public static boolean isInUse(Configuration conf) { + return INSTANCE != null && conf.getBoolean(TEST_WM_CONFIG, false); + } + + /** Called once, when HS2 initializes. */ + public static WorkloadManager create(String yarnQueue, HiveConf conf) { + assert INSTANCE == null; + Token<JobTokenIdentifier> amsToken = createAmsToken(); + // We could derive the expected number of AMs to pass in. 
+ LlapPluginEndpointClient amComm = new LlapPluginEndpointClientImpl(conf, amsToken, -1); + QueryAllocationManager qam = new GuaranteedTasksAllocator(conf, amComm); + // TODO: Hardcode one session for now; initial policies should be passed in. + return (INSTANCE = new WorkloadManager(yarnQueue, conf, 1, qam, amsToken)); + } + + private static Token<JobTokenIdentifier> createAmsToken() { + if (!UserGroupInformation.isSecurityEnabled()) return null; + // This application ID is completely bogus. + ApplicationId id = ApplicationId.newInstance( + System.nanoTime(), (int)(System.nanoTime() % 100000)); + JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(id.toString())); + JobTokenSecretManager jobTokenManager = new JobTokenSecretManager(); + Token<JobTokenIdentifier> sessionToken = new Token<>(identifier, jobTokenManager); + sessionToken.setService(identifier.getJobId()); + return sessionToken; + } + + @VisibleForTesting + WorkloadManager(String yarnQueue, HiveConf conf, int numSessions, + QueryAllocationManager qam, Token<JobTokenIdentifier> amsToken) { + this.yarnQueue = yarnQueue; + this.conf = conf; + initializeHivePools(); + + this.amRegistryTimeoutMs = (int)HiveConf.getTimeVar( + conf, ConfVars.HIVE_SERVER2_TEZ_WM_AM_REGISTRY_TIMEOUT, TimeUnit.MILLISECONDS); + sessions = new TezSessionPool<>(conf, numSessions, true); + restrictedConfig = new RestrictedConfigChecker(conf); + allocationManager = qam; + // Only creates the expiration tracker if expiration is configured. 
+ expirationTracker = SessionExpirationTracker.create(conf, this); + for (int i = 0; i < numSessions; i++) { + sessions.addInitialSession(createSession()); + } + } + + private void initializeHivePools() { + // TODO: real implementation + poolsLock.writeLock().lock(); + try { + pools.put("llap", new PoolState()); + } finally { + poolsLock.writeLock().unlock(); + } + } + + public TezSessionState getSession( + TezSessionState session, String userName, HiveConf conf) throws Exception { + validateConfig(conf); + String poolName = mapSessionToPoolName(userName); + // TODO: do query parallelism enforcement here based on the policies and pools. + while (true) { + WmTezSession result = checkSessionForReuse(session); + // TODO: when proper AM management is implemented, we should call tryGet... here, because the + // parallelism will be enforced here, and pool would always have a session for us. + result = (result == null ? sessions.getSession() : result); + result.setQueueName(yarnQueue); + result.setPoolName(poolName); + if (!ensureAmIsRegistered(result)) continue; // Try another. + redistributePoolAllocations(poolName, result, null); + return result; + } + } + + @VisibleForTesting + protected boolean ensureAmIsRegistered(WmTezSession session) throws Exception { + // Make sure AM is ready to use and registered with AM registry. 
+ try { + session.waitForAmPluginInfo(amRegistryTimeoutMs); + } catch (TimeoutException ex) { + LOG.error("Timed out waiting for AM registry information for " + session.getSessionId()); + session.destroy(); + return false; + } + return true; + } + + private void redistributePoolAllocations( + String poolName, WmTezSession sessionToAdd, WmTezSession sessionToRemove) { + List<WmTezSession> sessionsToUpdate = null; + double totalAlloc = 0; + assert sessionToAdd == null || poolName.equals(sessionToAdd.getPoolName()); + assert sessionToRemove == null || poolName.equals(sessionToRemove.getPoolName()); + poolsLock.readLock().lock(); + try { + PoolState pool = pools.get(poolName); + synchronized (pool.lock) { + // This should be a 2nd order fn but it's too much pain in Java for one LOC. + if (sessionToAdd != null) { + pool.sessions.add(sessionToAdd); + } + if (sessionToRemove != null) { + if (!pool.sessions.remove(sessionToRemove)) { + LOG.error("Session " + sessionToRemove + " could not be removed from the pool"); + } + sessionToRemove.setClusterFraction(0); + } + totalAlloc = updatePoolAllocations(pool.sessions); + sessionsToUpdate = new ArrayList<>(pool.sessions); + } + } finally { + poolsLock.readLock().unlock(); + } + allocationManager.updateSessionsAsync(totalAlloc, sessionsToUpdate); + } + + private WmTezSession checkSessionForReuse(TezSessionState session) throws Exception { + if (session == null) return null; + WmTezSession result = null; + if (session instanceof WmTezSession) { + result = (WmTezSession) session; + if (result.isOwnedBy(this)) { + return result; + } + // TODO: this should never happen, at least for now. Throw? 
+ LOG.warn("Attempting to reuse a session not belonging to us: " + result); + result.returnToSessionManager(); + return null; + } + LOG.warn("Attempting to reuse a non-WM session for workload management:" + session); + if (session instanceof TezSessionPoolSession) { + session.returnToSessionManager(); + } else { + session.close(false); // This is a non-pool session, get rid of it. + } + return null; + } + + private double updatePoolAllocations(List<WmTezSession> sessions) { + // TODO: real implementation involving in-the-pool policy interface, etc. + double allocation = 1.0 / sessions.size(); + for (WmTezSession session : sessions) { + session.setClusterFraction(allocation); + } + return 1.0; + } + + private String mapSessionToPoolName(String userName) { + // TODO: real implementation, probably calling into another class initialized with policies. + return "llap"; + } + + private void validateConfig(HiveConf conf) throws HiveException { + String queueName = conf.get(TezConfiguration.TEZ_QUEUE_NAME); + if ((queueName != null) && !queueName.isEmpty()) { + LOG.warn("Ignoring " + TezConfiguration.TEZ_QUEUE_NAME + "=" + queueName); + conf.set(TezConfiguration.TEZ_QUEUE_NAME, yarnQueue); + } + if (conf.getBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS)) { + // Should this also just be ignored? Throw for now, doAs unlike queue is often set by admin. 
+ throw new HiveException(ConfVars.HIVE_SERVER2_ENABLE_DOAS.varname + " is not supported"); + } + if (restrictedConfig != null) { + restrictedConfig.validate(conf); + } + } + + public void start() throws Exception { + sessions.startInitialSessions(); + if (expirationTracker != null) { + expirationTracker.start(); + } + allocationManager.start(); + } + + public void stop() throws Exception { + List<TezSessionPoolSession> sessionsToClose = null; + synchronized (openSessions) { + sessionsToClose = new ArrayList<TezSessionPoolSession>(openSessions.keySet()); + } + for (TezSessionState sessionState : sessionsToClose) { + sessionState.close(false); + } + if (expirationTracker != null) { + expirationTracker.stop(); + } + allocationManager.stop(); + } + + private WmTezSession createSession() { + WmTezSession session = createSessionObject(TezSessionState.makeSessionId()); + session.setQueueName(yarnQueue); + session.setDefault(); + LOG.info("Created new interactive session " + session.getSessionId()); + return session; + } + + @VisibleForTesting + protected WmTezSession createSessionObject(String sessionId) { + return new WmTezSession(sessionId, this, expirationTracker, new HiveConf(conf)); + } + + @Override + public void returnAfterUse(TezSessionPoolSession session) throws Exception { + boolean isInterrupted = Thread.interrupted(); + try { + WmTezSession wmSession = ensureOwnedSession(session); + redistributePoolAllocations(wmSession.getPoolName(), null, wmSession); + sessions.returnSession((WmTezSession) session); + } finally { + // Reset the interrupt status. + if (isInterrupted) { + Thread.currentThread().interrupt(); + } + } + } + + + /** Closes a running (expired) pool session and reopens it. 
*/ + @Override + public void closeAndReopenPoolSession(TezSessionPoolSession oldSession) throws Exception { + sessions.replaceSession(ensureOwnedSession(oldSession), createSession(), false, null, null); + } + + private WmTezSession ensureOwnedSession(TezSessionPoolSession oldSession) { + if (!(oldSession instanceof WmTezSession) || !oldSession.isOwnedBy(this)) { + throw new AssertionError("Not a WM session " + oldSession); + } + WmTezSession session = (WmTezSession) oldSession; + return session; + } + + /** Called by TezSessionPoolSession when opened. */ + @Override + public void registerOpenSession(TezSessionPoolSession session) { + synchronized (openSessions) { + openSessions.put(session, true); + } + } + + /** Called by TezSessionPoolSession when closed. */ + @Override + public void unregisterOpenSession(TezSessionPoolSession session) { + synchronized (openSessions) { + openSessions.remove(session); + } + } + + @VisibleForTesting + public SessionExpirationTracker getExpirationTracker() { + return expirationTracker; + } + + @Override + public TezSessionState reopen(TezSessionPoolSession session, Configuration conf, + String[] additionalFiles) throws Exception { + WmTezSession oldSession = ensureOwnedSession(session), newSession = createSession(); + newSession.setPoolName(oldSession.getPoolName()); + HiveConf sessionConf = session.getConf(); + if (sessionConf == null) { + LOG.warn("Session configuration is null for " + session); + // default queue name when the initial session was created + sessionConf = new HiveConf(conf, WorkloadManager.class); + } + sessions.replaceSession(oldSession, newSession, true, additionalFiles, sessionConf); + // We are going to immediately give this session out, so ensure AM registry. 
+ if (!ensureAmIsRegistered(newSession)) { + throw new Exception("Session is not usable after reopen"); + } + redistributePoolAllocations(oldSession.getPoolName(), newSession, oldSession); + return newSession; + } + + @Override + public void destroy(TezSessionPoolSession session) throws Exception { + LOG.warn("Closing a pool session because of retry failure."); + // We never want to lose pool sessions. Replace it instead; al trigger duck redistribution. + WmTezSession wmSession = ensureOwnedSession(session); + closeAndReopenPoolSession(wmSession); + redistributePoolAllocations(wmSession.getPoolName(), null, wmSession); + } + + protected final HiveConf getConf() { + return conf; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java index 9e2846c..0de9de5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java @@ -81,6 +81,8 @@ public class TezJobMonitor { public void run() { TezJobMonitor.killRunningJobs(); try { + // TODO: why does this only kill non-default sessions? + // Nothing for workload management since that only deals with default ones. 
TezSessionPoolManager.getInstance().closeNonDefaultSessions(false); } catch (Exception e) { // ignore http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java index 7a02a56..209cf57 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java @@ -25,6 +25,7 @@ import com.google.common.cache.CacheBuilder; import java.io.IOException; import java.util.Map; import java.util.concurrent.Callable; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; @@ -38,12 +39,15 @@ import org.slf4j.LoggerFactory; public class LlapClusterStateForCompile { protected static final Logger LOG = LoggerFactory.getLogger(LlapClusterStateForCompile.class); - private static final long CLUSTER_UPDATE_INTERVAL_NS = 120 * 1000000000L; // 2 minutes. - private Long lastClusterUpdateNs; - private Integer noConfigNodeCount, executorCount; - private int numExecutorsPerNode = -1; + private static final long CLUSTER_UPDATE_INTERVAL_MS = 120 * 1000L; // 2 minutes. + private volatile Long lastClusterUpdateNs; + private volatile Integer noConfigNodeCount, executorCount; + private volatile int numExecutorsPerNode = -1; private LlapRegistryService svc; private final Configuration conf; + private final long updateIntervalNs; + /** Synchronizes the actual update from the cluster; only one thread at a time. 
*/ + private final Object updateInfoLock = new Object(); // It's difficult to impossible to pass global things to compilation, so we have a static cache. private static final Cache<String, LlapClusterStateForCompile> CACHE = @@ -57,7 +61,7 @@ public class LlapClusterStateForCompile { @Override public LlapClusterStateForCompile call() throws Exception { LOG.info("Creating cluster info for " + userName + ":" + nodes); - return new LlapClusterStateForCompile(conf); + return new LlapClusterStateForCompile(conf, CLUSTER_UPDATE_INTERVAL_MS); } }; try { @@ -67,8 +71,9 @@ public class LlapClusterStateForCompile { } } - private LlapClusterStateForCompile(Configuration conf) { + public LlapClusterStateForCompile(Configuration conf, long updateIntervalMs) { this.conf = conf; + this.updateIntervalNs = updateIntervalMs * 1000000L; } public boolean hasClusterInfo() { @@ -87,46 +92,60 @@ public class LlapClusterStateForCompile { return numExecutorsPerNode; } - public synchronized void initClusterInfo() { - if (lastClusterUpdateNs != null) { - long elapsed = System.nanoTime() - lastClusterUpdateNs; - if (elapsed < CLUSTER_UPDATE_INTERVAL_NS) return; - } - if (svc == null) { - try { - svc = LlapRegistryService.getClient(conf); - } catch (Throwable t) { - LOG.info("Cannot create the client; ignoring", t); - return; // Don't fail; this is best-effort. - } - } - LlapServiceInstanceSet instances; - try { - instances = svc.getInstances(10); - } catch (IOException e) { - LOG.info("Cannot update cluster information; ignoring", e); - return; // Don't wait for the cluster if not started; this is best-effort. - } - int executorsLocal = 0, noConfigNodesLocal = 0; - for (LlapServiceInstance si : instances.getAll()) { - if (si instanceof InactiveServiceInstance) continue; // Shouldn't happen in getAll. 
- Map<String, String> props = si.getProperties(); - if (props == null) { - ++noConfigNodesLocal; - continue; + public boolean initClusterInfo() { + return initClusterInfo(true); + } + + private boolean isUpdateNeeded(boolean allowUpdate) { + Long lastUpdateLocal = lastClusterUpdateNs; + if (lastUpdateLocal == null) return true; + if (!allowUpdate) return false; + long elapsed = System.nanoTime() - lastUpdateLocal; + return (elapsed >= updateIntervalNs); + } + + public boolean initClusterInfo(boolean allowUpdate) { + if (!isUpdateNeeded(allowUpdate)) return true; + synchronized (updateInfoLock) { + // At this point, no one will take the write lock and update, so we can do the last check. + if (!isUpdateNeeded(allowUpdate)) return true; + if (svc == null) { + try { + svc = LlapRegistryService.getClient(conf); + } catch (Throwable t) { + LOG.info("Cannot create the client; ignoring", t); + return false; // Don't fail; this is best-effort. + } } + LlapServiceInstanceSet instances; try { - int numExecutors = Integer.parseInt(props.get(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname)); - executorsLocal += numExecutors; - if (numExecutorsPerNode == -1) { - numExecutorsPerNode = numExecutors; + instances = svc.getInstances(10); + } catch (IOException e) { + LOG.info("Cannot update cluster information; ignoring", e); + return false; // Don't wait for the cluster if not started; this is best-effort. + } + int executorsLocal = 0, noConfigNodesLocal = 0; + for (LlapServiceInstance si : instances.getAll()) { + if (si instanceof InactiveServiceInstance) continue; // Shouldn't happen in getAll. 
+ Map<String, String> props = si.getProperties(); + if (props == null) { + ++noConfigNodesLocal; + continue; + } + try { + int numExecutors = Integer.parseInt(props.get(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname)); + executorsLocal += numExecutors; + if (numExecutorsPerNode == -1) { + numExecutorsPerNode = numExecutors; + } + } catch (NumberFormatException e) { + ++noConfigNodesLocal; } - } catch (NumberFormatException e) { - ++noConfigNodesLocal; } + noConfigNodeCount = noConfigNodesLocal; + executorCount = executorsLocal; + lastClusterUpdateNs = System.nanoTime(); + return true; } - lastClusterUpdateNs = System.nanoTime(); - noConfigNodeCount = noConfigNodesLocal; - executorCount = executorsLocal; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java index 4e5d991..59efd43 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java @@ -19,6 +19,9 @@ package org.apache.hadoop.hive.ql.exec.tez; +import java.util.Collection; +import org.apache.hadoop.fs.Path; + import java.io.IOException; import java.net.URISyntaxException; @@ -35,7 +38,7 @@ import org.apache.tez.dag.api.TezException; * use case from hive server 2, we need a session simulation. 
* */ -public class SampleTezSessionState extends TezSessionPoolSession { +public class SampleTezSessionState extends WmTezSession { private boolean open; private final String sessionId; @@ -43,8 +46,10 @@ public class SampleTezSessionState extends TezSessionPoolSession { private String user; private boolean doAsEnabled; - public SampleTezSessionState(String sessionId, TezSessionPoolManager parent, HiveConf conf) { - super(sessionId, parent, parent.getExpirationTracker(), conf); + public SampleTezSessionState( + String sessionId, TezSessionPoolSession.Manager parent, HiveConf conf) { + super(sessionId, parent, (parent instanceof TezSessionPoolManager) + ? ((TezSessionPoolManager)parent).getExpirationTracker() : null, conf); this.sessionId = sessionId; this.hiveConf = conf; } @@ -59,8 +64,7 @@ public class SampleTezSessionState extends TezSessionPoolSession { } @Override - public void open() throws IOException, LoginException, URISyntaxException, - TezException { + public void open() throws LoginException, IOException { UserGroupInformation ugi = Utils.getUGI(); user = ugi.getShortUserName(); this.doAsEnabled = hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS); @@ -68,7 +72,18 @@ public class SampleTezSessionState extends TezSessionPoolSession { } @Override - public void close(boolean keepTmpDir) throws TezException, IOException { + public void open(Collection<String> additionalFiles, Path scratchDir) + throws LoginException, IOException { + open(); + } + + @Override + public void open(String[] additionalFiles) throws IOException, LoginException { + open(); + } + + @Override + void close(boolean keepTmpDir) throws TezException, IOException { open = keepTmpDir; } http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestGuaranteedTaskAllocator.java ---------------------------------------------------------------------- diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestGuaranteedTaskAllocator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestGuaranteedTaskAllocator.java new file mode 100644 index 0000000..5d1a3b6 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestGuaranteedTaskAllocator.java @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec.tez; + +import static org.junit.Assert.*; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map.Entry; + +import org.apache.hadoop.hive.llap.AsyncPbRpcProxy.ExecuteRequestCallback; +import org.apache.hadoop.hive.llap.plugin.rpc.LlapPluginProtocolProtos.UpdateQueryRequestProto; +import org.apache.hadoop.hive.llap.plugin.rpc.LlapPluginProtocolProtos.UpdateQueryResponseProto; + +import org.apache.hadoop.conf.Configuration; + +import org.junit.Test; + + +public class TestGuaranteedTaskAllocator { + + static class MockCommunicator implements LlapPluginEndpointClient { + HashMap<Integer, Integer> messages = new HashMap<>(); + + @Override + public void sendUpdateQuery(UpdateQueryRequestProto request, + AmPluginNode node, + ExecuteRequestCallback<UpdateQueryResponseProto> callback) { + WmTezSession session = (WmTezSession)node; + messages.put(Integer.parseInt(session.getSessionId()), request.getGuaranteedTaskCount()); + callback.setResponse(UpdateQueryResponseProto.getDefaultInstance()); + } + } + + static class GuaranteedTasksAllocatorForTest extends GuaranteedTasksAllocator { + int executorCount = 0; + + public GuaranteedTasksAllocatorForTest(LlapPluginEndpointClient amCommunicator) { + super(new Configuration(), amCommunicator); + } + + // Override external stuff. These could also be injected as extra classes. 
+ + @Override + protected int getExecutorCount(boolean allowUpdate) { + return executorCount; + } + } + + @Test + public void testEqualAllocations() { + testEqualAllocation(8, 5, 1.0f); + testEqualAllocation(0, 3, 1.0f); + testEqualAllocation(3, 1, 1.0f); + testEqualAllocation(5, 5, 1.0f); + testEqualAllocation(7, 10, 1.0f); + testEqualAllocation(98, 10, 1.0f); + testEqualAllocation(40, 5, 0.5f); + testEqualAllocation(40, 5, 0.25f); + testEqualAllocation(40, 5, 0.1f); + testEqualAllocation(40, 5, 0.01f); + } + + @Test + public void testAllocations() { + testAllocation(8, 1.0f, + new double[] { 0.5f, 0.25f, 0.25f }, new int[] { 4, 2, 2 }); + testAllocation(10, 1.0f, + new double[] { 0.33f, 0.4f, 0.27f }, new int[] { 3, 4, 3 }); + // Test incorrect totals. We don't normalize; just make sure we don't under- or overshoot. + testAllocation(10, 1.0f, + new double[] { 0.5f, 0.5f, 0.5f }, new int[] { 5, 5, 0 }); + testAllocation(100, 0.5f, + new double[] { 0.15f, 0.15f, 0.15f }, new int[] { 15, 15, 20 }); + } + + private void testAllocation(int ducks, double total, double[] in, int[] out) { + MockCommunicator comm = new MockCommunicator(); + GuaranteedTasksAllocatorForTest qam = new GuaranteedTasksAllocatorForTest(comm); + List<WmTezSession> sessionsToUpdate = new ArrayList<>(); + comm.messages.clear(); + for (int i = 0; i < in.length; ++i) { + addSession(in[i], sessionsToUpdate, i); + } + qam.executorCount = ducks; + qam.updateSessionsAsync(total, sessionsToUpdate); + Integer[] results = getAllocationResults(comm, in.length); + for (int i = 0; i < results.length; ++i) { + assertNotNull(results[i]); + assertEquals(out[i], results[i].intValue()); + } + } + + private void testEqualAllocation(int ducks, int sessions, double total) { + MockCommunicator comm = new MockCommunicator(); + GuaranteedTasksAllocatorForTest qam = new GuaranteedTasksAllocatorForTest(comm); + List<WmTezSession> sessionsToUpdate = new ArrayList<>(); + comm.messages.clear(); + double fraction = total / 
sessions; + for (int i = 0; i < sessions; ++i) { + addSession(fraction, sessionsToUpdate, i); + } + qam.executorCount = ducks; + qam.updateSessionsAsync(total, sessionsToUpdate); + Integer[] results = getAllocationResults(comm, sessions); + int min = Integer.MAX_VALUE, max = Integer.MIN_VALUE, totalAssigned = 0; + for (int i = 0; i < results.length; ++i) { + assertNotNull(results[i]); + int val = results[i]; + min = Math.min(val, min); + max = Math.max(val, max); + totalAssigned += val; + } + assertTrue((max - min) <= 1); + assertTrue(Math.abs(total * ducks - totalAssigned) <= 0.5f); + } + + private Integer[] getAllocationResults(MockCommunicator comm, int sessions) { + assertEquals(sessions, comm.messages.size()); + Integer[] results = new Integer[sessions]; + for (Entry<Integer, Integer> e : comm.messages.entrySet()) { + assertNull(results[e.getKey()]); + results[e.getKey()] = e.getValue(); + } + return results; + } + + private void addSession(double alloc, List<WmTezSession> sessionsToUpdate, int i) { + SampleTezSessionState session = new SampleTezSessionState("" + i, null, null); + session.setClusterFraction(alloc); + sessionsToUpdate.add(session); + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java index 5e1e68c..b9f9f5e 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java @@ -21,7 +21,6 @@ package org.apache.hadoop.hive.ql.exec.tez; import static org.junit.Assert.*; import java.util.HashSet; - import java.util.Set; import java.util.ArrayList; @@ -101,11 +100,11 @@ public class TestTezSessionPool { // draw 1 and replace TezSessionState 
sessionState = poolManager.getSession(null, conf, true, false); assertEquals("a", sessionState.getQueueName()); - poolManager.returnSession(sessionState, false); + poolManager.returnSession(sessionState); sessionState = poolManager.getSession(null, conf, true, false); assertEquals("a", sessionState.getQueueName()); - poolManager.returnSession(sessionState, false); + poolManager.returnSession(sessionState); // [a,b,c,a,b,c] @@ -114,11 +113,11 @@ public class TestTezSessionPool { TezSessionState second = poolManager.getSession(null, conf, true, false); assertEquals("a", first.getQueueName()); assertEquals("b", second.getQueueName()); - poolManager.returnSession(first, false); - poolManager.returnSession(second, false); + poolManager.returnSession(first); + poolManager.returnSession(second); TezSessionState third = poolManager.getSession(null, conf, true, false); assertEquals("b", third.getQueueName()); - poolManager.returnSession(third, false); + poolManager.returnSession(third); // [b,a,c,a,b,c] @@ -130,15 +129,15 @@ public class TestTezSessionPool { assertEquals("a", second.getQueueName()); assertEquals("c", third.getQueueName()); - poolManager.returnSession(first, false); - poolManager.returnSession(second, false); - poolManager.returnSession(third, false); + poolManager.returnSession(first); + poolManager.returnSession(second); + poolManager.returnSession(third); // [c,a,b,a,b,c] first = poolManager.getSession(null, conf, true, false); assertEquals("c", third.getQueueName()); - poolManager.returnSession(first, false); + poolManager.returnSession(first); } catch (Exception e) { e.printStackTrace(); @@ -169,7 +168,7 @@ public class TestTezSessionPool { assertEquals(4, queueCounts[i]); } for (int i = 0; i < sessions.length; ++i) { - poolManager.returnSession(sessions[i], false); + poolManager.returnSession(sessions[i]); } } catch (Exception e) { @@ -191,7 +190,7 @@ public class TestTezSessionPool { Mockito.when(session.isDefault()).thenReturn(false); 
Mockito.when(session.getConf()).thenReturn(conf); - poolManager.reopenSession(session, conf); + poolManager.reopenSession(session, conf, null); Mockito.verify(session).close(true); Mockito.verify(session).open(new HashSet<String>(), null); @@ -201,12 +200,12 @@ public class TestTezSessionPool { // user explicitly specified queue name conf.set("tez.queue.name", "tezq1"); - poolManager.reopenSession(session, conf); + poolManager.reopenSession(session, conf, null); assertEquals("tezq1", poolManager.getSession(null, conf, false, false).getQueueName()); // user unsets queue name, will fallback to default session queue conf.unset("tez.queue.name"); - poolManager.reopenSession(session, conf); + poolManager.reopenSession(session, conf, null); assertEquals("default", poolManager.getSession(null, conf, false, false).getQueueName()); // session.open will unset the queue name from conf but Mockito intercepts the open call @@ -214,17 +213,17 @@ public class TestTezSessionPool { conf.unset("tez.queue.name"); // change session's default queue to tezq1 and rerun test sequence Mockito.when(session.getQueueName()).thenReturn("tezq1"); - poolManager.reopenSession(session, conf); + poolManager.reopenSession(session, conf, null); assertEquals("tezq1", poolManager.getSession(null, conf, false, false).getQueueName()); // user sets default queue now conf.set("tez.queue.name", "default"); - poolManager.reopenSession(session, conf); + poolManager.reopenSession(session, conf, null); assertEquals("default", poolManager.getSession(null, conf, false, false).getQueueName()); // user does not specify queue so use session default conf.unset("tez.queue.name"); - poolManager.reopenSession(session, conf); + poolManager.reopenSession(session, conf, null); assertEquals("tezq1", poolManager.getSession(null, conf, false, false).getQueueName()); } catch (Exception e) { e.printStackTrace(); @@ -282,7 +281,8 @@ public class TestTezSessionPool { TezSessionState session = poolManager.getSession(null, tmpConf, 
true, llap); Thread.sleep((random.nextInt(9) % 10) * 1000); - poolManager.returnSession(session, llap); + session.setLegacyLlapMode(llap); + poolManager.returnSession(session); } catch (Exception e) { e.printStackTrace(); } @@ -328,7 +328,7 @@ public class TestTezSessionPool { Mockito.when(session.isDefault()).thenReturn(false); Mockito.when(session.getConf()).thenReturn(conf); - poolManager.reopenSession(session, conf); + poolManager.reopenSession(session, conf, null); Mockito.verify(session).close(true); Mockito.verify(session).open(new HashSet<String>(), null); http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java index 9b9eead..2dc334d 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java @@ -20,15 +20,8 @@ package org.apache.hadoop.hive.ql.exec.tez; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyBoolean; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import static org.mockito.Mockito.never; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; import java.io.IOException; import java.util.ArrayList; @@ -180,6 +173,8 @@ public class TestTezTask { session = mock(TezClient.class); sessionState = mock(TezSessionState.class); when(sessionState.getSession()).thenReturn(session); + 
when(sessionState.reopen(any(Configuration.class), any(String[].class))) + .thenReturn(sessionState); when(session.submitDAG(any(DAG.class))) .thenThrow(new SessionNotRunning("")) .thenReturn(mock(DAGClient.class)); @@ -227,8 +222,7 @@ public class TestTezTask { task.submit(conf, dag, path, appLr, sessionState, Collections.<LocalResource> emptyList(), new String[0], Collections.<String,LocalResource> emptyMap()); // validate close/reopen - verify(sessionState, times(1)).open(any(Collection.class), any(Path.class)); - verify(sessionState, times(1)).close(eq(true)); // now uses pool after HIVE-7043 + verify(sessionState, times(1)).reopen(any(Configuration.class), any(String[].class)); verify(session, times(2)).submitDAG(any(DAG.class)); } http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java new file mode 100644 index 0000000..7adf895 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.tez; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.tez.dag.api.TezConfiguration; + +import java.util.List; + +import org.junit.Test; + +public class TestWorkloadManager { + private static class MockQam implements QueryAllocationManager { + boolean isCalled = false; + + @Override + public void start() { + } + + @Override + public void stop() { + } + + @Override + public void updateSessionsAsync(double totalMaxAlloc, List<WmTezSession> sessions) { + isCalled = true; + } + + void assertWasCalled() { + assertTrue(isCalled); + isCalled = false; + } + } + + private static class WorkloadManagerForTest extends WorkloadManager { + + WorkloadManagerForTest(String yarnQueue, HiveConf conf, int numSessions, + QueryAllocationManager qam) { + super(yarnQueue, conf, numSessions, qam, null); + } + + @Override + protected WmTezSession createSessionObject(String sessionId) { + return new SampleTezSessionState(sessionId, this, new HiveConf(getConf())); + } + + @Override + protected boolean ensureAmIsRegistered(WmTezSession session) throws Exception { + return true; + } + } + + @Test(timeout = 10000) + public void testReuse() throws Exception { + HiveConf conf = createConf(); + MockQam qam = new MockQam(); + WorkloadManager wm = new WorkloadManagerForTest("test", conf, 1, qam); + wm.start(); + TezSessionState nonPool = mock(TezSessionState.class); + when(nonPool.getConf()).thenReturn(conf); + doNothing().when(nonPool).close(anyBoolean()); + TezSessionState session = wm.getSession(nonPool, null, conf); + verify(nonPool).close(anyBoolean()); + assertNotSame(nonPool, session); + session.returnToSessionManager(); + TezSessionPoolSession diffPool = mock(TezSessionPoolSession.class); + 
when(diffPool.getConf()).thenReturn(conf); + doNothing().when(diffPool).returnToSessionManager(); + session = wm.getSession(diffPool, null, conf); + verify(diffPool).returnToSessionManager(); + assertNotSame(diffPool, session); + TezSessionState session2 = wm.getSession(session, null, conf); + assertSame(session, session2); + } + + @Test(timeout = 10000) + public void testQueueName() throws Exception { + HiveConf conf = createConf(); + MockQam qam = new MockQam(); + WorkloadManager wm = new WorkloadManagerForTest("test", conf, 1, qam); + wm.start(); + // The queue should be ignored. + conf.set(TezConfiguration.TEZ_QUEUE_NAME, "test2"); + TezSessionState session = wm.getSession(null, null, conf); + assertEquals("test", session.getQueueName()); + assertEquals("test", conf.get(TezConfiguration.TEZ_QUEUE_NAME)); + session.setQueueName("test2"); + session = wm.getSession(session, null, conf); + assertEquals("test", session.getQueueName()); + } + + // Note (unrelated to epsilon): all the fraction checks are valid with the current logic in the + // absence of policies. This will change when there are policies. + private final static double EPSILON = 0.001; + + @Test(timeout = 10000) + public void testReopen() throws Exception { + // We should always get a different object, and cluster fraction should be propagated. 
+ HiveConf conf = createConf(); + MockQam qam = new MockQam(); + WorkloadManager wm = new WorkloadManagerForTest("test", conf, 1, qam); + wm.start(); + WmTezSession session = (WmTezSession) wm.getSession(null, null, conf); + assertEquals(1.0, session.getClusterFraction(), EPSILON); + qam.assertWasCalled(); + WmTezSession session2 = (WmTezSession) session.reopen(conf, null); + assertNotSame(session, session2); + assertEquals(1.0, session2.getClusterFraction(), EPSILON); + assertEquals(0.0, session.getClusterFraction(), EPSILON); + qam.assertWasCalled(); + } + + @Test(timeout = 10000) + public void testDestroyAndReturn() throws Exception { + // Session should not be lost; however the fraction should be discarded. + HiveConf conf = createConf(); + MockQam qam = new MockQam(); + WorkloadManager wm = new WorkloadManagerForTest("test", conf, 2, qam); + wm.start(); + WmTezSession session = (WmTezSession) wm.getSession(null, null, conf); + assertEquals(1.0, session.getClusterFraction(), EPSILON); + qam.assertWasCalled(); + WmTezSession session2 = (WmTezSession) wm.getSession(null, null, conf); + assertEquals(0.5, session.getClusterFraction(), EPSILON); + assertEquals(0.5, session2.getClusterFraction(), EPSILON); + qam.assertWasCalled(); + assertNotSame(session, session2); + session.destroy(); // Destroy before returning to the pool. + assertEquals(1.0, session2.getClusterFraction(), EPSILON); + assertEquals(0.0, session.getClusterFraction(), EPSILON); + qam.assertWasCalled(); + + // We never lose pool session, so we should still be able to get. + session = (WmTezSession) wm.getSession(null, null, conf); + session.returnToSessionManager(); + assertEquals(1.0, session2.getClusterFraction(), EPSILON); + assertEquals(0.0, session.getClusterFraction(), EPSILON); + qam.assertWasCalled(); + + // Now destroy the returned session (which is technically not valid) and confirm correctness. 
+ session.destroy(); + assertEquals(1.0, session2.getClusterFraction(), EPSILON); + //qam.assertWasNotCalled(); + } + + private HiveConf createConf() { + HiveConf conf = new HiveConf(); + conf.set(ConfVars.HIVE_SERVER2_TEZ_SESSION_LIFETIME.varname, "-1"); + conf.set(ConfVars.HIVE_SERVER2_ENABLE_DOAS.varname, "false"); + conf.set(ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_NAME.varname, ""); + return conf; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/ac24537f/service/src/java/org/apache/hive/service/server/HiveServer2.java ---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java index a55cf59..5cb973c 100644 --- a/service/src/java/org/apache/hive/service/server/HiveServer2.java +++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java @@ -58,6 +58,7 @@ import org.apache.hadoop.hive.llap.coordinator.LlapCoordinator; import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl; import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager; +import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.HiveMaterializedViewsRegistry; @@ -91,7 +92,6 @@ import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; /** * HiveServer2. 
@@ -107,6 +107,7 @@ public class HiveServer2 extends CompositeService { private CuratorFramework zooKeeperClient; private boolean deregisteredWithZooKeeper = false; // Set to true only when deregistration happens private HttpServer webServer; // Web UI + private WorkloadManager wm; public HiveServer2() { super(HiveServer2.class.getSimpleName()); @@ -161,6 +162,14 @@ public class HiveServer2 extends CompositeService { LlapRegistryService.getClient(hiveConf); } + // Initialize workload management. + String wmQueue = HiveConf.getVar(hiveConf, ConfVars.HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE); + if (wmQueue != null && !wmQueue.isEmpty()) { + wm = WorkloadManager.create(wmQueue, hiveConf); + } else { + wm = null; + } + // Create views registry try { Hive sessionHive = Hive.get(hiveConf); @@ -553,6 +562,14 @@ public class HiveServer2 extends CompositeService { + "Shutting down HiveServer2 anyway.", e); } } + if (wm != null) { + try { + wm.stop(); + } catch (Exception e) { + LOG.error("Workload manager stop had an error during stop of HiveServer2. " + + "Shutting down HiveServer2 anyway.", e); + } + } if (hiveConf != null && hiveConf.getVar(ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) { try { @@ -618,6 +635,9 @@ public class HiveServer2 extends CompositeService { if (sessionPool != null) { sessionPool.startPool(); } + if (server.wm != null) { + server.wm.start(); + } if (hiveConf.getVar(ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) { SparkSessionManagerImpl.getInstance().setup(hiveConf);
