This is an automated email from the ASF dual-hosted git repository.

wombatu-kun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f3ccf1b3c31b [MINOR] Wait for ZK connection in lock provider to de-flake direct-marker detection test (#19014)
f3ccf1b3c31b is described below

commit f3ccf1b3c31b52e7ece2d596a7749369e2f7e0b0
Author: Vova Kolmakov <[email protected]>
AuthorDate: Tue Jun 16 15:20:11 2026 +0700

    [MINOR] Wait for ZK connection in lock provider to de-flake direct-marker detection test (#19014)
---
 .../lock/BaseZookeeperBasedLockProvider.java       | 27 ++++++++++++++++++--
 .../TestZookeeperBasedLockProvider.java            | 21 ++++++++++++++++
 ...erBasedDetectionStrategyWithZKLockProvider.java | 29 +++++++++++++++++++---
 3 files changed, 71 insertions(+), 6 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/BaseZookeeperBasedLockProvider.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/BaseZookeeperBasedLockProvider.java
index d5b04c15c005..6cdee60e2d1b 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/BaseZookeeperBasedLockProvider.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/BaseZookeeperBasedLockProvider.java
@@ -68,6 +68,7 @@ public abstract class BaseZookeeperBasedLockProvider 
implements LockProvider<Int
     this.lockConfiguration = lockConfiguration;
     zkBasePath = getZkBasePath(lockConfiguration);
     lockKey = getLockKey(lockConfiguration);
+    int connectionTimeoutMs = 
ConfigUtils.getIntWithAltKeys(lockConfiguration.getConfig(), 
ZK_CONNECTION_TIMEOUT_MS);
     this.curatorFrameworkClient = CuratorFrameworkFactory.builder()
         
.connectString(ConfigUtils.getStringWithAltKeys(lockConfiguration.getConfig(), 
ZK_CONNECT_URL))
         .retryPolicy(new BoundedExponentialBackoffRetry(
@@ -75,10 +76,32 @@ public abstract class BaseZookeeperBasedLockProvider 
implements LockProvider<Int
             ConfigUtils.getIntWithAltKeys(lockConfiguration.getConfig(), 
LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS),
             ConfigUtils.getIntWithAltKeys(lockConfiguration.getConfig(), 
LOCK_ACQUIRE_NUM_RETRIES)))
         
.sessionTimeoutMs(ConfigUtils.getIntWithAltKeys(lockConfiguration.getConfig(), 
ZK_SESSION_TIMEOUT_MS))
-        
.connectionTimeoutMs(ConfigUtils.getIntWithAltKeys(lockConfiguration.getConfig(),
 ZK_CONNECTION_TIMEOUT_MS))
+        .connectionTimeoutMs(connectionTimeoutMs)
         .build();
     this.curatorFrameworkClient.start();
-    createPathIfNotExists();
+    // Once started, the Curator client owns background threads. If anything 
below throws, the
+    // constructor never returns the instance, so the caller can never invoke 
close() - clean up here.
+    try {
+      if 
(!this.curatorFrameworkClient.blockUntilConnected(connectionTimeoutMs, 
TimeUnit.MILLISECONDS)) {
+        throw new HoodieLockException("Failed to connect to ZooKeeper within " 
+ connectionTimeoutMs + " ms");
+      }
+      createPathIfNotExists();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      closeQuietly();
+      throw new HoodieLockException("Interrupted while waiting to connect to 
ZooKeeper", e);
+    } catch (RuntimeException e) {
+      closeQuietly();
+      throw e;
+    }
+  }
+
+  private void closeQuietly() {
+    try {
+      this.curatorFrameworkClient.close();
+    } catch (Exception ex) {
+      log.warn("Failed to close ZooKeeper client after failed initialization", 
ex);
+    }
   }
 
   protected abstract String getZkBasePath(LockConfiguration lockConfiguration);
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestZookeeperBasedLockProvider.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestZookeeperBasedLockProvider.java
index fab3dee8f8f6..fa1a8329b65f 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestZookeeperBasedLockProvider.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestZookeeperBasedLockProvider.java
@@ -42,6 +42,7 @@ import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
@@ -194,4 +195,24 @@ public class TestZookeeperBasedLockProvider {
     ZookeeperBasedLockProvider zookeeperBasedLockProvider = new 
ZookeeperBasedLockProvider(zkConfWithZkBasePathAndLockKeyLock, null);
     zookeeperBasedLockProvider.unlock();
   }
+
+  @Test
+  public void testFailFastWhenZkUnreachable() {
+    Properties properties = new Properties();
+    // Nothing listens on 127.0.0.1:1, so the connect-wait must time out 
instead of hanging.
+    properties.setProperty(ZK_CONNECT_URL_PROP_KEY, "127.0.0.1:1");
+    properties.setProperty(ZK_BASE_PATH_PROP_KEY, basePath);
+    properties.setProperty(ZK_LOCK_KEY_PROP_KEY, key);
+    properties.setProperty(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, 
"100");
+    
properties.setProperty(LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS_PROP_KEY, 
"300");
+    properties.setProperty(LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY, "1");
+    properties.setProperty(ZK_SESSION_TIMEOUT_MS_PROP_KEY, "1000");
+    properties.setProperty(ZK_CONNECTION_TIMEOUT_MS_PROP_KEY, "1000");
+    LockConfiguration unreachable = new LockConfiguration(properties);
+    // Construction must fail fast (seconds, bounded by the connection 
timeout) with a
+    // HoodieLockException instead of being amplified into a multi-minute 
retry hang.
+    Assertions.assertTimeoutPreemptively(Duration.ofSeconds(15), () ->
+        Assertions.assertThrows(HoodieLockException.class,
+            () -> new ZookeeperBasedLockProvider(unreachable, null)));
+  }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSimpleTransactionDirectMarkerBasedDetectionStrategyWithZKLockProvider.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSimpleTransactionDirectMarkerBasedDetectionStrategyWithZKLockProvider.java
index 179504b9e161..73d9c40db316 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSimpleTransactionDirectMarkerBasedDetectionStrategyWithZKLockProvider.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSimpleTransactionDirectMarkerBasedDetectionStrategyWithZKLockProvider.java
@@ -51,9 +51,16 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Properties;
 
+import static 
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY;
+import static 
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY;
+import static 
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS_PROP_KEY;
+import static 
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY;
+import static 
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY;
 import static 
org.apache.hudi.common.config.LockConfiguration.ZK_BASE_PATH_PROP_KEY;
+import static 
org.apache.hudi.common.config.LockConfiguration.ZK_CONNECTION_TIMEOUT_MS_PROP_KEY;
 import static 
org.apache.hudi.common.config.LockConfiguration.ZK_CONNECT_URL_PROP_KEY;
 import static 
org.apache.hudi.common.config.LockConfiguration.ZK_LOCK_KEY_PROP_KEY;
+import static 
org.apache.hudi.common.config.LockConfiguration.ZK_SESSION_TIMEOUT_MS_PROP_KEY;
 import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -82,6 +89,15 @@ public class 
TestSimpleTransactionDirectMarkerBasedDetectionStrategyWithZKLockPr
     properties.setProperty(ZK_CONNECT_URL_PROP_KEY, server.getConnectString());
     properties.setProperty(ZK_BASE_PATH_PROP_KEY, 
server.getTempDirectory().getAbsolutePath());
     properties.setProperty(ZK_LOCK_KEY_PROP_KEY, "key");
+    // Bound lock retries and ZK timeouts so a transient connection failure 
fails fast in seconds
+    // instead of being amplified by the production-default retry layers into 
a multi-minute hang.
+    properties.setProperty(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, 
"1000");
+    
properties.setProperty(LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS_PROP_KEY, 
"3000");
+    properties.setProperty(LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "3");
+    properties.setProperty(LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY, "3");
+    properties.setProperty(ZK_SESSION_TIMEOUT_MS_PROP_KEY, "10000");
+    properties.setProperty(ZK_CONNECTION_TIMEOUT_MS_PROP_KEY, "10000");
+    properties.setProperty(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "1000");
 
     config = getConfigBuilder()
         .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
@@ -104,10 +120,15 @@ public class 
TestSimpleTransactionDirectMarkerBasedDetectionStrategyWithZKLockPr
 
   @AfterEach
   public void clean() throws IOException {
-    cleanupResources();
-    FileIOUtils.deleteDirectory(new File(basePath));
-    if (server != null) {
-      server.close();
+    try {
+      cleanupResources();
+      FileIOUtils.deleteDirectory(new File(basePath));
+    } finally {
+      // Always stop the embedded ZooKeeper server, even if resource cleanup 
or directory
+      // deletion above throws, so the server is not leaked across 
parameterized runs.
+      if (server != null) {
+        server.close();
+      }
     }
   }
 

Reply via email to