Repository: hive Updated Branches: refs/heads/master 26c0ab6ad -> 68459cf0b
HIVE-18968 : LLAP: report guaranteed tasks count in AM registry to check for consistency (Sergey Shelukhin, reviewed by Prasanth Jayachandran) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/68459cf0 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/68459cf0 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/68459cf0 Branch: refs/heads/master Commit: 68459cf0bfe67dfe72da9095a1dac6b84ede93b0 Parents: 26c0ab6 Author: sergey <ser...@apache.org> Authored: Mon Mar 19 19:10:42 2018 -0700 Committer: sergey <ser...@apache.org> Committed: Mon Mar 19 19:10:42 2018 -0700 ---------------------------------------------------------------------- .../hive/registry/impl/TezAmInstance.java | 7 ++++ .../hive/registry/impl/TezAmRegistryImpl.java | 17 +++++++-- .../hive/registry/impl/ZkRegistryBase.java | 16 ++++++-- .../tezplugins/LlapTaskSchedulerService.java | 40 ++++++++++++++++++-- .../ql/exec/tez/GuaranteedTasksAllocator.java | 13 +++++-- .../ql/exec/tez/QueryAllocationManager.java | 7 +++- .../hadoop/hive/ql/exec/tez/TezSessionPool.java | 14 +++++-- .../hadoop/hive/ql/exec/tez/WmTezSession.java | 38 +++++++++++++++---- .../hive/ql/exec/tez/WorkloadManager.java | 12 +++++- .../hive/ql/exec/tez/TestWorkloadManager.java | 4 ++ .../server/HS2ActivePassiveHARegistry.java | 5 ++- 11 files changed, 143 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/68459cf0/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java index d09cb24..a862947 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java +++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java @@ -16,6 +16,7 @@ package org.apache.hadoop.hive.registry.impl; import java.io.IOException; import org.apache.commons.codec.binary.Base64; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.registry.client.binding.RegistryTypeUtils; import org.apache.hadoop.registry.client.types.AddressTypes; import org.apache.hadoop.registry.client.types.Endpoint; @@ -50,6 +51,12 @@ public class TezAmInstance extends ServiceInstanceBase { public String getSessionId() { return getProperties().get(TezAmRegistryImpl.AM_SESSION_ID); } + + public int getGuaranteedCount() { + String str = getProperties().get(TezAmRegistryImpl.AM_GUARANTEED_COUNT); + if (!StringUtils.isEmpty(str)) return 0; + return Integer.parseInt(str); + } public String getPluginTokenJobId() { return getProperties().get(TezAmRegistryImpl.AM_PLUGIN_JOBID); http://git-wip-us.apache.org/repos/asf/hive/blob/68459cf0/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java index ab02cf4..3ff732d 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java +++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java @@ -18,6 +18,7 @@ import org.apache.commons.lang3.StringUtils; import java.io.IOException; import java.net.InetSocketAddress; import java.util.Collection; + import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; @@ -35,11 +36,12 @@ public class TezAmRegistryImpl extends ZkRegistryBase<TezAmInstance> { static final String IPC_TEZCLIENT = "tez-client"; static final String IPC_PLUGIN = "llap-plugin"; static final String AM_SESSION_ID = "am.session.id", AM_PLUGIN_TOKEN = "am.plugin.token", - AM_PLUGIN_JOBID = "am.plugin.jobid"; + AM_PLUGIN_JOBID = "am.plugin.jobid", AM_GUARANTEED_COUNT = "am.guaranteed.count"; private final static String NAMESPACE_PREFIX = "tez-am-"; private static final String SASL_LOGIN_CONTEXT_NAME = "TezAmZooKeeperClient"; private final String registryName; + private ServiceRecord srv; public static TezAmRegistryImpl create(Configuration conf, boolean useSecureZk) { String amRegistryName = HiveConf.getVar(conf, ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_NAME); @@ -68,8 +70,11 @@ public class TezAmRegistryImpl extends ZkRegistryBase<TezAmInstance> { } public String register(int amPort, int pluginPort, String sessionId, - String serializedToken, String jobIdForToken) throws IOException { - ServiceRecord srv = new ServiceRecord(); + String serializedToken, String jobIdForToken, int guaranteedCount) throws IOException { + if (srv != null) { + throw new UnsupportedOperationException("Already registered with " + srv); + } + srv = new ServiceRecord(); Endpoint rpcEndpoint = RegistryTypeUtils.ipcEndpoint( IPC_TEZCLIENT, new InetSocketAddress(hostname, amPort)); srv.addInternalEndpoint(rpcEndpoint); @@ -83,12 +88,18 @@ public class TezAmRegistryImpl extends ZkRegistryBase<TezAmInstance> { boolean hasToken = serializedToken != null; srv.set(AM_PLUGIN_TOKEN, hasToken ? serializedToken : ""); srv.set(AM_PLUGIN_JOBID, jobIdForToken != null ? jobIdForToken : ""); + srv.set(AM_GUARANTEED_COUNT, Integer.toString(guaranteedCount)); String uniqueId = registerServiceRecord(srv); LOG.info("Registered this AM: rpc: {}, plugin: {}, sessionId: {}, token: {}, znodePath: {}", rpcEndpoint, pluginEndpoint, sessionId, hasToken, getRegistrationZnodePath()); return uniqueId; } + public void updateGuaranteed(int guaranteedCount) throws IOException { + srv.set(AM_GUARANTEED_COUNT, Integer.toString(guaranteedCount)); + updateServiceRecord(srv, false, false); + } + public TezAmInstance getInstance(String name) { Collection<TezAmInstance> instances = getAllInternal(); for(TezAmInstance instance : instances) { http://git-wip-us.apache.org/repos/asf/hive/blob/68459cf0/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java index 680d9af..7ca3548 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java +++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java @@ -98,7 +98,7 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> { private final Set<ServiceInstanceStateChangeListener<InstanceType>> stateChangeListeners; - private final boolean doCheckAcls; + protected final boolean doCheckAcls; // Secure ZK is only set up by the registering service; anyone can read the registrations. private final String zkPrincipal, zkKeytab, saslLoginContextName; private String userNameFromPrincipal; // Only set when setting up the secure config for ZK. @@ -286,7 +286,7 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> { // even under connection or session interruption (will automatically handle retries) znode = new PersistentEphemeralNode(zooKeeperClient, Mode.EPHEMERAL_SEQUENTIAL, workersPath + "/" + workerNodePrefix, encoder.toBytes(srv)); - + // start the creation of znodes znode.start(); @@ -318,7 +318,12 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> { return uniqueId; } - protected final void updateServiceRecord(ServiceRecord srv) throws IOException { + protected final void updateServiceRecord( + ServiceRecord srv, boolean doCheckAcls, boolean closeOnFailure) throws IOException { + if (srv.get(UNIQUE_IDENTIFIER) == null) { + srv.set(UNIQUE_IDENTIFIER, UNIQUE_ID.toString()); + } + // waitForInitialCreate must have already been called in registerServiceRecord. try { znode.setData(encoder.toBytes(srv)); @@ -331,11 +336,14 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> { } } catch (Exception e) { LOG.error("Unable to update znode with new service record", e); - CloseableUtils.closeQuietly(znode); + if (closeOnFailure) { + CloseableUtils.closeQuietly(znode); + } throw (e instanceof IOException) ? (IOException) e : new IOException(e); } } + final void initializeWithoutRegisteringInternal() throws IOException { // Create a znode under the rootNamespace parent for this instance of the server try { http://git-wip-us.apache.org/repos/asf/hive/blob/68459cf0/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java index d536341..8217964 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java @@ -257,6 +257,16 @@ public class LlapTaskSchedulerService extends TaskScheduler { private int totalGuaranteed = 0, unusedGuaranteed = 0; + /** + * An internal version to make sure we don't race and overwrite a newer totalGuaranteed count in + * ZK with an older one, without requiring us to make ZK updates under the main writeLock. + * This is updated under writeLock, together with totalGuaranteed. + */ + private long totalGuaranteedVersion = Long.MIN_VALUE; + private final Object registryUpdateLock = new Object(); // The lock for ZK updates. + /** The last totalGuaranteedVersion sent to ZK. Updated under registryUpdateLock. */ + private long tgVersionSent = Long.MIN_VALUE; + private LlapTaskCommunicator communicator; private final int amPort; private final String serializedToken, jobIdForToken; @@ -504,6 +514,7 @@ public class LlapTaskSchedulerService extends TaskScheduler { @VisibleForTesting void updateGuaranteedCount(int newTotalGuaranteed) { List<TaskInfo> toUpdate = null; + long tgVersionForZk; writeLock.lock(); try { // TODO: when this code is a little less hot, change most logs to debug. @@ -514,8 +525,9 @@ public class LlapTaskSchedulerService extends TaskScheduler { // The "procedural" approach requires that we track the ducks traveling on network, // concurrent terminations, etc. So, while more precise it's much more complex. int delta = newTotalGuaranteed - totalGuaranteed; - WM_LOG.info("Received guaranteed tasks " + newTotalGuaranteed - + "; the delta to adjust by is " + delta); + tgVersionForZk = ++totalGuaranteedVersion; + WM_LOG.info("Received guaranteed tasks " + newTotalGuaranteed + " (internal version " + + tgVersionForZk + "); the delta to adjust by is " + delta); if (delta == 0) return; totalGuaranteed = newTotalGuaranteed; if (metrics != null) { @@ -562,6 +574,7 @@ public class LlapTaskSchedulerService extends TaskScheduler { } finally { writeLock.unlock(); } + updateGuaranteedInRegistry(tgVersionForZk, newTotalGuaranteed); if (toUpdate == null) return; WM_LOG.info("Sending updates to " + toUpdate.size() + " tasks"); for (TaskInfo ti : toUpdate) { @@ -752,7 +765,7 @@ public class LlapTaskSchedulerService extends TaskScheduler { amRegistry.start(); int pluginPort = pluginEndpoint != null ? pluginEndpoint.getActualPort() : -1; amRegistry.register(amPort, pluginPort, HiveConf.getVar(conf, ConfVars.HIVESESSIONID), - serializedToken, jobIdForToken); + serializedToken, jobIdForToken, 0); } } finally { writeLock.unlock(); @@ -969,6 +982,7 @@ public class LlapTaskSchedulerService extends TaskScheduler { if (metrics != null) { metrics.incrCompletedDagCount(); } + long tgVersionForZk; writeLock.lock(); try { dagRunning = false; @@ -1002,6 +1016,7 @@ public class LlapTaskSchedulerService extends TaskScheduler { } totalGuaranteed = unusedGuaranteed = 0; + tgVersionForZk = ++totalGuaranteedVersion; if (metrics != null) { metrics.setDagId(null); // We remove the tasks above without state checks so just reset all metrics to 0. @@ -1012,9 +1027,28 @@ public class LlapTaskSchedulerService extends TaskScheduler { } finally { writeLock.unlock(); } + updateGuaranteedInRegistry(tgVersionForZk, 0); // TODO Cleanup pending tasks etc, so that the next dag is not affected. } + private void updateGuaranteedInRegistry(long tgVersionForZk, int newTotalGuaranteed) { + if (amRegistry == null) return; + synchronized (registryUpdateLock) { + // Make sure the updates are not sent to ZK out of order compared to how we apply them in AM. + if (tgVersionForZk <= tgVersionSent) return; + try { + amRegistry.updateGuaranteed(newTotalGuaranteed); + tgVersionSent = tgVersionForZk; + } catch (IOException ex) { + // Ignore for now. HS2 will probably try to send us the count we already have again. + // We are assuming here that if we can't talk to ZK we will eventually fail. + LOG.error("Failed to update guaranteed count in registry; ignoring", ex); + } + } + } + + + @Override public void blacklistNode(NodeId nodeId) { LOG.info("BlacklistNode not supported"); http://git-wip-us.apache.org/repos/asf/hive/blob/68459cf0/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java index 82b38d5..a52928c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java @@ -143,14 +143,19 @@ public class GuaranteedTasksAllocator implements QueryAllocationManager { } } - private void updateSessionAsync(final WmTezSession session, final int intAlloc) { - boolean needsUpdate = session.setSendingGuaranteed(intAlloc); - if (!needsUpdate) return; + @Override + public void updateSessionAsync(WmTezSession session) { + updateSessionAsync(session, null); // Resend existing value if necessary. + } + + private void updateSessionAsync(final WmTezSession session, final Integer intAlloc) { + Integer valueToSend = session.setSendingGuaranteed(intAlloc); + if (valueToSend == null) return; // Note: this assumes that the pattern where the same session object is reset with a different // Tez client is not used. It was used a lot in the past but appears to be gone from most // HS2 session pool paths, and this patch removes the last one (reopen). UpdateQueryRequestProto request = UpdateQueryRequestProto - .newBuilder().setGuaranteedTaskCount(intAlloc).build(); + .newBuilder().setGuaranteedTaskCount(valueToSend.intValue()).build(); LOG.info("Updating {} with {} guaranteed tasks", session.getSessionId(), intAlloc); amCommunicator.sendUpdateQuery(request, (AmPluginNode)session, new UpdateCallback(session)); } http://git-wip-us.apache.org/repos/asf/hive/blob/68459cf0/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java index a446902..9885ce7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java @@ -27,7 +27,7 @@ interface QueryAllocationManager { void start(); void stop(); /** - * Updates the session allocations asynchoronously. + * Updates the session allocations asynchronously. * @param totalMaxAlloc The total maximum fraction of the cluster to allocate. Used to * avoid various artifacts, esp. with small numbers and double weirdness. * Null means the total is unknown. @@ -39,4 +39,9 @@ interface QueryAllocationManager { * Sets a callback to be invoked on cluster changes relevant to resource allocation. */ void setClusterChangedCallback(Runnable clusterChangedCallback); + + /** + * Updates the session asynchronously with the existing allocation. + */ + void updateSessionAsync(WmTezSession session); } http://git-wip-us.apache.org/repos/asf/hive/blob/68459cf0/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java index 0962460..89954cb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java @@ -334,10 +334,16 @@ class TezSessionPool<SessionType extends TezSessionPoolSession> { } @Override - public void onUpdate(TezAmInstance serviceInstance, int ephSeqVersion) { - // We currently never update the znode once registered. - // AM recovery will create a new node when it calls register. - LOG.warn("Received an unexpected update for instance={}. Ignoring", serviceInstance); + public void onUpdate(TezAmInstance si, int ephSeqVersion) { + String sessionId = si.getSessionId(); + SessionType session = bySessionId.get(sessionId); + if (session != null) { + LOG.info("AM for " + sessionId + ", v." + ephSeqVersion + " has updated; updating [" + + session + "] with an endpoint at " + si.getPluginPort()); + session.updateFromRegistry(si, ephSeqVersion); + } else { + LOG.warn("AM for an unknown " + sessionId + " has updated; ignoring"); + } } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/68459cf0/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java index d4c3ab9..1cf5493 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java @@ -106,6 +106,13 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode @Override void updateFromRegistry(TezAmInstance si, int ephSeqVersion) { + updateAmEndpointInfo(si, ephSeqVersion); + if (si != null) { + handleGuaranteedTasksChange(si.getGuaranteedCount()); + } + } + + public void updateAmEndpointInfo(TezAmInstance si, int ephSeqVersion) { AmPluginInfo info = si == null ? null : new AmPluginInfo(si.getHost(), si.getPluginPort(), si.getPluginToken(), si.getPluginTokenJobId()); synchronized (amPluginInfoLock) { @@ -131,6 +138,19 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode } } } + + + private void handleGuaranteedTasksChange(int guaranteedCount) { + boolean doNotify = false; + synchronized (actualState) { + // A noop if we are in process of sending or if we have the correct value. + if (actualState.sending != -1 || actualState.sent == guaranteedCount) return; + actualState.sent = guaranteedCount; + doNotify = actualState.target != guaranteedCount; + } + if (!doNotify) return; + wmParent.notifyOfInconsistentAllocation(this); + } @Override public AmPluginInfo getAmPluginInfo(Ref<Integer> version) { @@ -161,17 +181,21 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode return this.clusterFraction; } - boolean setSendingGuaranteed(int intAlloc) { - assert intAlloc >= 0; + Integer setSendingGuaranteed(Integer intAlloc) { + assert intAlloc == null || intAlloc >= 0; synchronized (actualState) { - actualState.target = intAlloc; - if (actualState.sending != -1) return false; // The sender will take care of this. - if (actualState.sent == intAlloc) return false; // The value didn't change. + if (intAlloc != null) { + actualState.target = intAlloc; + } else { + intAlloc = actualState.target; + } + if (actualState.sending != -1) return null; // The sender will take care of this. + if (actualState.sent == intAlloc) return null; // The value didn't change. actualState.sending = intAlloc; - return true; + return intAlloc; } } - + public String getAllocationState() { synchronized (actualState) { return "actual/target " + actualState.sent + "/" + actualState.target http://git-wip-us.apache.org/repos/asf/hive/blob/68459cf0/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java index 00e2c20..f0e620c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java @@ -685,7 +685,6 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida processPoolChangesOnMasterThread(poolName, hasRequeues, syncWork); } - // 12. Save state for future iterations. for (KillQueryContext killCtx : syncWork.toKillQuery.values()) { if (killQueryInProgress.put(killCtx.session, killCtx) != null) { @@ -698,7 +697,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida entry.getValue().endEvent(entry.getKey()); } - // 14. Notify tests and global async ops. + // 14. Give our final state to UI/API requests if any. if (e.dumpStateFuture != null) { List<String> result = new ArrayList<>(); result.add("RESOURCE PLAN " + rpName + "; default pool " + defaultPool); @@ -708,6 +707,8 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida e.dumpStateFuture.set(result); e.dumpStateFuture = null; } + + // 15. Notify tests and global async ops. if (e.testEvent != null) { e.testEvent.set(true); e.testEvent = null; @@ -1422,6 +1423,13 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida } } + public void notifyOfInconsistentAllocation(WmTezSession session) { + // We just act as a pass-thru between the session and allocation manager. We don't change the + // allocation target (only WM thread can do that); therefore we can do this directly and + // actualState-based sync will take care of multiple potential message senders. + allocationManager.updateSessionAsync(session); + } + public void notifyOfClusterStateChange() { currentLock.lock(); try { http://git-wip-us.apache.org/repos/asf/hive/blob/68459cf0/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java index 8d185ba..20a5947 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java @@ -110,6 +110,10 @@ public class TestWorkloadManager { public void updateSessionsAsync(Double totalMaxAlloc, List<WmTezSession> sessions) { isCalled = true; } + + @Override + public void updateSessionAsync(WmTezSession session) { + } void assertWasCalledAndReset() { assertTrue(isCalled); http://git-wip-us.apache.org/repos/asf/hive/blob/68459cf0/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java ---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java b/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java index 819ce19..7c75489 100644 --- a/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java +++ b/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java @@ -156,9 +156,10 @@ public class HS2ActivePassiveHARegistry extends ZkRegistryBase<HiveServer2Instan addEndpointToServiceRecord(getNewServiceRecord(), PASSIVE_ENDPOINT); } - private void addEndpointToServiceRecord(final ServiceRecord srv, final String endpointName) throws IOException { + private void addEndpointToServiceRecord( + final ServiceRecord srv, final String endpointName) throws IOException { updateEndpoint(srv, endpointName); - updateServiceRecord(srv); + updateServiceRecord(srv, doCheckAcls, true); } private void updateEndpoint(final ServiceRecord srv, final String endpointName) {