This is an automated email from the ASF dual-hosted git repository.

ayushtkn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new ccc70354fd4 HIVE-29651: Update ZookeeperExternalSessionsRegistryClient 
to handle multiple HiveServer2 instances submitting DAGs concurrently to 
available Tez External Sessions (#6528)
ccc70354fd4 is described below

commit ccc70354fd4c4b89ce209c1b4d4a2fff96698388
Author: Tanishq Chugh <[email protected]>
AuthorDate: Mon Jun 22 16:32:54 2026 +0530

    HIVE-29651: Update ZookeeperExternalSessionsRegistryClient to handle 
multiple HiveServer2 instances submitting DAGs concurrently to available Tez 
External Sessions (#6528)
---
 .../hive/ql/exec/tez/ExternalSessionsRegistry.java |   7 +
 .../hive/ql/exec/tez/TezExternalSessionState.java  |  83 +++++++++
 .../apache/hadoop/hive/ql/exec/tez/TezSession.java |   3 +
 .../hive/ql/exec/tez/TezSessionPoolManager.java    |  14 ++
 .../hive/ql/exec/tez/TezSessionPoolSession.java    |   7 +
 .../hadoop/hive/ql/exec/tez/TezSessionState.java   |   7 +
 .../apache/hadoop/hive/ql/exec/tez/TezTask.java    |   7 +-
 .../ZookeeperExternalSessionsRegistryClient.java   | 203 +++++++++++++++++----
 .../hive/ql/exec/tez/monitoring/TezJobMonitor.java |  14 ++
 .../hadoop/hive/ql/exec/tez/TestTezTask.java       |  12 +-
 ...estZookeeperExternalSessionsRegistryClient.java | 148 +++++++++++++++
 11 files changed, 461 insertions(+), 44 deletions(-)

diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ExternalSessionsRegistry.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ExternalSessionsRegistry.java
index 7f279c3648d..8f822c2585c 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ExternalSessionsRegistry.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ExternalSessionsRegistry.java
@@ -40,4 +40,11 @@ public interface ExternalSessionsRegistry {
    * Closes the external session registry.
    */
   void close();
+
+  /**
+   * Returns true if this registry instance currently holds a claim on the 
given AM.
+   */
+  default boolean isClaimed(String appId) {
+    return true; // Non-ZK registries case is always true
+  }
 }
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezExternalSessionState.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezExternalSessionState.java
index bd49d02e530..bb4cc106d5b 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezExternalSessionState.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezExternalSessionState.java
@@ -19,17 +19,26 @@
 package org.apache.hadoop.hive.ql.exec.tez;
 
 import java.io.IOException;
+import java.util.concurrent.TimeUnit;
 
+import com.google.protobuf.ServiceException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC;
+import org.apache.tez.dag.api.records.DAGProtos;
 import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
 
 /**
@@ -139,6 +148,11 @@ public void close(boolean keepDagFilesDir) throws 
Exception {
     // We never close external sessions that don't have errors.
     try {
       if (externalAppId != null) {
+        LOG.debug("Returning external session with appID: {}", externalAppId);
+        SessionState sessionState = SessionState.get();
+        if (sessionState != null) {
+          sessionState.setTezSession(null);
+        }
         registry.returnSession(externalAppId);
       }
     } catch (Exception e) {
@@ -187,4 +201,73 @@ public boolean killQuery(String reason) throws 
HiveException {
     killQuery.killQuery(queryId, reason, conf, false);
     return true;
   }
+
+  /**
+   * Submits {@code dag} to the external AM claimed by this session.
+   *
+   * <p>If the AM rejects the submission because it is still running a DAG (for example an orphan
+   * left behind by a previous holder of the AM), the running DAGs are killed and the submission is
+   * retried once. Ownership of the AM is verified before the first attempt and again before the
+   * retry, because killing the old DAGs can block for a long time and the claim may be dropped in
+   * the meantime (e.g. when the ZooKeeper connection is LOST).
+   *
+   * @throws TezException if the AM is not (or no longer) claimed by this registry, if killing the
+   *     running DAGs fails, or if the submission itself fails
+   * @throws IOException if the submission fails with an I/O error
+   */
+  @Override
+  public DAGClient submitDAG(DAG dag) throws TezException, IOException {
+    if (!registry.isClaimed(externalAppId)) {
+      throw new TezException("Cannot submit DAG as the Tez Session no-longer owns the AM: " + externalAppId);
+    }
+    TezClient tezClient = getTezClient();
+    try {
+      return tezClient.submitDAG(dag);
+    } catch (TezException e) {
+      String message = e.getMessage();
+      if (message == null || !message.contains("App master already running a DAG")) {
+        throw e;
+      }
+      tryKillRunningDAGs(tezClient);
+      // The kill-and-wait above can take up to the configured timeout per DAG; re-check the claim
+      // so we never resubmit to an AM this registry stopped owning while we were waiting.
+      if (!registry.isClaimed(externalAppId)) {
+        throw new TezException("Cannot submit DAG as the Tez Session no-longer owns the AM: " + externalAppId);
+      }
+      return tezClient.submitDAG(dag);
+    }
+  }
+
+  /**
+   * Asks the AM behind {@code session} for all of its DAG ids, requests a kill for each one and
+   * waits for it to reach a terminal state, so that a new DAG can be submitted afterwards.
+   *
+   * <p>The per-DAG wait budget is {@code HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_WAIT_MAX_ATTEMPTS}
+   * interpreted as seconds.
+   *
+   * @param session Tez client connected to the external AM
+   * @throws TezException if this registry does not own the AM, the AM proxy is unavailable, or any
+   *     listing / kill / wait step fails (the original failure is kept as the cause)
+   */
+  private void tryKillRunningDAGs(TezClient session) throws TezException {
+    boolean owned = registry.isClaimed(externalAppId);
+    if (!owned) {
+      throw new TezException("Cannot kill running DAG as the Tez Session no-longer owns the AM: " + externalAppId);
+    }
+    LOG.info("External session has an AM which is already running a DAG on app ID {}", externalAppId);
+
+    DAGClientAMProtocolBlockingPB amProxy = session.sendAMHeartbeat(null);
+    if (amProxy == null) {
+      throw new TezException("Error while trying to connect to AM for app ID " + externalAppId);
+    }
+
+    int waitSeconds = HiveConf.getIntVar(conf, ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_WAIT_MAX_ATTEMPTS);
+    long perDagTimeoutMs = TimeUnit.SECONDS.toMillis(waitSeconds);
+    try {
+      DAGClientAMProtocolRPC.GetAllDAGsRequestProto listRequest =
+          DAGClientAMProtocolRPC.GetAllDAGsRequestProto.newBuilder().build();
+      DAGClientAMProtocolRPC.GetAllDAGsResponseProto listing = amProxy.getAllDAGs(null, listRequest);
+      for (String runningDagId : listing.getDagIdList()) {
+        LOG.info("External session: attempting to kill dagId {} on app ID {}", runningDagId, externalAppId);
+        DAGClientAMProtocolRPC.TryKillDAGRequestProto killRequest =
+            DAGClientAMProtocolRPC.TryKillDAGRequestProto.newBuilder().setDagId(runningDagId).build();
+        amProxy.tryKillDAG(null, killRequest);
+        waitForDagTerminal(amProxy, runningDagId, perDagTimeoutMs);
+      }
+    } catch (Exception e) {
+      throw new TezException("Error while trying to kill existing DAG running on app ID " + externalAppId, e);
+    }
+  }
+
+  /**
+   * Waits until {@code dagId} reaches a terminal state on the AM behind {@code proxy}, failing once
+   * {@code timeoutMs} has elapsed.
+   *
+   * <p>Each status RPC passes a timeout of at most {@code TEZ_DAG_STATUS_CHECK_INTERVAL}, capped by
+   * the remaining budget. Elapsed time is measured with {@link System#nanoTime()}, which is
+   * monotonic, so wall-clock adjustments (e.g. NTP steps) cannot cut the wait short or stretch it.
+   *
+   * @param proxy RPC proxy to the Tez AM
+   * @param dagId id of the DAG that was asked to be killed
+   * @param timeoutMs overall time budget in milliseconds
+   * @throws TezException if the DAG is not terminal before the budget is exhausted
+   * @throws ServiceException if a status RPC fails
+   */
+  private void waitForDagTerminal(DAGClientAMProtocolBlockingPB proxy, String dagId, long timeoutMs)
+      throws TezException, ServiceException {
+    long deadlineNs = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
+    long pollIntervalMs = conf.getTimeVar(ConfVars.TEZ_DAG_STATUS_CHECK_INTERVAL, TimeUnit.MILLISECONDS);
+    while (true) {
+      // Compare via subtraction so the check stays correct even if nanoTime() wraps.
+      long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadlineNs - System.nanoTime());
+      if (remainingMs <= 0) {
+        break;
+      }
+      DAGClientAMProtocolRPC.GetDAGStatusResponseProto response = proxy.getDAGStatus(null,
+          DAGClientAMProtocolRPC.GetDAGStatusRequestProto.newBuilder()
+              .setDagId(dagId)
+              .setTimeout(Math.min(pollIntervalMs, remainingMs))
+              .build());
+      if (response.hasDagStatus() && response.getDagStatus().hasState()) {
+        DAGProtos.DAGStatusStateProto state = response.getDagStatus().getState();
+        if (isTerminalDagState(state)) {
+          LOG.info("External session: dagId {} on app ID {} reached terminal state {}", dagId, externalAppId,
+              state);
+          return;
+        }
+      }
+    }
+    throw new TezException("Timed out after " + timeoutMs + " ms waiting for orphan DAG " + dagId
+        + " on app ID " + externalAppId + " to reach terminal state after kill");
+  }
+
+  /**
+   * Reports whether {@code state} is final for a DAG: succeeded, killed, failed or error.
+   * Every other state is treated as still in progress.
+   */
+  private static boolean isTerminalDagState(DAGProtos.DAGStatusStateProto state) {
+    switch (state) {
+    case DAG_SUCCEEDED:
+    case DAG_KILLED:
+    case DAG_FAILED:
+    case DAG_ERROR:
+      return true;
+    default:
+      return false;
+    }
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSession.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSession.java
index 68844bd8172..2f64ec41a58 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSession.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSession.java
@@ -34,7 +34,9 @@
 import org.apache.hadoop.hive.ql.wm.WmContext;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 
 /**
@@ -86,6 +88,7 @@ public String toString() {
 
   HiveConf getConf();
   TezClient getTezClient();
+  /**
+   * Submits {@code dag} through this session.
+   *
+   * <p>Implementations may add session-specific behavior around the submission; e.g. the external
+   * session verifies it still owns its AM before submitting.
+   *
+   * @param dag the DAG to submit
+   * @return the {@link DAGClient} produced by the submission
+   * @throws TezException if the submission is rejected or fails
+   * @throws IOException if the submission fails with an I/O error
+   */
+  DAGClient submitDAG(DAG dag) throws TezException, IOException;
   boolean isOpen();
   boolean isOpening();
   boolean getDoAsEnabled();
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
index 605a92ebc8f..9b9fbfebd9f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
@@ -353,6 +353,20 @@ void returnSession(TezSession tezSessionState) {
             + " belongs to the pool. Put it back in");
         
defaultSessionPool.returnSession((TezSessionPoolSession)tezSessionState);
       }
+
+      if (useExternalSessions && !tezSessionState.isDefault()) {
+        if (tezSessionState.getTezClient() != null
+            && tezSessionState.getTezClient().getAppMasterApplicationId() != 
null) {
+          try {
+            tezSessionState.close(false);
+          } catch (Exception ex) {
+            LOG.warn("Failed to return external Tez session {}", 
tezSessionState.getSessionId(), ex);
+          }
+        } else {
+          LOG.warn("Not returning session '{}' as tez client or app id is 
null", tezSessionState.getSessionId());
+        }
+      }
+
       // non default session nothing changes. The user can continue to use the 
existing
       // session in the SessionState
     } finally {
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
index 415072f221d..a473b32e887 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
@@ -34,7 +34,9 @@
 import org.apache.hadoop.hive.registry.impl.TezAmInstance;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -337,6 +339,11 @@ public TezClient getTezClient() {
     return baseSession.getTezClient();
   }
 
+  /**
+   * Delegates DAG submission to the wrapped {@code baseSession}, so whatever submission logic the
+   * underlying session type implements is applied unchanged.
+   */
+  @Override
+  public DAGClient submitDAG(DAG dag) throws TezException, IOException {
+    final DAGClient submitted = baseSession.submitDAG(dag);
+    return submitted;
+  }
+
   @Override
   public boolean isOpening() {
     return baseSession.isOpening();
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index 2924416ad48..b50c15cf9b6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -76,11 +76,13 @@
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.PreWarmVertex;
 import org.apache.tez.dag.api.SessionNotRunning;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.Progress;
 import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
@@ -820,6 +822,11 @@ public TezClient getTezClient() {
     return session;
   }
 
+  /**
+   * Submits {@code dag} directly through the {@link TezClient} returned by {@link #getTezClient()},
+   * with no additional checks.
+   */
+  @Override
+  public DAGClient submitDAG(DAG dag) throws TezException, IOException {
+    final TezClient client = getTezClient();
+    return client.submitDAG(dag);
+  }
+
   @Override
   public LocalResource getAppJarLr() {
     return appJarLr;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 6bd801dd64c..8e3b8140752 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -472,9 +472,8 @@ void ensureSessionHasResources(
       TezSession session, String[] nonConfResources) throws Exception {
     TezClient client = session.getTezClient();
     // TODO null can also mean that this operation was interrupted. Should we 
really try to re-create the session in that case ?
-    if (client == null) {
-      // Note: the only sane case where this can happen is the non-pool one. 
We should get rid
-      //       of it, in non-pool case perf doesn't matter so we might as well 
open at get time
+    if (client == null || !session.isOpen()) {
+      // Note: We should get rid of it, in non-pool case perf doesn't matter 
so we might as well open at get time
       //       and then call update like we do in the else.
       // Can happen if the user sets the tez flag after the session was 
established.
       LOG.info("Tez session hasn't been created yet. Opening session");
@@ -692,7 +691,7 @@ DAGClient submit(DAG dag, Ref<TezSession> sessionStateRef) 
throws Exception {
 
   private DAGClient submitInternal(DAG dag, TezSession sessionState) throws 
TezException, IOException {
     runtimeContext.init(sessionState);
-    return sessionState.getTezClient().submitDAG(dag);
+    return sessionState.submitDAG(dag);
   }
 
   private void sessionDestroyOrReturnToPool(Ref<TezSession> sessionStateRef,
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java
index 550c77e573a..08df3bc922a 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java
@@ -21,6 +21,7 @@
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.curator.framework.CuratorFramework;
@@ -31,9 +32,14 @@
 import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,15 +48,20 @@
 // TODO: tez should provide this registry
 public class ZookeeperExternalSessionsRegistryClient implements 
ExternalSessionsRegistry {
   private static final Logger LOG = 
LoggerFactory.getLogger(ZookeeperExternalSessionsRegistryClient.class);
+  private static final String PATH_SEPARATOR = "/";
 
   private final HiveConf initConf;
   private final Set<String> available = new HashSet<>();
   private final Set<String> taken = new HashSet<>();
   private final Object lock = new Object();
   private final int maxAttempts;
-
+  private CuratorFramework client;
   private CuratorCache cache;
+  private CuratorCache claimsCache;
+  private InterProcessMutex globalQueue;
+  private String claimsPath;
   private volatile boolean isInitialized;
+  private volatile boolean zkConnectionHealthy = false;
 
 
   public ZookeeperExternalSessionsRegistryClient(final HiveConf initConf) {
@@ -59,25 +70,91 @@ public ZookeeperExternalSessionsRegistryClient(final 
HiveConf initConf) {
   }
 
   private static String getApplicationId(final ChildData childData) {
-    return childData.getPath().substring(childData.getPath().lastIndexOf("/") 
+ 1);
+    return 
childData.getPath().substring(childData.getPath().lastIndexOf(PATH_SEPARATOR) + 
1);
   }
 
   private void init() {
     String zkServer = HiveConf.getVar(initConf, 
ConfVars.HIVE_ZOOKEEPER_QUORUM);
+    int sessionTimeoutMs = (int) HiveConf.getTimeVar(initConf, 
ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT,
+        TimeUnit.MILLISECONDS);
+    int connectionTimeoutMs = (int) HiveConf.getTimeVar(initConf, 
ConfVars.HIVE_ZOOKEEPER_CONNECTION_TIMEOUT,
+        TimeUnit.MILLISECONDS);
+    int baseSleepTimeMs = (int) HiveConf.getTimeVar(initConf, 
ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME,
+        TimeUnit.MILLISECONDS);
+    int maxRetries = HiveConf.getIntVar(initConf, 
ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES);
     String zkNamespace = HiveConf.getVar(initConf, 
ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE);
     String effectivePath = normalizeZkPath(zkNamespace);
-    CuratorFramework client = CuratorFrameworkFactory.newClient(zkServer, new 
ExponentialBackoffRetry(1000, 3));
+    this.claimsPath = effectivePath + "-claims";
+    // After connection state changes to SUSPENDED, the client has already 
consumed ~2/3 of the negotiated session
+    // timeout. Use 33% of the remaining window so LOST aligns with when the 
ZK server expires the session and drops
+    // ephemeral claim nodes. For Ref: Curator TN14 
+    this.client = CuratorFrameworkFactory.builder()
+        .connectString(zkServer)
+        .sessionTimeoutMs(sessionTimeoutMs)
+        .connectionTimeoutMs(connectionTimeoutMs)
+        .simulatedSessionExpirationPercent(33)
+        .retryPolicy(new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries))
+        .build();
+
     synchronized (lock) {
+      client.getConnectionStateListenable().addListener((curatorClient, 
newState) -> {
+        if (newState == ConnectionState.CONNECTED || newState == 
ConnectionState.RECONNECTED) {
+          zkConnectionHealthy = true;
+        } else if (newState == ConnectionState.LOST) {
+          zkConnectionHealthy = false;
+          Set<String> sessionsToKill;
+          synchronized (lock) {
+            LOG.error("ZK connection state has changed to lost; killing 
running DAGs on claimed AMs: {}", taken);
+            sessionsToKill = new HashSet<>(taken);
+            taken.clear();
+          }
+          sessionsToKill.forEach(TezJobMonitor::killRunningDAGsForApplication);
+        }
+      });
       client.start();
+
+      this.globalQueue = new InterProcessMutex(client, effectivePath + 
"-queue");
       this.cache = CuratorCache.build(client, effectivePath);
       CuratorCacheListener listener = CuratorCacheListener.builder()
           .forPathChildrenCache(effectivePath, client, new 
ExternalSessionsPathListener())
           .build();
       cache.listenable().addListener(listener);
       cache.start();
+
+      this.claimsCache = CuratorCache.build(client, claimsPath);
+      CuratorCacheListener claimsListener = 
CuratorCacheListener.builder().forCreates(
+          childData -> {
+          if (childData == null) {
+            return;
+          }
+          String applicationId = getApplicationId(childData);
+          synchronized (lock) {
+            available.remove(applicationId);
+          }
+        }).forDeletes(
+          childData -> {
+            if (childData == null) {
+              return;
+            }
+            String applicationId = getApplicationId(childData);
+            synchronized (lock) {
+              if (!taken.contains(applicationId)) {
+                if (cache.get(effectivePath + PATH_SEPARATOR + 
applicationId).isPresent()) {  
+                  available.add(applicationId);
+                  lock.notifyAll();
+                } else {
+                  LOG.info("Ignoring AM claim removal for {} because the base 
AM node no longer exists.",
+                      applicationId);
+                }
+              }
+            }
+          }).build();
+      claimsCache.listenable().addListener(claimsListener);
+      claimsCache.start();
+
       cache.stream()
           .filter(childData -> childData.getPath() != null
-              && childData.getPath().startsWith(effectivePath + "/"))
+              && childData.getPath().startsWith(effectivePath + 
PATH_SEPARATOR))
           .forEach(childData -> available.add(getApplicationId(childData)));
       LOG.info("Initial external sessions: {}", available);
       isInitialized = true;
@@ -86,27 +163,54 @@ private void init() {
 
   @VisibleForTesting
   static String normalizeZkPath(String zkNamespace) {
-    return (zkNamespace.startsWith("/") ? zkNamespace : "/" + zkNamespace);
+    return (zkNamespace.startsWith(PATH_SEPARATOR) ? zkNamespace : 
PATH_SEPARATOR + zkNamespace);
   }
 
   @Override
   public String getSession() throws Exception {
-    synchronized (lock) {
-      if (!isInitialized) {
-        init();
-      }
-      long endTimeNs = System.nanoTime() + (1000000000L * maxAttempts);
-      while (available.isEmpty() && ((endTimeNs - System.nanoTime()) > 0)) {
-        lock.wait(1000L);
+    if (!isInitialized) {
+      synchronized (lock) {
+        if (!isInitialized) {
+          init();
+        }
       }
-      Iterator<String> iter = available.iterator();
-      if (!iter.hasNext()) {
-        throw new IOException("Cannot get a session after " + maxAttempts + " 
attempts");
+    }
+    
+    long startTimeNs = System.nanoTime();
+    long timeoutNs = TimeUnit.SECONDS.toNanos(maxAttempts);
+    long queueWaitTimeMs = Math.max(0, (timeoutNs - (System.nanoTime() - 
startTimeNs)) / 1000000L);
+    if (!globalQueue.acquire(queueWaitTimeMs, TimeUnit.MILLISECONDS)) {
+      throw new IOException("Cannot get a session (timed out in queue) after " 
+ maxAttempts + " seconds");
+    }
+    try {
+      synchronized (lock) {
+        while (System.nanoTime() - startTimeNs < timeoutNs) {
+          Iterator<String> iter = available.iterator();
+
+          while (iter.hasNext()) {
+            String appId = iter.next();
+            try {
+              String claimNodePath = claimsPath + PATH_SEPARATOR + appId;
+              
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(claimNodePath);
+              iter.remove();
+              taken.add(appId);
+              return appId;
+            } catch (KeeperException.NodeExistsException e) {
+              iter.remove();
+            }
+          }
+          long remainingTimeNs = timeoutNs - (System.nanoTime() - startTimeNs);
+          if (remainingTimeNs > 0) {
+            // Add one to remainingTime milliseconds computation to prevent 
the case where
+            // (remainingTimeNs / 1000000L) can return 0 causing the lock to 
be held indefinitely.
+            long waitTimeMs = Math.min(1000L, (remainingTimeNs / 1000000L) + 
1);
+            lock.wait(waitTimeMs);
+          }
+        }
+        throw new IOException("Cannot get a session after waiting for " + 
maxAttempts + " seconds (timeout exhausted)");
       }
-      String appId = iter.next();
-      iter.remove();
-      taken.add(appId);
-      return appId;
+    } finally {
+      globalQueue.release();
     }
   }
 
@@ -119,6 +223,16 @@ public void returnSession(String appId) {
       if (!taken.remove(appId)) {
         return; // Session has been removed from ZK.
       }
+
+      try {
+        client.delete().guaranteed().forPath(claimsPath + PATH_SEPARATOR + 
appId);
+      } catch (KeeperException.NoNodeException e) {
+        // If the claim Node has already been deleted, we can ignore it.
+        LOG.debug("Claim Node has already been deleted for the session {}", 
appId, e);
+      } catch (Exception e) {
+        LOG.warn("Failed to delete claim node for session {}", appId, e);
+      }
+
       available.add(appId);
       lock.notifyAll();
     }
@@ -126,9 +240,15 @@ public void returnSession(String appId) {
 
   @Override
   public void close() {
+    if (claimsCache != null) {
+      claimsCache.close();
+    }
     if (cache != null) {
       cache.close();
     }
+    if (client != null) {
+      client.close();
+    }
   }
 
   private final class ExternalSessionsPathListener implements 
PathChildrenCacheListener {
@@ -146,23 +266,38 @@ public void childEvent(final CuratorFramework client, 
final PathChildrenCacheEve
 
       synchronized (lock) {
         switch (event.getType()) {
-          case CHILD_UPDATED, CHILD_ADDED:
-            if (available.contains(applicationId) || 
taken.contains(applicationId)) {
-              return; // We do not expect updates to existing sessions; ignore 
them for now.
-            }
-            available.add(applicationId);
-            break;
-          case CHILD_REMOVED:
-            if (taken.remove(applicationId)) {
-              LOG.warn("The session in use has disappeared from the registry 
({})", applicationId);
-            } else if (!available.remove(applicationId)) {
-              LOG.warn("An unknown session has been removed ({})", 
applicationId);
-            }
-            break;
-          default:
-            // Ignore all the other events; logged above.
+        case CHILD_UPDATED, CHILD_ADDED:
+          if (available.contains(applicationId) || 
taken.contains(applicationId)) {
+            return; // We do not expect updates to existing sessions; ignore 
them for now.
+          }
+          if (claimsCache != null && claimsCache.get(claimsPath + 
PATH_SEPARATOR + applicationId).isPresent()) {
+            LOG.info("Ignoring newly added AM {} because it is already claimed 
by another session.", applicationId);
+            return;
+          }
+          available.add(applicationId);
+          lock.notifyAll();
+          break;
+        case CHILD_REMOVED:
+          if (taken.remove(applicationId)) {
+            LOG.warn("The session in use has disappeared from the registry 
({})", applicationId);
+          } else if (!available.remove(applicationId)) {
+            LOG.warn("An unknown session has been removed ({})", 
applicationId);
+          }
+          break;
+        default:
+          // Ignore all the other events; logged above.
         }
       }
     }
   }
+
+  @Override
+  public boolean isClaimed(String appId) {
+    if (!zkConnectionHealthy) {
+      return false;
+    }
+    synchronized (lock) {
+      return taken.contains(appId);
+    }
+  }
 }
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
index 92844f4d571..871258837af 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
@@ -499,6 +499,20 @@ public static void killRunningJobs() {
     }
   }
 
+  public static void killRunningDAGsForApplication(String applicationId) {
+    synchronized (shutdownList) {
+      for (DAGClient c : shutdownList) {
+        try {
+          if (applicationId.equals(c.getSessionIdentifierString())) {
+            c.tryKillDAG();
+          }
+        } catch (Exception e) {
+          LOG.error("Error while trying to kill running DAG on tez session 
{}", applicationId, e);
+        }
+      }
+    }
+  }
+
   static long getCounterValueByGroupName(TezCounters vertexCounters, String 
groupNamePattern,
                                          String counterName) {
     TezCounter tezCounter = 
vertexCounters.getGroup(groupNamePattern).findCounter(counterName);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java 
b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
index fa71845ece2..34aa466084a 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
@@ -180,7 +180,7 @@ public Edge answer(InvocationOnMock invocation) throws 
Throwable {
     sessionState = mock(TezSessionState.class);
     when(sessionState.getTezClient()).thenReturn(session);
     when(sessionState.reopen()).thenReturn(sessionState);
-    when(session.submitDAG(any(DAG.class)))
+    when(sessionState.submitDAG(any(DAG.class)))
       .thenThrow(new SessionNotRunning(""))
       .thenReturn(mock(DAGClient.class));
   }
@@ -229,7 +229,7 @@ public void testSubmit() throws Exception {
     task.submit(dag, Ref.from(sessionState));
     // validate close/reopen
     verify(sessionState, times(1)).reopen();
-    verify(session, times(2)).submitDAG(any(DAG.class));
+    verify(sessionState, times(2)).submitDAG(any(DAG.class));
   }
 
   @Test
@@ -241,14 +241,14 @@ public void testSubmitOnNonPoolSession() throws Exception 
{
     TezClient tezClient = mock(TezClient.class);
     when(tezSessionState.reopen()).thenThrow(new HiveException("Dag cannot be 
submitted"));
     when(tezSessionState.getTezClient()).thenReturn(tezClient);
-    when(tezClient.submitDAG(any(DAG.class))).thenThrow(new 
SessionNotRunning(""));
+    when(tezSessionState.submitDAG(any(DAG.class))).thenThrow(new 
SessionNotRunning(""));
     doNothing().when(tezSessionState).destroy();
     boolean isException = false;
     try {
       task.submit(dag, Ref.from(tezSessionState));
     } catch (Exception e) {
       isException = true;
-      verify(tezClient, times(1)).submitDAG(any(DAG.class));
+      verify(tezSessionState, times(1)).submitDAG(any(DAG.class));
       verify(tezSessionState, times(2)).reopen();
       verify(tezSessionState, times(1)).destroy();
       verify(tezSessionState, times(0)).returnToSessionManager();
@@ -266,13 +266,13 @@ public void testSubmitOnPoolSession() throws Exception {
     doNothing().when(tezSessionPoolSession).returnToSessionManager();
     when(tezSessionPoolSession.getTezClient()).thenReturn(tezClient);
     when(tezSessionPoolSession.isDefault()).thenReturn(true);
-    when(tezClient.submitDAG(any(DAG.class))).thenThrow(new 
SessionNotRunning(""));
+    when(tezSessionPoolSession.submitDAG(any(DAG.class))).thenThrow(new 
SessionNotRunning(""));
     boolean isException = false;
     try {
       task.submit(dag, Ref.from(tezSessionPoolSession));
     } catch (Exception e) {
       isException = true;
-      verify(tezClient, times(1)).submitDAG(any(DAG.class));
+      verify(tezSessionPoolSession, times(1)).submitDAG(any(DAG.class));
       verify(tezSessionPoolSession, times(2)).reopen();
       verify(tezSessionPoolSession, times(0)).destroy();
       verify(tezSessionPoolSession, times(1)).returnToSessionManager();
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestZookeeperExternalSessionsRegistryClient.java
 
b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestZookeeperExternalSessionsRegistryClient.java
index 8274e87187b..aa25f0b2450 100644
--- 
a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestZookeeperExternalSessionsRegistryClient.java
+++ 
b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestZookeeperExternalSessionsRegistryClient.java
@@ -19,8 +19,10 @@
 package org.apache.hadoop.hive.ql.exec.tez;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -28,8 +30,16 @@
 import org.apache.curator.test.TestingServer;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.zookeeper.KeeperException;
 import org.junit.Test;
 
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
 /**
  * Tests for {@link ZookeeperExternalSessionsRegistryClient}.
  */
@@ -128,5 +138,143 @@ public void testReuseSameSession() throws Exception {
       }
     }
   }
+
+  /**
+   * Tests that multiple registry clients (simulating multiple HS2 instances)
+   * respect the global distributed lock (claims) and do not claim the same session simultaneously.
+   *
+   * <p>Cleanup runs in a {@code finally} nested inside the {@link TestingServer} scope. A
+   * try-with-resources statement closes its resources before its own {@code finally} clause, so
+   * cleanup attached to the outer statement would close the registries and the Curator client only
+   * after ZooKeeper had already been stopped.
+   */
+  @Test
+  public void testSessionClaimsFromDifferentRegistryClients() throws Exception {
+    try (TestingServer server = new TestingServer()) {
+      String connectString = server.getConnectString();
+
+      HiveConf conf = new HiveConf();
+      conf.setVar(ConfVars.HIVE_ZOOKEEPER_QUORUM, connectString);
+      conf.setVar(ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE, "/tez_ns_concurrent");
+      conf.setIntVar(ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_WAIT_MAX_ATTEMPTS, 5);
+
+      String namespace = HiveConf.getVar(conf, ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE);
+      String effectivePath = ZookeeperExternalSessionsRegistryClient.normalizeZkPath(namespace);
+
+      CuratorFramework client = null;
+      ZookeeperExternalSessionsRegistryClient registry1 = null;
+      ZookeeperExternalSessionsRegistryClient registry2 = null;
+      try {
+        CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
+        client = builder.connectString(connectString).retryPolicy(new RetryOneTime(1)).build();
+        client.start();
+
+        // Two free sessions, so each registry is expected to claim a distinct one.
+        client.create().creatingParentsIfNeeded().forPath(effectivePath + "/app_1");
+        client.create().forPath(effectivePath + "/app_2");
+
+        registry1 = new ZookeeperExternalSessionsRegistryClient(conf);
+        registry2 = new ZookeeperExternalSessionsRegistryClient(conf);
+
+        String sessionFromRegistry1 = registry1.getSession();
+        String sessionFromRegistry2 = registry2.getSession();
+
+        assertNotNull("Registry 1 should have claimed a session", sessionFromRegistry1);
+        assertNotNull("Registry 2 should have claimed a session", sessionFromRegistry2);
+
+        assertNotEquals("The two registries should claim different sessions!",
+            sessionFromRegistry1, sessionFromRegistry2);
+
+        // Registry 2 still holds its own claim, so the session released here is the only free one.
+        registry1.returnSession(sessionFromRegistry1);
+
+        String reclaimedByRegistry2 = registry2.getSession();
+        assertEquals("Registry 2 should be able to claim the newly released session",
+            sessionFromRegistry1, reclaimedByRegistry2);
+
+        registry2.returnSession(sessionFromRegistry2);
+        registry2.returnSession(reclaimedByRegistry2);
+      } finally {
+        // Still inside the TestingServer scope, so ZooKeeper is up while these are closed.
+        if (registry1 != null) {
+          registry1.close();
+        }
+        if (registry2 != null) {
+          registry2.close();
+        }
+        if (client != null) {
+          client.close();
+        }
+      }
+    }
+  }
+
+  /**
+   * Tests that the InterProcessMutex enforces strict global FIFO ordering.
+   * Clients form a queue when no sessions are available, and are served in exact arrival order.
+   *
+   * <p>The Curator client and every registry are created inside the guarded {@code try}, so a
+   * failure while constructing one of them cannot leak the ones built before it. In
+   * {@code finally}, the executor is shut down first. This interrupts any {@code getSession()} call
+   * still queued (for example after a timed-out {@code Future.get}) and guarantees the pool is
+   * stopped even if a later {@code close()} throws.
+   */
+  @Test
+  public void testFIFOSessionClaimsFromDifferentRegistries() throws Exception {
+    try (TestingServer server = new TestingServer()) {
+      String connectString = server.getConnectString();
+
+      HiveConf conf = new HiveConf();
+      conf.setVar(ConfVars.HIVE_ZOOKEEPER_QUORUM, connectString);
+      conf.setVar(ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE, "/tez_ns_fifo");
+      conf.setIntVar(ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_WAIT_MAX_ATTEMPTS, 5);
+
+      String namespace = HiveConf.getVar(conf, ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE);
+      String effectivePath = ZookeeperExternalSessionsRegistryClient.normalizeZkPath(namespace);
+      // awaitMutexQueueSize counts the children of this node as queued getSession() callers.
+      String queuePath = effectivePath + "-queue";
+
+      ExecutorService executor = Executors.newFixedThreadPool(3);
+      CuratorFramework client = null;
+      ZookeeperExternalSessionsRegistryClient registry1 = null;
+      ZookeeperExternalSessionsRegistryClient registry2 = null;
+      ZookeeperExternalSessionsRegistryClient registry3 = null;
+      try {
+        CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
+        client = builder.connectString(connectString).retryPolicy(new RetryOneTime(1)).build();
+        client.start();
+
+        registry1 = new ZookeeperExternalSessionsRegistryClient(conf);
+        registry2 = new ZookeeperExternalSessionsRegistryClient(conf);
+        registry3 = new ZookeeperExternalSessionsRegistryClient(conf);
+
+        // Submit getSession() for one registry at a time. Wait for each call to reach the global
+        // queue (visible as a sequential lock znode) before starting the next one, so the FIFO
+        // order matches registry1 -> registry2 -> registry3.
+        Future<String> future1 = executor.submit(registry1::getSession);
+        awaitMutexQueueSize(client, queuePath, 1);
+
+        Future<String> future2 = executor.submit(registry2::getSession);
+        awaitMutexQueueSize(client, queuePath, 2);
+
+        Future<String> future3 = executor.submit(registry3::getSession);
+        awaitMutexQueueSize(client, queuePath, 3);
+
+        client.create().creatingParentsIfNeeded().forPath(effectivePath + "/app_first");
+        assertEquals("Registry 1 should get the first AM", "app_first", future1.get(5, TimeUnit.SECONDS));
+
+        client.create().forPath(effectivePath + "/app_second");
+        String session2 = future2.get(5, TimeUnit.SECONDS);
+
+        assertEquals("Registry 2 should get the second AM", "app_second", session2);
+        // Registry 3 is expected to obtain app_second only after registry 2 returns it here.
+        registry2.returnSession(session2);
+
+        assertEquals("Registry 3 should get the second AM", "app_second", future3.get(5, TimeUnit.SECONDS));
+      } finally {
+        executor.shutdownNow();
+        if (registry1 != null) {
+          registry1.close();
+        }
+        if (registry2 != null) {
+          registry2.close();
+        }
+        if (registry3 != null) {
+          registry3.close();
+        }
+        if (client != null) {
+          client.close();
+        }
+      }
+    }
+  }
+
+  private static void awaitMutexQueueSize(CuratorFramework client, String 
queuePath, int expectedSize)
+      throws Exception {
+    long startTimeNs = System.nanoTime();
+    long timeoutNs = TimeUnit.SECONDS.toNanos(30);
+    while (System.nanoTime() - startTimeNs < timeoutNs) {
+      List<String> childQueueNodes;
+      try {
+        childQueueNodes = client.getChildren().forPath(queuePath);
+      } catch (KeeperException.NoNodeException e) {
+        childQueueNodes = Collections.emptyList();
+      }
+      if (childQueueNodes.size() >= expectedSize) {
+        return;
+      }
+      Thread.sleep(100);
+    }
+    fail("Timed out waiting for " + expectedSize + " mutex queue participants 
under " + queuePath);
+  }
 }
 


Reply via email to