This is an automated email from the ASF dual-hosted git repository.
ayushtkn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new ccc70354fd4 HIVE-29651: Update ZookeeperExternalSessionsRegistryClient
to handle multiple HiveServer2 instances submitting DAGs concurrently to
available Tez External Sessions (#6528)
ccc70354fd4 is described below
commit ccc70354fd4c4b89ce209c1b4d4a2fff96698388
Author: Tanishq Chugh <[email protected]>
AuthorDate: Mon Jun 22 16:32:54 2026 +0530
HIVE-29651: Update ZookeeperExternalSessionsRegistryClient to handle
multiple HiveServer2 instances submitting DAGs concurrently to available Tez
External Sessions (#6528)
---
.../hive/ql/exec/tez/ExternalSessionsRegistry.java | 7 +
.../hive/ql/exec/tez/TezExternalSessionState.java | 83 +++++++++
.../apache/hadoop/hive/ql/exec/tez/TezSession.java | 3 +
.../hive/ql/exec/tez/TezSessionPoolManager.java | 14 ++
.../hive/ql/exec/tez/TezSessionPoolSession.java | 7 +
.../hadoop/hive/ql/exec/tez/TezSessionState.java | 7 +
.../apache/hadoop/hive/ql/exec/tez/TezTask.java | 7 +-
.../ZookeeperExternalSessionsRegistryClient.java | 203 +++++++++++++++++----
.../hive/ql/exec/tez/monitoring/TezJobMonitor.java | 14 ++
.../hadoop/hive/ql/exec/tez/TestTezTask.java | 12 +-
...estZookeeperExternalSessionsRegistryClient.java | 148 +++++++++++++++
11 files changed, 461 insertions(+), 44 deletions(-)
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ExternalSessionsRegistry.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ExternalSessionsRegistry.java
index 7f279c3648d..8f822c2585c 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ExternalSessionsRegistry.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ExternalSessionsRegistry.java
@@ -40,4 +40,11 @@ public interface ExternalSessionsRegistry {
* Closes the external session registry.
*/
void close();
+
+ /**
+ * Returns true if this registry instance currently holds a claim on the
given AM.
+ */
+ default boolean isClaimed(String appId) {
+ return true; // Non-ZK registries case is always true
+ }
}
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezExternalSessionState.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezExternalSessionState.java
index bd49d02e530..bb4cc106d5b 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezExternalSessionState.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezExternalSessionState.java
@@ -19,17 +19,26 @@
package org.apache.hadoop.hive.ql.exec.tez;
import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import com.google.protobuf.ServiceException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC;
+import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
/**
@@ -139,6 +148,11 @@ public void close(boolean keepDagFilesDir) throws
Exception {
// We never close external sessions that don't have errors.
try {
if (externalAppId != null) {
+ LOG.debug("Returning external session with appID: {}", externalAppId);
+ SessionState sessionState = SessionState.get();
+ if (sessionState != null) {
+ sessionState.setTezSession(null);
+ }
registry.returnSession(externalAppId);
}
} catch (Exception e) {
@@ -187,4 +201,73 @@ public boolean killQuery(String reason) throws
HiveException {
killQuery.killQuery(queryId, reason, conf, false);
return true;
}
+
+ @Override
+ public DAGClient submitDAG(DAG dag) throws TezException, IOException {
+ if (!registry.isClaimed(externalAppId)) {
+ throw new TezException("Cannot submit DAG as the Tez Session no-longer
owns the AM: " + externalAppId);
+ }
+ try {
+ return getTezClient().submitDAG(dag);
+ } catch (TezException e) {
+ if (e.getMessage() == null || !e.getMessage().contains("App master
already running a DAG")) {
+ throw e;
+ }
+ tryKillRunningDAGs(getTezClient());
+ return getTezClient().submitDAG(dag);
+ }
+ }
+
+ private void tryKillRunningDAGs(TezClient session) throws TezException {
+ if (!registry.isClaimed(externalAppId)) {
+ throw new TezException("Cannot kill running DAG as the Tez Session
no-longer owns the AM: " + externalAppId);
+ }
+ LOG.info("External session has an AM which is already running a DAG on app
ID {}", externalAppId);
+ DAGClientAMProtocolBlockingPB proxy = session.sendAMHeartbeat(null);
+ if (proxy == null) {
+ throw new TezException("Error while trying to connect to AM for app ID "
+ externalAppId);
+ }
+ long killTimeoutMs = TimeUnit.SECONDS.toMillis(
+ HiveConf.getIntVar(conf,
ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_WAIT_MAX_ATTEMPTS));
+ try {
+ DAGClientAMProtocolRPC.GetAllDAGsResponseProto allDAGSResponse =
+ proxy.getAllDAGs(null,
DAGClientAMProtocolRPC.GetAllDAGsRequestProto.newBuilder().build());
+ for (String dagId : allDAGSResponse.getDagIdList()) {
+ LOG.info("External session: attempting to kill dagId {} on app ID {}",
dagId, externalAppId);
+ proxy.tryKillDAG(null,
DAGClientAMProtocolRPC.TryKillDAGRequestProto.newBuilder().setDagId(dagId).build());
+ waitForDagTerminal(proxy, dagId, killTimeoutMs);
+ }
+ } catch (Exception e) {
+ throw new TezException("Error while trying to kill existing DAG running
on app ID " + externalAppId, e);
+ }
+ }
+
+ private void waitForDagTerminal(DAGClientAMProtocolBlockingPB proxy, String
dagId, long timeoutMs)
+ throws TezException, ServiceException {
+ long startTimeMs = System.currentTimeMillis();
+ long pollIntervalMs =
conf.getTimeVar(ConfVars.TEZ_DAG_STATUS_CHECK_INTERVAL, TimeUnit.MILLISECONDS);
+ while (System.currentTimeMillis() - startTimeMs < timeoutMs) {
+ long remainingMs = timeoutMs - (System.currentTimeMillis() -
startTimeMs);
+ DAGClientAMProtocolRPC.GetDAGStatusResponseProto response =
proxy.getDAGStatus(null,
+ DAGClientAMProtocolRPC.GetDAGStatusRequestProto.newBuilder()
+ .setDagId(dagId)
+ .setTimeout(Math.min(pollIntervalMs, remainingMs))
+ .build());
+ if (response.hasDagStatus() && response.getDagStatus().hasState()
+ && isTerminalDagState(response.getDagStatus().getState())) {
+ LOG.info("External session: dagId {} on app ID {} reached terminal
state {}", dagId, externalAppId,
+ response.getDagStatus().getState());
+ return;
+ }
+ }
+ throw new TezException("Timed out after " + timeoutMs + " ms waiting for
orphan DAG " + dagId
+ + " on app ID " + externalAppId + " to reach terminal state after
kill");
+ }
+
+ private static boolean isTerminalDagState(DAGProtos.DAGStatusStateProto
state) {
+ return switch (state) {
+ case DAG_SUCCEEDED, DAG_KILLED, DAG_FAILED, DAG_ERROR -> true;
+ default -> false;
+ };
+ }
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSession.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSession.java
index 68844bd8172..2f64ec41a58 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSession.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSession.java
@@ -34,7 +34,9 @@
import org.apache.hadoop.hive.ql.wm.WmContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
/**
@@ -86,6 +88,7 @@ public String toString() {
HiveConf getConf();
TezClient getTezClient();
+ DAGClient submitDAG(DAG dag) throws TezException, IOException;
boolean isOpen();
boolean isOpening();
boolean getDoAsEnabled();
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
index 605a92ebc8f..9b9fbfebd9f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
@@ -353,6 +353,20 @@ void returnSession(TezSession tezSessionState) {
+ " belongs to the pool. Put it back in");
defaultSessionPool.returnSession((TezSessionPoolSession)tezSessionState);
}
+
+ if (useExternalSessions && !tezSessionState.isDefault()) {
+ if (tezSessionState.getTezClient() != null
+ && tezSessionState.getTezClient().getAppMasterApplicationId() !=
null) {
+ try {
+ tezSessionState.close(false);
+ } catch (Exception ex) {
+ LOG.warn("Failed to return external Tez session {}",
tezSessionState.getSessionId(), ex);
+ }
+ } else {
+ LOG.warn("Not returning session '{}' as tez client or app id is
null", tezSessionState.getSessionId());
+ }
+ }
+
// non default session nothing changes. The user can continue to use the
existing
// session in the SessionState
} finally {
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
index 415072f221d..a473b32e887 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
@@ -34,7 +34,9 @@
import org.apache.hadoop.hive.registry.impl.TezAmInstance;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -337,6 +339,11 @@ public TezClient getTezClient() {
return baseSession.getTezClient();
}
+ @Override
+ public DAGClient submitDAG(DAG dag) throws TezException, IOException {
+ return baseSession.submitDAG(dag);
+ }
+
@Override
public boolean isOpening() {
return baseSession.isOpening();
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index 2924416ad48..b50c15cf9b6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -76,11 +76,13 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.tez.client.TezClient;
import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.PreWarmVertex;
import org.apache.tez.dag.api.SessionNotRunning;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.Progress;
import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
@@ -820,6 +822,11 @@ public TezClient getTezClient() {
return session;
}
+ @Override
+ public DAGClient submitDAG(DAG dag) throws TezException, IOException {
+ return getTezClient().submitDAG(dag);
+ }
+
@Override
public LocalResource getAppJarLr() {
return appJarLr;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 6bd801dd64c..8e3b8140752 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -472,9 +472,8 @@ void ensureSessionHasResources(
TezSession session, String[] nonConfResources) throws Exception {
TezClient client = session.getTezClient();
// TODO null can also mean that this operation was interrupted. Should we
really try to re-create the session in that case ?
- if (client == null) {
- // Note: the only sane case where this can happen is the non-pool one.
We should get rid
- // of it, in non-pool case perf doesn't matter so we might as well
open at get time
+ if (client == null || !session.isOpen()) {
+ // Note: We should get rid of this branch; in the non-pool case perf doesn't matter,
so we might as well open at get time
// and then call update like we do in the else.
// Can happen if the user sets the tez flag after the session was
established.
LOG.info("Tez session hasn't been created yet. Opening session");
@@ -692,7 +691,7 @@ DAGClient submit(DAG dag, Ref<TezSession> sessionStateRef)
throws Exception {
private DAGClient submitInternal(DAG dag, TezSession sessionState) throws
TezException, IOException {
runtimeContext.init(sessionState);
- return sessionState.getTezClient().submitDAG(dag);
+ return sessionState.submitDAG(dag);
}
private void sessionDestroyOrReturnToPool(Ref<TezSession> sessionStateRef,
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java
index 550c77e573a..08df3bc922a 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java
@@ -21,6 +21,7 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting;
import org.apache.curator.framework.CuratorFramework;
@@ -31,9 +32,14 @@
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,15 +48,20 @@
// TODO: tez should provide this registry
public class ZookeeperExternalSessionsRegistryClient implements
ExternalSessionsRegistry {
private static final Logger LOG =
LoggerFactory.getLogger(ZookeeperExternalSessionsRegistryClient.class);
+ private static final String PATH_SEPARATOR = "/";
private final HiveConf initConf;
private final Set<String> available = new HashSet<>();
private final Set<String> taken = new HashSet<>();
private final Object lock = new Object();
private final int maxAttempts;
-
+ private CuratorFramework client;
private CuratorCache cache;
+ private CuratorCache claimsCache;
+ private InterProcessMutex globalQueue;
+ private String claimsPath;
private volatile boolean isInitialized;
+ private volatile boolean zkConnectionHealthy = false;
public ZookeeperExternalSessionsRegistryClient(final HiveConf initConf) {
@@ -59,25 +70,91 @@ public ZookeeperExternalSessionsRegistryClient(final
HiveConf initConf) {
}
private static String getApplicationId(final ChildData childData) {
- return childData.getPath().substring(childData.getPath().lastIndexOf("/")
+ 1);
+ return
childData.getPath().substring(childData.getPath().lastIndexOf(PATH_SEPARATOR) +
1);
}
private void init() {
String zkServer = HiveConf.getVar(initConf,
ConfVars.HIVE_ZOOKEEPER_QUORUM);
+ int sessionTimeoutMs = (int) HiveConf.getTimeVar(initConf,
ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT,
+ TimeUnit.MILLISECONDS);
+ int connectionTimeoutMs = (int) HiveConf.getTimeVar(initConf,
ConfVars.HIVE_ZOOKEEPER_CONNECTION_TIMEOUT,
+ TimeUnit.MILLISECONDS);
+ int baseSleepTimeMs = (int) HiveConf.getTimeVar(initConf,
ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME,
+ TimeUnit.MILLISECONDS);
+ int maxRetries = HiveConf.getIntVar(initConf,
ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES);
String zkNamespace = HiveConf.getVar(initConf,
ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE);
String effectivePath = normalizeZkPath(zkNamespace);
- CuratorFramework client = CuratorFrameworkFactory.newClient(zkServer, new
ExponentialBackoffRetry(1000, 3));
+ this.claimsPath = effectivePath + "-claims";
+ // By the time the connection state changes to SUSPENDED, the client has already
consumed ~2/3 of the negotiated session
+ // timeout. Report LOST after 33% of the session timeout (the remaining window) so LOST aligns with when the
ZK server expires the session and drops
+ // ephemeral claim nodes. See Curator Tech Note 14 (TN14).
+ this.client = CuratorFrameworkFactory.builder()
+ .connectString(zkServer)
+ .sessionTimeoutMs(sessionTimeoutMs)
+ .connectionTimeoutMs(connectionTimeoutMs)
+ .simulatedSessionExpirationPercent(33)
+ .retryPolicy(new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries))
+ .build();
+
synchronized (lock) {
+ client.getConnectionStateListenable().addListener((curatorClient,
newState) -> {
+ if (newState == ConnectionState.CONNECTED || newState ==
ConnectionState.RECONNECTED) {
+ zkConnectionHealthy = true;
+ } else if (newState == ConnectionState.LOST) {
+ zkConnectionHealthy = false;
+ Set<String> sessionsToKill;
+ synchronized (lock) {
+ LOG.error("ZK connection state has changed to lost; killing
running DAGs on claimed AMs: {}", taken);
+ sessionsToKill = new HashSet<>(taken);
+ taken.clear();
+ }
+ sessionsToKill.forEach(TezJobMonitor::killRunningDAGsForApplication);
+ }
+ });
client.start();
+
+ this.globalQueue = new InterProcessMutex(client, effectivePath +
"-queue");
this.cache = CuratorCache.build(client, effectivePath);
CuratorCacheListener listener = CuratorCacheListener.builder()
.forPathChildrenCache(effectivePath, client, new
ExternalSessionsPathListener())
.build();
cache.listenable().addListener(listener);
cache.start();
+
+ this.claimsCache = CuratorCache.build(client, claimsPath);
+ CuratorCacheListener claimsListener =
CuratorCacheListener.builder().forCreates(
+ childData -> {
+ if (childData == null) {
+ return;
+ }
+ String applicationId = getApplicationId(childData);
+ synchronized (lock) {
+ available.remove(applicationId);
+ }
+ }).forDeletes(
+ childData -> {
+ if (childData == null) {
+ return;
+ }
+ String applicationId = getApplicationId(childData);
+ synchronized (lock) {
+ if (!taken.contains(applicationId)) {
+ if (cache.get(effectivePath + PATH_SEPARATOR +
applicationId).isPresent()) {
+ available.add(applicationId);
+ lock.notifyAll();
+ } else {
+ LOG.info("Ignoring AM claim removal for {} because the base
AM node no longer exists.",
+ applicationId);
+ }
+ }
+ }
+ }).build();
+ claimsCache.listenable().addListener(claimsListener);
+ claimsCache.start();
+
cache.stream()
.filter(childData -> childData.getPath() != null
- && childData.getPath().startsWith(effectivePath + "/"))
+ && childData.getPath().startsWith(effectivePath +
PATH_SEPARATOR))
.forEach(childData -> available.add(getApplicationId(childData)));
LOG.info("Initial external sessions: {}", available);
isInitialized = true;
@@ -86,27 +163,54 @@ private void init() {
@VisibleForTesting
static String normalizeZkPath(String zkNamespace) {
- return (zkNamespace.startsWith("/") ? zkNamespace : "/" + zkNamespace);
+ return (zkNamespace.startsWith(PATH_SEPARATOR) ? zkNamespace :
PATH_SEPARATOR + zkNamespace);
}
@Override
public String getSession() throws Exception {
- synchronized (lock) {
- if (!isInitialized) {
- init();
- }
- long endTimeNs = System.nanoTime() + (1000000000L * maxAttempts);
- while (available.isEmpty() && ((endTimeNs - System.nanoTime()) > 0)) {
- lock.wait(1000L);
+ if (!isInitialized) {
+ synchronized (lock) {
+ if (!isInitialized) {
+ init();
+ }
}
- Iterator<String> iter = available.iterator();
- if (!iter.hasNext()) {
- throw new IOException("Cannot get a session after " + maxAttempts + "
attempts");
+ }
+
+ long startTimeNs = System.nanoTime();
+ long timeoutNs = TimeUnit.SECONDS.toNanos(maxAttempts);
+ long queueWaitTimeMs = Math.max(0, (timeoutNs - (System.nanoTime() -
startTimeNs)) / 1000000L);
+ if (!globalQueue.acquire(queueWaitTimeMs, TimeUnit.MILLISECONDS)) {
+ throw new IOException("Cannot get a session (timed out in queue) after "
+ maxAttempts + " seconds");
+ }
+ try {
+ synchronized (lock) {
+ while (System.nanoTime() - startTimeNs < timeoutNs) {
+ Iterator<String> iter = available.iterator();
+
+ while (iter.hasNext()) {
+ String appId = iter.next();
+ try {
+ String claimNodePath = claimsPath + PATH_SEPARATOR + appId;
+
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(claimNodePath);
+ iter.remove();
+ taken.add(appId);
+ return appId;
+ } catch (KeeperException.NodeExistsException e) {
+ iter.remove();
+ }
+ }
+ long remainingTimeNs = timeoutNs - (System.nanoTime() - startTimeNs);
+ if (remainingTimeNs > 0) {
+ // Add one millisecond to the computed wait time to prevent
the case where
+ // (remainingTimeNs / 1000000L) is 0: lock.wait(0) means "no timeout" and
would block until notified, ignoring the deadline.
+ long waitTimeMs = Math.min(1000L, (remainingTimeNs / 1000000L) +
1);
+ lock.wait(waitTimeMs);
+ }
+ }
+ throw new IOException("Cannot get a session after waiting for " +
maxAttempts + " seconds (timeout exhausted)");
}
- String appId = iter.next();
- iter.remove();
- taken.add(appId);
- return appId;
+ } finally {
+ globalQueue.release();
}
}
@@ -119,6 +223,16 @@ public void returnSession(String appId) {
if (!taken.remove(appId)) {
return; // Session has been removed from ZK.
}
+
+ try {
+ client.delete().guaranteed().forPath(claimsPath + PATH_SEPARATOR +
appId);
+ } catch (KeeperException.NoNodeException e) {
+ // If the claim Node has already been deleted, we can ignore it.
+ LOG.debug("Claim Node has already been deleted for the session {}",
appId, e);
+ } catch (Exception e) {
+ LOG.warn("Failed to delete claim node for session {}", appId, e);
+ }
+
available.add(appId);
lock.notifyAll();
}
@@ -126,9 +240,15 @@ public void returnSession(String appId) {
@Override
public void close() {
+ if (claimsCache != null) {
+ claimsCache.close();
+ }
if (cache != null) {
cache.close();
}
+ if (client != null) {
+ client.close();
+ }
}
private final class ExternalSessionsPathListener implements
PathChildrenCacheListener {
@@ -146,23 +266,38 @@ public void childEvent(final CuratorFramework client,
final PathChildrenCacheEve
synchronized (lock) {
switch (event.getType()) {
- case CHILD_UPDATED, CHILD_ADDED:
- if (available.contains(applicationId) ||
taken.contains(applicationId)) {
- return; // We do not expect updates to existing sessions; ignore
them for now.
- }
- available.add(applicationId);
- break;
- case CHILD_REMOVED:
- if (taken.remove(applicationId)) {
- LOG.warn("The session in use has disappeared from the registry
({})", applicationId);
- } else if (!available.remove(applicationId)) {
- LOG.warn("An unknown session has been removed ({})",
applicationId);
- }
- break;
- default:
- // Ignore all the other events; logged above.
+ case CHILD_UPDATED, CHILD_ADDED:
+ if (available.contains(applicationId) ||
taken.contains(applicationId)) {
+ return; // We do not expect updates to existing sessions; ignore
them for now.
+ }
+ if (claimsCache != null && claimsCache.get(claimsPath +
PATH_SEPARATOR + applicationId).isPresent()) {
+ LOG.info("Ignoring newly added AM {} because it is already claimed
by another session.", applicationId);
+ return;
+ }
+ available.add(applicationId);
+ lock.notifyAll();
+ break;
+ case CHILD_REMOVED:
+ if (taken.remove(applicationId)) {
+ LOG.warn("The session in use has disappeared from the registry
({})", applicationId);
+ } else if (!available.remove(applicationId)) {
+ LOG.warn("An unknown session has been removed ({})",
applicationId);
+ }
+ break;
+ default:
+ // Ignore all the other events; logged above.
}
}
}
}
+
+ @Override
+ public boolean isClaimed(String appId) {
+ if (!zkConnectionHealthy) {
+ return false;
+ }
+ synchronized (lock) {
+ return taken.contains(appId);
+ }
+ }
}
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
index 92844f4d571..871258837af 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
@@ -499,6 +499,20 @@ public static void killRunningJobs() {
}
}
+ public static void killRunningDAGsForApplication(String applicationId) {
+ synchronized (shutdownList) {
+ for (DAGClient c : shutdownList) {
+ try {
+ if (applicationId.equals(c.getSessionIdentifierString())) {
+ c.tryKillDAG();
+ }
+ } catch (Exception e) {
+ LOG.error("Error while trying to kill running DAG on tez session
{}", applicationId, e);
+ }
+ }
+ }
+ }
+
static long getCounterValueByGroupName(TezCounters vertexCounters, String
groupNamePattern,
String counterName) {
TezCounter tezCounter =
vertexCounters.getGroup(groupNamePattern).findCounter(counterName);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
index fa71845ece2..34aa466084a 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
@@ -180,7 +180,7 @@ public Edge answer(InvocationOnMock invocation) throws
Throwable {
sessionState = mock(TezSessionState.class);
when(sessionState.getTezClient()).thenReturn(session);
when(sessionState.reopen()).thenReturn(sessionState);
- when(session.submitDAG(any(DAG.class)))
+ when(sessionState.submitDAG(any(DAG.class)))
.thenThrow(new SessionNotRunning(""))
.thenReturn(mock(DAGClient.class));
}
@@ -229,7 +229,7 @@ public void testSubmit() throws Exception {
task.submit(dag, Ref.from(sessionState));
// validate close/reopen
verify(sessionState, times(1)).reopen();
- verify(session, times(2)).submitDAG(any(DAG.class));
+ verify(sessionState, times(2)).submitDAG(any(DAG.class));
}
@Test
@@ -241,14 +241,14 @@ public void testSubmitOnNonPoolSession() throws Exception
{
TezClient tezClient = mock(TezClient.class);
when(tezSessionState.reopen()).thenThrow(new HiveException("Dag cannot be
submitted"));
when(tezSessionState.getTezClient()).thenReturn(tezClient);
- when(tezClient.submitDAG(any(DAG.class))).thenThrow(new
SessionNotRunning(""));
+ when(tezSessionState.submitDAG(any(DAG.class))).thenThrow(new
SessionNotRunning(""));
doNothing().when(tezSessionState).destroy();
boolean isException = false;
try {
task.submit(dag, Ref.from(tezSessionState));
} catch (Exception e) {
isException = true;
- verify(tezClient, times(1)).submitDAG(any(DAG.class));
+ verify(tezSessionState, times(1)).submitDAG(any(DAG.class));
verify(tezSessionState, times(2)).reopen();
verify(tezSessionState, times(1)).destroy();
verify(tezSessionState, times(0)).returnToSessionManager();
@@ -266,13 +266,13 @@ public void testSubmitOnPoolSession() throws Exception {
doNothing().when(tezSessionPoolSession).returnToSessionManager();
when(tezSessionPoolSession.getTezClient()).thenReturn(tezClient);
when(tezSessionPoolSession.isDefault()).thenReturn(true);
- when(tezClient.submitDAG(any(DAG.class))).thenThrow(new
SessionNotRunning(""));
+ when(tezSessionPoolSession.submitDAG(any(DAG.class))).thenThrow(new
SessionNotRunning(""));
boolean isException = false;
try {
task.submit(dag, Ref.from(tezSessionPoolSession));
} catch (Exception e) {
isException = true;
- verify(tezClient, times(1)).submitDAG(any(DAG.class));
+ verify(tezSessionPoolSession, times(1)).submitDAG(any(DAG.class));
verify(tezSessionPoolSession, times(2)).reopen();
verify(tezSessionPoolSession, times(0)).destroy();
verify(tezSessionPoolSession, times(1)).returnToSessionManager();
diff --git
a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestZookeeperExternalSessionsRegistryClient.java
b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestZookeeperExternalSessionsRegistryClient.java
index 8274e87187b..aa25f0b2450 100644
---
a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestZookeeperExternalSessionsRegistryClient.java
+++
b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestZookeeperExternalSessionsRegistryClient.java
@@ -19,8 +19,10 @@
package org.apache.hadoop.hive.ql.exec.tez;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -28,8 +30,16 @@
import org.apache.curator.test.TestingServer;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.zookeeper.KeeperException;
import org.junit.Test;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
/**
* Tests for {@link ZookeeperExternalSessionsRegistryClient}.
*/
@@ -128,5 +138,143 @@ public void testReuseSameSession() throws Exception {
}
}
}
+
+ /**
+ * Tests that multiple registry clients (simulating multiple HS2 instances)
+ * respect the global distributed lock (claims) and do not claim the same
session simultaneously.
+ */
+ @Test
+ public void testSessionClaimsFromDifferentRegistryClients() throws Exception
{
+ CuratorFramework client = null;
+ ZookeeperExternalSessionsRegistryClient registry1 = null;
+ ZookeeperExternalSessionsRegistryClient registry2 = null;
+
+ try (TestingServer server = new TestingServer()) {
+ String connectString = server.getConnectString();
+
+ HiveConf conf = new HiveConf();
+ conf.setVar(ConfVars.HIVE_ZOOKEEPER_QUORUM, connectString);
+ conf.setVar(ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE,
"/tez_ns_concurrent");
+
conf.setIntVar(ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_WAIT_MAX_ATTEMPTS,
5);
+
+ String namespace = HiveConf.getVar(conf,
ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE);
+ String effectivePath =
ZookeeperExternalSessionsRegistryClient.normalizeZkPath(namespace);
+
+ CuratorFrameworkFactory.Builder builder =
CuratorFrameworkFactory.builder();
+ client = builder.connectString(connectString).retryPolicy(new
RetryOneTime(1)).build();
+ client.start();
+
+ client.create().creatingParentsIfNeeded().forPath(effectivePath +
"/app_1");
+ client.create().forPath(effectivePath + "/app_2");
+
+ registry1 = new ZookeeperExternalSessionsRegistryClient(conf);
+ registry2 = new ZookeeperExternalSessionsRegistryClient(conf);
+
+ String sessionFromRegistry1 = registry1.getSession();
+ String sessionFromRegistry2 = registry2.getSession();
+
+ assertNotNull("Registry 1 should have claimed a session",
sessionFromRegistry1);
+ assertNotNull("Registry 2 should have claimed a session",
sessionFromRegistry2);
+
+ assertNotEquals("The two registries should claim different sessions!",
+ sessionFromRegistry1, sessionFromRegistry2);
+
+ registry1.returnSession(sessionFromRegistry1);
+
+ String session3FromRegistry2 = registry2.getSession();
+ assertEquals("Registry 2 should be able to claim the newly released
session",
+ sessionFromRegistry1, session3FromRegistry2);
+
+ registry2.returnSession(sessionFromRegistry2);
+ registry2.returnSession(session3FromRegistry2);
+ } finally {
+ if (registry1 != null) {
+ registry1.close();
+ }
+ if (registry2 != null) {
+ registry2.close();
+ }
+ if (client != null) {
+ client.close();
+ }
+ }
+ }
+
+ /**
+ * Tests that the InterProcessMutex enforces strict Global FIFO ordering.
+ * Clients form a queue when no sessions are available, and are served in
exact order.
+ */
+ @Test
+ public void testFIFOSessionClaimsFromDifferentRegistries() throws Exception {
+ try (TestingServer server = new TestingServer()) {
+ String connectString = server.getConnectString();
+
+ HiveConf conf = new HiveConf();
+ conf.setVar(ConfVars.HIVE_ZOOKEEPER_QUORUM, connectString);
+ conf.setVar(ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE,
"/tez_ns_fifo");
+
conf.setIntVar(ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_WAIT_MAX_ATTEMPTS,
5);
+
+ String namespace = HiveConf.getVar(conf,
ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE);
+ String effectivePath =
ZookeeperExternalSessionsRegistryClient.normalizeZkPath(namespace);
+
+ CuratorFrameworkFactory.Builder builder =
CuratorFrameworkFactory.builder();
+ CuratorFramework client =
builder.connectString(connectString).retryPolicy(new RetryOneTime(1)).build();
+ client.start();
+
+ ExecutorService executor = Executors.newFixedThreadPool(3);
+ ZookeeperExternalSessionsRegistryClient registry1 = new
ZookeeperExternalSessionsRegistryClient(conf);
+ ZookeeperExternalSessionsRegistryClient registry2 = new
ZookeeperExternalSessionsRegistryClient(conf);
+ ZookeeperExternalSessionsRegistryClient registry3 = new
ZookeeperExternalSessionsRegistryClient(conf);
+ try {
+ // Submit getSession() for one registry at a time and wait for each to
reach globalQueue which is visible
+ // as a sequential lock znode, before starting the next, so FIFO order
matches registry1→2→3.
+ String queuePath = effectivePath + "-queue";
+ Future<String> future1 = executor.submit(registry1::getSession);
+ awaitMutexQueueSize(client, queuePath, 1);
+
+ Future<String> future2 = executor.submit(registry2::getSession);
+ awaitMutexQueueSize(client, queuePath, 2);
+
+ Future<String> future3 = executor.submit(registry3::getSession);
+ awaitMutexQueueSize(client, queuePath, 3);
+
+ client.create().creatingParentsIfNeeded().forPath(effectivePath +
"/app_first");
+ assertEquals("Registry 1 should get the first AM", "app_first",
future1.get(5, TimeUnit.SECONDS));
+
+ client.create().forPath(effectivePath + "/app_second");
+ String session2 = future2.get(5, TimeUnit.SECONDS);
+
+ assertEquals("Registry 2 should get the second AM", "app_second",
session2);
+ registry2.returnSession(session2);
+
+ assertEquals("Registry 3 should get the second AM", "app_second",
future3.get(5, TimeUnit.SECONDS));
+ } finally {
+ registry1.close();
+ registry2.close();
+ registry3.close();
+ client.close();
+ executor.shutdownNow();
+ }
+ }
+ }
+
+ private static void awaitMutexQueueSize(CuratorFramework client, String
queuePath, int expectedSize)
+ throws Exception {
+ long startTimeNs = System.nanoTime();
+ long timeoutNs = TimeUnit.SECONDS.toNanos(30);
+ while (System.nanoTime() - startTimeNs < timeoutNs) {
+ List<String> childQueueNodes;
+ try {
+ childQueueNodes = client.getChildren().forPath(queuePath);
+ } catch (KeeperException.NoNodeException e) {
+ childQueueNodes = Collections.emptyList();
+ }
+ if (childQueueNodes.size() >= expectedSize) {
+ return;
+ }
+ Thread.sleep(100);
+ }
+ fail("Timed out waiting for " + expectedSize + " mutex queue participants
under " + queuePath);
+ }
}