This is an automated email from the ASF dual-hosted git repository.
leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 4fbf5c52f19 [HUDI-7601] Add heartbeat mechanism to refresh lock
(#10994)
4fbf5c52f19 is described below
commit 4fbf5c52f19b1e3192b09f9362e35dd22c3a0da6
Author: Yann Byron <[email protected]>
AuthorDate: Fri Apr 12 14:12:04 2024 +0800
[HUDI-7601] Add heartbeat mechanism to refresh lock (#10994)
* [HUDI-7601] Add heartbeat mechanism to refresh lock
---
.../org/apache/hudi/config/HoodieLockConfig.java | 13 +++++++
.../hudi/common/config/LockConfiguration.java | 3 ++
.../hudi/hive/transaction/lock/Heartbeat.java | 42 ++++++++++++++++++++++
.../lock/HiveMetastoreBasedLockProvider.java | 23 ++++++++++--
4 files changed, 79 insertions(+), 2 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
index fa38da8f8ab..1c51b6db8b3 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
@@ -37,6 +37,7 @@ import java.util.Properties;
import static
org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUIRE_NUM_RETRIES;
import static
org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS;
import static
org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUIRE_WAIT_TIMEOUT_MS;
+import static
org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_HEARTBEAT_INTERVAL_MS;
import static
org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_CONNECTION_TIMEOUT_MS;
import static
org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_SESSION_TIMEOUT_MS;
import static
org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_EXPIRE_PROP_KEY;
@@ -50,6 +51,7 @@ import static
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_R
import static
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS_PROP_KEY;
import static
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY;
import static
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.LOCK_HEARTBEAT_INTERVAL_MS_KEY;
import static org.apache.hudi.common.config.LockConfiguration.LOCK_PREFIX;
import static
org.apache.hudi.common.config.LockConfiguration.ZK_BASE_PATH_PROP_KEY;
import static
org.apache.hudi.common.config.LockConfiguration.ZK_CONNECTION_TIMEOUT_MS_PROP_KEY;
@@ -112,6 +114,12 @@ public class HoodieLockConfig extends HoodieConfig {
.sinceVersion("0.8.0")
.withDocumentation("Timeout in ms, to wait on an individual lock
acquire() call, at the lock provider.");
+ public static final ConfigProperty<Integer> LOCK_HEARTBEAT_INTERVAL_MS =
ConfigProperty
+ .key(LOCK_HEARTBEAT_INTERVAL_MS_KEY)
+ .defaultValue(DEFAULT_LOCK_HEARTBEAT_INTERVAL_MS)
+ .sinceVersion("1.0.0")
+ .withDocumentation("Heartbeat interval in ms, to send a heartbeat to
indicate that hive client holding locks.");
+
public static final ConfigProperty<String> FILESYSTEM_LOCK_PATH =
ConfigProperty
.key(FILESYSTEM_LOCK_PATH_PROP_KEY)
.noDefaultValue()
@@ -343,6 +351,11 @@ public class HoodieLockConfig extends HoodieConfig {
return this;
}
+    /**
+     * Sets the lock heartbeat interval on the config being built.
+     *
+     * @param intervalInMillis heartbeat interval in milliseconds; stored in its string form
+     *                         under {@code LOCK_HEARTBEAT_INTERVAL_MS}
+     * @return this builder, for chaining
+     */
+    public HoodieLockConfig.Builder withHeartbeatIntervalInMillis(Long intervalInMillis) {
+      // Config values are kept as strings, so render the interval before storing it.
+      final String intervalAsText = String.valueOf(intervalInMillis);
+      lockConfig.setValue(LOCK_HEARTBEAT_INTERVAL_MS, intervalAsText);
+      return this;
+    }
+
public HoodieLockConfig.Builder
withConflictResolutionStrategy(ConflictResolutionStrategy
conflictResolutionStrategy) {
lockConfig.setValue(WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME,
conflictResolutionStrategy.getClass().getName());
return this;
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/config/LockConfiguration.java
b/hudi-common/src/main/java/org/apache/hudi/common/config/LockConfiguration.java
index 1171dcf3fce..9d79be37810 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/config/LockConfiguration.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/config/LockConfiguration.java
@@ -45,6 +45,9 @@ public class LockConfiguration implements Serializable {
public static final String LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY =
LOCK_PREFIX + "wait_time_ms";
public static final int DEFAULT_LOCK_ACQUIRE_WAIT_TIMEOUT_MS = 60 * 1000;
+ public static final String LOCK_HEARTBEAT_INTERVAL_MS_KEY = LOCK_PREFIX +
"heartbeat_interval_ms";
+ public static final int DEFAULT_LOCK_HEARTBEAT_INTERVAL_MS = 60 * 1000;
+
// configs for file system based locks. NOTE: This only works for DFS with
atomic create/delete operation
public static final String FILESYSTEM_BASED_LOCK_PROPERTY_PREFIX =
LOCK_PREFIX + "filesystem.";
diff --git
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/transaction/lock/Heartbeat.java
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/transaction/lock/Heartbeat.java
new file mode 100644
index 00000000000..14398af2c74
--- /dev/null
+++
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/transaction/lock/Heartbeat.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hive.transaction.lock;
+
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hudi.exception.HoodieLockException;
+
+/**
+ * Runnable that sends a single heartbeat for one Hive metastore lock.
+ *
+ * <p>Each {@link #run()} issues one {@code heartbeat} call on the supplied client for the
+ * lock id given at construction time.
+ */
+class Heartbeat implements Runnable {
+  private final IMetaStoreClient client;
+  private final long lockId;
+
+  /**
+   * @param client metastore client used to send the heartbeat
+   * @param lockId id of the lock the heartbeat is sent for
+   */
+  Heartbeat(IMetaStoreClient client, long lockId) {
+    this.client = client;
+    this.lockId = lockId;
+  }
+
+  /**
+   * Sends one heartbeat for the lock.
+   *
+   * @throws HoodieLockException if the heartbeat call fails; the original failure is attached
+   *                             as the cause
+   */
+  @Override
+  public void run() {
+    try {
+      // Transaction id 0: presumably "no open transaction", so only the lock is refreshed
+      // -- TODO confirm against IMetaStoreClient#heartbeat.
+      client.heartbeat(0, lockId);
+    } catch (Exception e) {
+      // Keep the underlying failure as the cause so the reason the heartbeat failed is not lost.
+      // NOTE(review): when this task is run through ScheduledExecutorService#scheduleAtFixedRate,
+      // an exception thrown here suppresses all subsequent executions.
+      throw new HoodieLockException(String.format("Failed to heartbeat for lock: %d", lockId), e);
+    }
+  }
+}
diff --git
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/transaction/lock/HiveMetastoreBasedLockProvider.java
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/transaction/lock/HiveMetastoreBasedLockProvider.java
index 0280621bb53..4c5aa5cb4f7 100644
---
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/transaction/lock/HiveMetastoreBasedLockProvider.java
+++
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/transaction/lock/HiveMetastoreBasedLockProvider.java
@@ -44,16 +44,19 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import static
org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_HEARTBEAT_INTERVAL_MS;
import static
org.apache.hudi.common.config.LockConfiguration.HIVE_DATABASE_NAME_PROP_KEY;
import static
org.apache.hudi.common.config.LockConfiguration.HIVE_METASTORE_URI_PROP_KEY;
import static
org.apache.hudi.common.config.LockConfiguration.HIVE_TABLE_NAME_PROP_KEY;
import static
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY;
import static
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.LOCK_HEARTBEAT_INTERVAL_MS_KEY;
import static
org.apache.hudi.common.config.LockConfiguration.ZK_CONNECT_URL_PROP_KEY;
import static org.apache.hudi.common.config.LockConfiguration.ZK_PORT_PROP_KEY;
import static
org.apache.hudi.common.config.LockConfiguration.ZK_SESSION_TIMEOUT_MS_PROP_KEY;
@@ -81,7 +84,8 @@ public class HiveMetastoreBasedLockProvider implements
LockProvider<LockResponse
private IMetaStoreClient hiveClient;
private volatile LockResponse lock = null;
protected LockConfiguration lockConfiguration;
- ExecutorService executor = Executors.newSingleThreadExecutor();
+ private ScheduledFuture<?> future = null;
+ private final ScheduledExecutorService executor =
Executors.newScheduledThreadPool(2);
public HiveMetastoreBasedLockProvider(final LockConfiguration
lockConfiguration, final Configuration conf) {
this(lockConfiguration);
@@ -128,6 +132,9 @@ public class HiveMetastoreBasedLockProvider implements
LockProvider<LockResponse
return;
}
lock = null;
+ if (future != null) {
+ future.cancel(false);
+ }
hiveClient.unlock(lockResponseLocal.getLockid());
LOG.info(generateLogStatement(RELEASED, generateLogSuffixString()));
} catch (TException e) {
@@ -153,6 +160,9 @@ public class HiveMetastoreBasedLockProvider implements
LockProvider<LockResponse
hiveClient.unlock(lock.getLockid());
lock = null;
}
+ if (future != null) {
+ future.cancel(false);
+ }
Hive.closeCurrent();
executor.shutdown();
} catch (Exception e) {
@@ -188,6 +198,12 @@ public class HiveMetastoreBasedLockProvider implements
LockProvider<LockResponse
final LockRequest lockRequestFinal = lockRequest;
this.lock = executor.submit(() -> hiveClient.lock(lockRequestFinal))
.get(time, unit);
+
+ // refresh lock in case that certain commit takes a long time.
+ Heartbeat heartbeat = new Heartbeat(hiveClient, lock.getLockid());
+ long heartbeatIntervalMs = lockConfiguration.getConfig()
+ .getLong(LOCK_HEARTBEAT_INTERVAL_MS_KEY,
DEFAULT_LOCK_HEARTBEAT_INTERVAL_MS);
+ future = executor.scheduleAtFixedRate(heartbeat, heartbeatIntervalMs /
2, heartbeatIntervalMs, TimeUnit.MILLISECONDS);
} catch (InterruptedException | TimeoutException e) {
if (this.lock == null || this.lock.getState() != LockState.ACQUIRED) {
LockResponse lockResponse =
this.hiveClient.checkLock(lockRequest.getTxnid());
@@ -202,6 +218,9 @@ public class HiveMetastoreBasedLockProvider implements
LockProvider<LockResponse
if (this.lock != null && this.lock.getState() != LockState.ACQUIRED) {
hiveClient.unlock(this.lock.getLockid());
lock = null;
+ if (future != null) {
+ future.cancel(false);
+ }
}
}
}