This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4499b0b2bd43 feat(spark): ZooKeeper node should hold spark app id (for helping debug when lock is held for long time) (#18123)
4499b0b2bd43 is described below

commit 4499b0b2bd43bd90f4dd22e9631d401dae4c19da
Author: Krishen <[email protected]>
AuthorDate: Thu Mar 5 14:19:02 2026 -0800

    feat(spark): ZooKeeper node should hold spark app id (for helping debug when lock is held for long time) (#18123)
    
    
    ---------
    
    Co-authored-by: Krishen Bhan <[email protected]>
---
 .../org/apache/hudi/client/BaseHoodieClient.java   |  3 +
 .../lock/BaseZookeeperBasedLockProvider.java       |  4 +-
 .../transaction/lock/HoodieInterProcessMutex.java  | 65 ++++++++++++++++
 .../hudi/client/transaction/lock/LockManager.java  |  5 +-
 .../org/apache/hudi/config/HoodieWriteConfig.java  | 15 ++++
 .../lock/TestHoodieInterProcessMutex.java          | 91 ++++++++++++++++++++++
 .../client/common/HoodieSparkEngineContext.java    |  5 ++
 .../callback/TestHoodieClientInitCallback.java     |  1 +
 .../hudi/common/config/LockConfiguration.java      |  2 +
 .../hudi/common/engine/HoodieEngineContext.java    |  8 ++
 10 files changed, 196 insertions(+), 3 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
index 71ac0637577e..6fe7771d7252 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
@@ -37,6 +37,8 @@ import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.hudi.config.HoodieWriteConfig;
+
+import static org.apache.hudi.config.HoodieWriteConfig.APPLICATION_ID;
 import org.apache.hudi.exception.HoodieCommitException;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
@@ -111,6 +113,7 @@ public abstract class BaseHoodieClient implements 
Serializable, AutoCloseable {
     this.context = context;
     this.basePath = clientConfig.getBasePath();
     this.config = clientConfig;
+    this.config.setValue(APPLICATION_ID, context.getApplicationId());
     this.timelineServer = timelineServer;
     shouldStopTimelineServer = !timelineServer.isPresent();
     this.heartbeatClient = new HoodieHeartbeatClient(storage, this.basePath,
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/BaseZookeeperBasedLockProvider.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/BaseZookeeperBasedLockProvider.java
index af82fe2d7ae0..d5b04c15c005 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/BaseZookeeperBasedLockProvider.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/BaseZookeeperBasedLockProvider.java
@@ -170,8 +170,8 @@ public abstract class BaseZookeeperBasedLockProvider 
implements LockProvider<Int
 
   private void acquireLock(long time, TimeUnit unit) throws Exception {
     ValidationUtils.checkArgument(this.lock == null, 
generateLogStatement(LockState.ALREADY_ACQUIRED, generateLogSuffixString()));
-    InterProcessMutex newLock = new InterProcessMutex(
-        this.curatorFrameworkClient, getLockPath());
+    InterProcessMutex newLock = new HoodieInterProcessMutex(
+        this.curatorFrameworkClient, getLockPath(), this.lockConfiguration);
     boolean acquired = newLock.acquire(time, unit);
     if (!acquired) {
       throw new 
HoodieLockException(generateLogStatement(LockState.FAILED_TO_ACQUIRE, 
generateLogSuffixString()));
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/HoodieInterProcessMutex.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/HoodieInterProcessMutex.java
new file mode 100644
index 000000000000..80f482eb2769
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/HoodieInterProcessMutex.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.hudi.common.config.LockConfiguration;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * This is a HUDI specific wrapper for {@link InterProcessMutex} to allow for
+ * passing in and utilizing information from {@link LockConfiguration}.
+ * For example, the application id is passed as part of the lock information,
+ * meaning that when the (distributed) lock is acquired other users can see the
+ * application id as part of the lock information (when using tools like zkcli)
+ */
+public class HoodieInterProcessMutex extends InterProcessMutex {
+
+  // Data that will be added to lock node upon lock being acquired.
+  // This can be used to provide metadata about the thread holding the lock,
+  // such as application id of the job
+  private final byte[] lockNodeBytes;
+
+  public HoodieInterProcessMutex(final CuratorFramework client, final String 
path,
+      final LockConfiguration config) {
+    super(client, path);
+    Map<String, String> lockNodeData = new HashMap<>();
+    String applicationId = config
+        .getConfig()
+        .getString(LockConfiguration.LOCK_HOLDER_APP_ID_KEY, "Unknown");
+    lockNodeData.put("application_id", applicationId);
+    this.lockNodeBytes = lockNodeData
+        .entrySet()
+        .stream()
+        .map(entry -> entry.getKey() + "=" + entry.getValue())
+        .collect(Collectors.joining(","))
+        .getBytes(StandardCharsets.UTF_8);
+  }
+
+  @Override
+  protected byte[] getLockNodeBytes() {
+    return this.lockNodeBytes;
+  }
+}
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java
index a660cb3c4e09..21eb5da61575 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java
@@ -61,7 +61,10 @@ public class LockManager implements Serializable, 
AutoCloseable {
   public LockManager(HoodieWriteConfig writeConfig, HoodieStorage storage, 
TypedProperties lockProps) {
     this.writeConfig = writeConfig;
     this.storageConf = storage.getConf().newInstance();
-    this.lockConfiguration = new LockConfiguration(lockProps);
+    TypedProperties lockPropsWithAppId = new TypedProperties();
+    lockPropsWithAppId.putAll(lockProps);
+    lockPropsWithAppId.put(LockConfiguration.LOCK_HOLDER_APP_ID_KEY, 
writeConfig.getApplicationId());
+    this.lockConfiguration = new LockConfiguration(lockPropsWithAppId);
     maxRetries = 
lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY,
         
Integer.parseInt(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES.defaultValue()));
     maxWaitTimeInMs = 
lockConfiguration.getConfig().getLong(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index f961fca2c314..734339d5fdef 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -939,6 +939,12 @@ public class HoodieWriteConfig extends HoodieConfig {
       .withDocumentation("Flag to indicate whether to ignore any non exception 
error (e.g. write status error)."
           + "By default true for backward compatibility.");
 
+  public static final ConfigProperty<String> APPLICATION_ID = ConfigProperty
+      .key("hoodie.write.application.id")
+      .defaultValue("Unknown")
+      .markAdvanced()
+      .withDocumentation("Application identifier (e.g. Spark application id) 
used to populate lock metadata so lock holders can be identified.");
+
   /**
    * Config key with boolean value that indicates whether record being written 
during MERGE INTO Spark SQL
    * operation are already prepped.
@@ -2730,6 +2736,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getString(HoodieLockConfig.HIVE_TABLE_NAME);
   }
 
+  public String getApplicationId() {
+    return getStringOrDefault(APPLICATION_ID);
+  }
+
   public ConflictResolutionStrategy getWriteConflictResolutionStrategy() {
     return 
ReflectionUtils.loadClass(getString(HoodieLockConfig.WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME));
   }
@@ -3470,6 +3480,11 @@ public class HoodieWriteConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder withApplicationId(String appId) {
+      writeConfig.setValue(APPLICATION_ID, appId);
+      return this;
+    }
+
     public Builder withProperties(Properties properties) {
       this.writeConfig.getProps().putAll(properties);
       return this;
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/TestHoodieInterProcessMutex.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/TestHoodieInterProcessMutex.java
new file mode 100644
index 000000000000..d9e9fcf70f7a
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/TestHoodieInterProcessMutex.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock;
+
+import org.apache.hudi.common.config.LockConfiguration;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Method;
+import java.nio.charset.StandardCharsets;
+import java.util.Properties;
+
+import static 
org.apache.hudi.common.config.LockConfiguration.LOCK_HOLDER_APP_ID_KEY;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Unit tests for {@link HoodieInterProcessMutex#getLockNodeBytes()}.
+ */
+public class TestHoodieInterProcessMutex {
+
+  private static final String LOCK_PATH = "/hudi/test/lock";
+
+  @Test
+  public void testGetLockNodeBytesContainsApplicationIdFromConfig() throws 
Exception {
+    String appId = "my-spark-app-12345";
+    LockConfiguration config = new 
LockConfiguration(createPropsWithAppId(appId));
+
+    HoodieInterProcessMutex mutex = new 
HoodieInterProcessMutex(mock(CuratorFramework.class), LOCK_PATH, config);
+    byte[] lockNodeBytes = getLockNodeBytes(mutex);
+
+    String lockNodeData = new String(lockNodeBytes, StandardCharsets.UTF_8);
+    assertTrue(lockNodeData.contains("application_id=" + appId),
+        "Lock node data should contain application_id from config: " + 
lockNodeData);
+  }
+
+  @Test
+  public void testGetLockNodeBytesDefaultsToUnknownWhenAppIdNotSet() throws 
Exception {
+    LockConfiguration config = new LockConfiguration(new Properties());
+
+    HoodieInterProcessMutex mutex = new 
HoodieInterProcessMutex(mock(CuratorFramework.class), LOCK_PATH, config);
+    byte[] lockNodeBytes = getLockNodeBytes(mutex);
+
+    String lockNodeData = new String(lockNodeBytes, StandardCharsets.UTF_8);
+    assertTrue(lockNodeData.contains("application_id=Unknown"),
+        "Lock node data should default application_id to Unknown: " + 
lockNodeData);
+  }
+
+  @Test
+  public void testGetLockNodeBytesFormat() throws Exception {
+    String appId = "test-app";
+    LockConfiguration config = new 
LockConfiguration(createPropsWithAppId(appId));
+
+    HoodieInterProcessMutex mutex = new 
HoodieInterProcessMutex(mock(CuratorFramework.class), LOCK_PATH, config);
+    byte[] lockNodeBytes = getLockNodeBytes(mutex);
+
+    String expected = "application_id=" + appId;
+    assertArrayEquals(expected.getBytes(StandardCharsets.UTF_8), lockNodeBytes,
+        "Lock node bytes should be in key=value format");
+  }
+
+  private static Properties createPropsWithAppId(String appId) {
+    Properties props = new Properties();
+    props.setProperty(LOCK_HOLDER_APP_ID_KEY, appId);
+    return props;
+  }
+
+  private static byte[] getLockNodeBytes(HoodieInterProcessMutex mutex) throws 
Exception {
+    Method method = 
HoodieInterProcessMutex.class.getDeclaredMethod("getLockNodeBytes");
+    method.setAccessible(true);
+    return (byte[]) method.invoke(mutex);
+  }
+}
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
index ea435d4a39a0..7e0d9db2feaf 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
@@ -249,6 +249,11 @@ public class HoodieSparkEngineContext extends 
HoodieEngineContext {
     javaSparkContext.cancelAllJobs();
   }
 
+  @Override
+  public String getApplicationId() {
+    return javaSparkContext.sc().applicationId();
+  }
+
   @Override
   public <I, O> O aggregate(HoodieData<I> data, O zeroValue, 
Functions.Function2<O, I, O> seqOp, Functions.Function2<O, O, O> combOp) {
     Function2<O, I, O> seqOpFunc = seqOp::apply;
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/callback/TestHoodieClientInitCallback.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/callback/TestHoodieClientInitCallback.java
index 41edbd04b972..a5458674763a 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/callback/TestHoodieClientInitCallback.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/callback/TestHoodieClientInitCallback.java
@@ -85,6 +85,7 @@ public class TestHoodieClientInitCallback {
 
     StorageConfiguration storageConfToReturn = getDefaultStorageConf();
     when(engineContext.getStorageConf()).thenReturn(storageConfToReturn);
+    when(engineContext.getApplicationId()).thenReturn("test-app-id");
   }
 
   @Test
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/config/LockConfiguration.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/config/LockConfiguration.java
index 9170b941a8c6..809e33954425 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/config/LockConfiguration.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/config/LockConfiguration.java
@@ -81,6 +81,8 @@ public class LockConfiguration implements Serializable {
 
   public static final String ZK_LOCK_KEY_PROP_KEY = 
ZOOKEEPER_BASED_LOCK_PROPERTY_PREFIX + "lock_key";
 
+  public static final String LOCK_HOLDER_APP_ID_KEY = LOCK_PREFIX + "app_id";
+
   /** @deprecated Use {@link #LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY} 
*/
   @Deprecated
   public static final String LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP = 
LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
index 327dda0a736f..2fce591b60e9 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
@@ -123,6 +123,14 @@ public abstract class HoodieEngineContext {
 
   public abstract void cancelAllJobs();
 
+  /**
+   * Returns the application id of the engine (e.g. Spark application id).
+   * Used to populate lock metadata so lock holders can be identified.
+   */
+  public String getApplicationId() {
+    return "Unknown";
+  }
+
   /**
    * Aggregate the elements of each partition, and then the results for all 
the partitions, using given combine functions and a neutral "zero value".
    *

Reply via email to