Copilot commented on code in PR #6528:
URL: https://github.com/apache/hive/pull/6528#discussion_r3441297099


##########
ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java:
##########
@@ -146,23 +267,38 @@ public void childEvent(final CuratorFramework client, final PathChildrenCacheEve
 
       synchronized (lock) {
         switch (event.getType()) {
-          case CHILD_UPDATED, CHILD_ADDED:
-            if (available.contains(applicationId) || taken.contains(applicationId)) {
-              return; // We do not expect updates to existing sessions; ignore them for now.
-            }
-            available.add(applicationId);
-            break;
-          case CHILD_REMOVED:
-            if (taken.remove(applicationId)) {
-              LOG.warn("The session in use has disappeared from the registry ({})", applicationId);
-            } else if (!available.remove(applicationId)) {
-              LOG.warn("An unknown session has been removed ({})", applicationId);
-            }
-            break;
-          default:
-            // Ignore all the other events; logged above.
+        case CHILD_UPDATED, CHILD_ADDED:
+          if (available.contains(applicationId) || taken.contains(applicationId)) {
+            return; // We do not expect updates to existing sessions; ignore them for now.
+          }
+          if (claimsCache.get(claimsPath + PATH_SEPARATOR + applicationId).isPresent()) {
+            LOG.info("Ignoring newly added AM {} because it is already claimed by another session.", applicationId);

Review Comment:
   `claimsCache` is started after `cache.start()`, but `ExternalSessionsPathListener`
unconditionally dereferences `claimsCache`. Once `cache.start()` has been called,
the cache can begin delivering CHILD_ADDED/CHILD_UPDATED events immediately, so
the listener can throw a NullPointerException during initialization (in the
window between `cache.start()` and the assignment of `claimsCache`).
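One way to close that window, sketched here as a plain-Java model rather than Curator code, is to assign everything the listener reads before anything that can fire callbacks is started. `SafeInitOrder`, `onChildAdded` and the single-thread executor are hypothetical stand-ins for the registry client, `ExternalSessionsPathListener` and the cache's event thread. In the PR this would presumably mean creating and starting `claimsCache` before calling `cache.start()`; that is an assumption about the intended fix, not something the comment spells out.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the registry client: `claimed` plays the role of claimsCache and
// the single-thread executor plays the role of the cache's background event thread.
final class SafeInitOrder {
  private volatile Set<String> claimed; // read on the event thread
  private final Set<String> available = ConcurrentHashMap.newKeySet();
  private final ExecutorService events = Executors.newSingleThreadExecutor();

  // Listener callback: dereferences `claimed` unconditionally, like the PR's listener does with claimsCache.
  private void onChildAdded(String appId) {
    if (!claimed.contains(appId)) {
      available.add(appId);
    }
  }

  // Safe order: assign everything the listener reads BEFORE starting the event source.
  void init(Set<String> existingClaims, List<String> children) {
    this.claimed = existingClaims;
    // "start()": from here on, callbacks may run at any moment on the event thread.
    for (String child : children) {
      events.execute(() -> onChildAdded(child));
    }
  }

  // Drains pending callbacks and returns the AMs that were not already claimed.
  Set<String> awaitAvailable() throws InterruptedException {
    events.shutdown();
    if (!events.awaitTermination(5, TimeUnit.SECONDS)) {
      throw new IllegalStateException("event thread did not drain");
    }
    return available;
  }
}
```

If the assignment in `init()` came after the loop, the first callback could observe `claimed == null`. Because the model uses `execute` rather than `submit`, that NullPointerException would at least reach the thread's uncaught-exception handler instead of being silently captured in a `Future`.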

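The first hunk's new check skips an AM when a claim for it already exists under `claimsPath`, and the code comment in the second hunk describes those claims as ephemeral nodes that ZooKeeper drops when the owning session expires. As a rough in-memory model (not the PR's code), the hypothetical `ClaimRegistry` below uses `ConcurrentHashMap.putIfAbsent` in place of an exclusive znode create; the class, its methods and the owner ids are all illustrative.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical in-memory model of claim nodes: key = applicationId, value = owning HS2 instance.
final class ClaimRegistry {
  private final ConcurrentMap<String, String> claims = new ConcurrentHashMap<>();

  // Like creating the claim node: succeeds only if no one else holds the AM.
  boolean tryClaim(String appId, String owner) {
    return claims.putIfAbsent(appId, owner) == null;
  }

  // Like deleting the claim node when the session is returned; only the owner's claim is removed.
  boolean release(String appId, String owner) {
    return claims.remove(appId, owner);
  }

  // Like the owner's ZooKeeper session expiring: all of its ephemeral claims vanish together.
  void expire(String owner) {
    claims.values().removeIf(owner::equals);
  }

  // Like the claimsCache lookup the listener does before offering an AM as available.
  boolean isClaimed(String appId) {
    return claims.containsKey(appId);
  }
}
```

The atomic "create only if absent" step is what keeps two HS2 instances from taking the same AM, which is the property `testSessionClaimsFromDifferentRegistryClients` checks against a real ZooKeeper `TestingServer`.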


##########
ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java:
##########
@@ -59,25 +70,92 @@ public ZookeeperExternalSessionsRegistryClient(final HiveConf initConf) {
   }
 
   private static String getApplicationId(final ChildData childData) {
-    return childData.getPath().substring(childData.getPath().lastIndexOf("/") + 1);
+    return childData.getPath().substring(childData.getPath().lastIndexOf(PATH_SEPARATOR) + 1);
   }
 
   private void init() {
     String zkServer = HiveConf.getVar(initConf, ConfVars.HIVE_ZOOKEEPER_QUORUM);
+    int sessionTimeoutMs = (int) HiveConf.getTimeVar(initConf, ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT,
+        TimeUnit.MILLISECONDS);
+    int connectionTimeoutMs = (int) HiveConf.getTimeVar(initConf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_TIMEOUT,
+        TimeUnit.MILLISECONDS);
+    int baseSleepTimeMs = (int) HiveConf.getTimeVar(initConf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME,
+        TimeUnit.MILLISECONDS);
+    int maxRetries = HiveConf.getIntVar(initConf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES);
     String zkNamespace = HiveConf.getVar(initConf, ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE);
     String effectivePath = normalizeZkPath(zkNamespace);
-    CuratorFramework client = CuratorFrameworkFactory.newClient(zkServer, new ExponentialBackoffRetry(1000, 3));
+    this.claimsPath = effectivePath + "-claims";
+    // After connection state changes to SUSPENDED, the client has already consumed ~2/3 of the negotiated session
+    // timeout. Use 33% of the remaining window so LOST aligns with when the ZK server expires the session and drops
+    // ephemeral claim nodes. For Ref: Curator TN14 
+    this.client = CuratorFrameworkFactory.builder()
+        .connectString(zkServer)
+        .sessionTimeoutMs(sessionTimeoutMs)
+        .connectionTimeoutMs(connectionTimeoutMs)
+        .simulatedSessionExpirationPercent(33)
+        .retryPolicy(new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries))
+        .build();
+
     synchronized (lock) {
       client.start();
+
+      client.getConnectionStateListenable().addListener((curatorClient, newState) -> {
+        if (newState == ConnectionState.CONNECTED || newState == ConnectionState.RECONNECTED) {
+          zkConnectionHealthy = true;
+        } else if (newState == ConnectionState.LOST) {
+          zkConnectionHealthy = false;
+          Set<String> sessionsToKill;
+          synchronized (lock) {
+            LOG.error("ZK connection state has changed to lost; killing running DAGs on claimed AMs: {}", taken);
+            sessionsToKill = new HashSet<>(taken);
+            taken.clear();
+          }
+          sessionsToKill.forEach(TezJobMonitor::killRunningDAGsForApplication);
+        }
+      });

Review Comment:
   The connection-state listener is registered after `client.start()`. Curator
only notifies listeners of state *changes*, so the initial CONNECTED event can be
missed and `zkConnectionHealthy` may stay `false` until a later RECONNECTED event,
causing `isClaimed()` to return false and preventing DAG submissions for
legitimately claimed external sessions.
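A minimal model of the ordering issue, with hypothetical `StateNotifier` and `HealthFlag` classes standing in for Curator's connection-state listenable and the PR's `zkConnectionHealthy` flag. It assumes, as the comment states, that a listener added late is not told about a transition that has already happened. With Curator itself, the corresponding change would be to call `client.getConnectionStateListenable().addListener(...)` before `client.start()`.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

enum ConnState { CONNECTED, SUSPENDED, RECONNECTED, LOST }

// Hypothetical stand-in for a connection-state listenable that reports transitions only:
// a listener added after a transition is never told about it.
final class StateNotifier {
  private final List<Consumer<ConnState>> listeners = new CopyOnWriteArrayList<>();

  void addListener(Consumer<ConnState> listener) {
    listeners.add(listener);
  }

  void transition(ConnState newState) {
    listeners.forEach(listener -> listener.accept(newState));
  }
}

// Mirrors the flag handling in the PR's listener: CONNECTED/RECONNECTED set it, LOST clears it.
final class HealthFlag {
  private volatile boolean healthy;

  void onState(ConnState state) {
    if (state == ConnState.CONNECTED || state == ConnState.RECONNECTED) {
      healthy = true;
    } else if (state == ConnState.LOST) {
      healthy = false;
    }
  }

  boolean isHealthy() {
    return healthy;
  }

  // Safe order: register first, then let the client connect.
  static HealthFlag registerThenConnect(StateNotifier notifier) {
    HealthFlag flag = new HealthFlag();
    notifier.addListener(flag::onState);
    notifier.transition(ConnState.CONNECTED); // models the initial connection after client.start()
    return flag;
  }

  // Order from the diff, with the bad interleaving forced: CONNECTED fires before registration.
  static HealthFlag connectThenRegister(StateNotifier notifier) {
    HealthFlag flag = new HealthFlag();
    notifier.transition(ConnState.CONNECTED);
    notifier.addListener(flag::onState);
    return flag;
  }
}
```

The model makes the bad interleaving deterministic; in the real client it is a race, because the initial connection is established on a background thread after `client.start()` returns.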

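The timing argument in the new code comment (second hunk, above the `CuratorFrameworkFactory.builder()` call) can be checked with a worked example. Taking the comment's premise as given, SUSPENDED is seen after roughly 2/3 of the session timeout, and Curator then simulates LOST once `simulatedSessionExpirationPercent` percent of the session timeout has elapsed in SUSPENDED. In this model the percentage applies to the whole session timeout, so 33% corresponds to roughly the entire remaining third. `LostTiming` is a hypothetical helper, not Curator code.

```java
// Hypothetical helper that reproduces the arithmetic in the PR's code comment; not Curator code.
final class LostTiming {
  private LostTiming() {}

  // Premise from the comment: SUSPENDED is seen after ~2/3 of the session timeout.
  static long suspendedAtMs(long sessionTimeoutMs) {
    return sessionTimeoutMs * 2 / 3;
  }

  // LOST is then simulated once expirationPercent % of the session timeout has passed in SUSPENDED.
  static long lostAtMs(long sessionTimeoutMs, int expirationPercent) {
    return suspendedAtMs(sessionTimeoutMs) + sessionTimeoutMs * expirationPercent / 100;
  }
}
```

For an example 30 s session timeout this puts SUSPENDED at about 20 s and LOST at about 29.9 s with 33%, close to the 30 s at which the server would expire the session and drop the ephemeral claim nodes. With 100%, LOST would only arrive at about 50 s, long after the claims are gone.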


##########
ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestZookeeperExternalSessionsRegistryClient.java:
##########
@@ -128,5 +138,141 @@ public void testReuseSameSession() throws Exception {
       }
     }
   }
+
+  /**
+   * Tests that multiple registry clients (simulating multiple HS2 instances)
+   * respect the global distributed lock (claims) and do not claim the same session simultaneously.
+   */
+  @Test
+  public void testSessionClaimsFromDifferentRegistryClients() throws Exception {
+    CuratorFramework client = null;
+    ZookeeperExternalSessionsRegistryClient registry1 = null;
+    ZookeeperExternalSessionsRegistryClient registry2 = null;
+
+    try (TestingServer server = new TestingServer()) {
+      String connectString = server.getConnectString();
+
+      HiveConf conf = new HiveConf();
+      conf.setVar(ConfVars.HIVE_ZOOKEEPER_QUORUM, connectString);
+      conf.setVar(ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE, "/tez_ns_concurrent");
+

Review Comment:
   This new test doesn't set `HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_WAIT_MAX_ATTEMPTS`,
whose HiveConf default is 60. If something goes wrong, the test can block for up
to about a minute (there is no timeout around `getSession()` here), which is
painful for CI and debugging.



##########
ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestZookeeperExternalSessionsRegistryClient.java:
##########
@@ -128,5 +138,141 @@ public void testReuseSameSession() throws Exception {
       }
     }
   }
+
+  /**
+   * Tests that multiple registry clients (simulating multiple HS2 instances)
+   * respect the global distributed lock (claims) and do not claim the same session simultaneously.
+   */
+  @Test
+  public void testSessionClaimsFromDifferentRegistryClients() throws Exception {
+    CuratorFramework client = null;
+    ZookeeperExternalSessionsRegistryClient registry1 = null;
+    ZookeeperExternalSessionsRegistryClient registry2 = null;
+
+    try (TestingServer server = new TestingServer()) {
+      String connectString = server.getConnectString();
+
+      HiveConf conf = new HiveConf();
+      conf.setVar(ConfVars.HIVE_ZOOKEEPER_QUORUM, connectString);
+      conf.setVar(ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE, "/tez_ns_concurrent");
+
+      String namespace = HiveConf.getVar(conf, ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE);
+      String effectivePath = ZookeeperExternalSessionsRegistryClient.normalizeZkPath(namespace);
+
+      CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
+      client = builder.connectString(connectString).retryPolicy(new RetryOneTime(1)).build();
+      client.start();
+
+      client.create().creatingParentsIfNeeded().forPath(effectivePath + "/app_1");
+      client.create().forPath(effectivePath + "/app_2");
+
+      registry1 = new ZookeeperExternalSessionsRegistryClient(conf);
+      registry2 = new ZookeeperExternalSessionsRegistryClient(conf);
+
+      String sessionFromRegistry1 = registry1.getSession();
+      String sessionFromRegistry2 = registry2.getSession();
+
+      assertNotNull("Registry 1 should have claimed a session", sessionFromRegistry1);
+      assertNotNull("Registry 2 should have claimed a session", sessionFromRegistry2);
+
+      assertNotEquals("The two registries should claim different sessions!",
+          sessionFromRegistry1, sessionFromRegistry2);
+
+      registry1.returnSession(sessionFromRegistry1);
+
+      String session3FromRegistry2 = registry2.getSession();
+      assertEquals("Registry 2 should be able to claim the newly released session",
+          sessionFromRegistry1, session3FromRegistry2);
+
+      registry2.returnSession(sessionFromRegistry2);
+      registry2.returnSession(session3FromRegistry2);
+    } finally {
+      if (registry1 != null) {
+        registry1.close();
+      }
+      if (registry2 != null) {
+        registry2.close();
+      }
+      if (client != null) {
+        client.close();
+      }
+    }
+  }
+
+  /**
+   * Tests that the InterProcessMutex enforces strict Global FIFO ordering.
+   * Clients form a queue when no sessions are available, and are served in exact order.
+   */
+  @Test
+  public void testFIFOSessionClaimsFromDifferentRegistries() throws Exception {
+    try (TestingServer server = new TestingServer()) {
+      String connectString = server.getConnectString();
+
+      HiveConf conf = new HiveConf();
+      conf.setVar(ConfVars.HIVE_ZOOKEEPER_QUORUM, connectString);
+      conf.setVar(ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE, "/tez_ns_fifo");
+

Review Comment:
   This test also relies on the blocking behavior of `getSession()` but doesn't
override `HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_WAIT_MAX_ATTEMPTS` (default 60, about
a minute of waiting). Setting a small value here helps failures surface quickly
and avoids long-running, hung test threads.
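Lowering the limit bounds `getSession()` itself; a complementary guard is to bound each blocking call in the test, so that a hang surfaces as a `TimeoutException` and the stuck thread is interrupted. The `Timeouts.callWithTimeout` helper below is a hypothetical sketch built only on `java.util.concurrent`; a call site might look like `Timeouts.callWithTimeout(registry2::getSession, 5_000)`. If the test class uses JUnit 4, as its message-first assertions suggest, `@Test(timeout = ...)` is a coarser alternative that bounds the whole test method.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical test helper: run a possibly blocking call with an upper bound on its duration.
final class Timeouts {
  private Timeouts() {}

  static <T> T callWithTimeout(Callable<T> call, long timeoutMs) throws Exception {
    // Daemon worker, so a call that ignores interruption cannot keep the JVM alive.
    ExecutorService executor = Executors.newSingleThreadExecutor(runnable -> {
      Thread worker = new Thread(runnable, "bounded-call");
      worker.setDaemon(true);
      return worker;
    });
    try {
      // Throws TimeoutException if the call has not finished in time,
      // or ExecutionException wrapping whatever the call itself threw.
      return executor.submit(call).get(timeoutMs, TimeUnit.MILLISECONDS);
    } finally {
      executor.shutdownNow(); // interrupts the worker if it is still blocked
    }
  }
}
```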



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
