This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 1e0c159bf4a [HUDI-7198] Create nested node path if does not exist for
zookeeper. (#10438)
1e0c159bf4a is described below
commit 1e0c159bf4abc14d871a7e3fcf53a088992d6915
Author: harshal <[email protected]>
AuthorDate: Thu Jan 4 12:59:16 2024 +0530
[HUDI-7198] Create nested node path if does not exist for zookeeper.
(#10438)
---
.../lock/ZookeeperBasedLockProvider.java | 42 ++++++++++++++++++++++
1 file changed, 42 insertions(+)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/ZookeeperBasedLockProvider.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/ZookeeperBasedLockProvider.java
index 31b92dcf914..4299a603ece 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/ZookeeperBasedLockProvider.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/ZookeeperBasedLockProvider.java
@@ -31,6 +31,7 @@ import
org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.BoundedExponentialBackoffRetry;
import org.apache.hadoop.conf.Configuration;
+import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -74,8 +75,48 @@ public class ZookeeperBasedLockProvider implements
LockProvider<InterProcessMute
.connectionTimeoutMs(lockConfiguration.getConfig().getInteger(ZK_CONNECTION_TIMEOUT_MS_PROP_KEY,
DEFAULT_ZK_CONNECTION_TIMEOUT_MS))
.build();
this.curatorFrameworkClient.start();
+ createPathIfNotExists();
}
+ private String getLockPath() {
+ return lockConfiguration.getConfig().getString(ZK_BASE_PATH_PROP_KEY) + "/"
+ + this.lockConfiguration.getConfig().getString(ZK_LOCK_KEY_PROP_KEY);
+ }
+
+ private void createPathIfNotExists() {
+ try {
+ String lockPath = getLockPath();
+ LOG.info(String.format("Creating zookeeper path %s if not exists",
lockPath));
+ String[] parts = lockPath.split("/");
+ StringBuilder currentPath = new StringBuilder();
+ for (String part : parts) {
+ if (!part.isEmpty()) {
+ currentPath.append("/").append(part);
+ createNodeIfNotExists(currentPath.toString());
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to create ZooKeeper path: " + e.getMessage());
+ throw new HoodieLockException("Failed to initialize ZooKeeper path", e);
+ }
+ }
+
+ private void createNodeIfNotExists(String path) throws Exception {
+ if (this.curatorFrameworkClient.checkExists().forPath(path) == null) {
+ try {
+ this.curatorFrameworkClient.create().forPath(path);
+ // to avoid failure due to synchronous calls.
+ } catch (KeeperException e) {
+ if (e.code() == KeeperException.Code.NODEEXISTS) {
+ LOG.debug(String.format("Node already exist for path = %s", path));
+ } else {
+ throw new HoodieLockException("Failed to create zookeeper node", e);
+ }
+ }
+ }
+ }
+
+
// Only used for testing
public ZookeeperBasedLockProvider(
final LockConfiguration lockConfiguration, final CuratorFramework
curatorFrameworkClient) {
@@ -85,6 +126,7 @@ public class ZookeeperBasedLockProvider implements
LockProvider<InterProcessMute
synchronized (this.curatorFrameworkClient) {
if (this.curatorFrameworkClient.getState() !=
CuratorFrameworkState.STARTED) {
this.curatorFrameworkClient.start();
+ createPathIfNotExists();
}
}
}