This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new d5443e3 Hive: Add table-level JVM lock on commits (#2547)
d5443e3 is described below
commit d5443e3a34a4288441a015ab616d965557d78202
Author: Marton Bod <[email protected]>
AuthorDate: Fri May 7 21:37:02 2021 +0200
Hive: Add table-level JVM lock on commits (#2547)
Co-authored-by: Marton Bod <[email protected]>
---
.../apache/iceberg/hive/HiveTableOperations.java | 23 ++++++++++++++
.../apache/iceberg/hive/TestHiveCommitLocks.java | 36 ++++++++++++++++++++++
2 files changed, 59 insertions(+)
diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
index 7c45882..647c626 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
@@ -19,6 +19,8 @@
package org.apache.iceberg.hive;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collections;
@@ -28,7 +30,9 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.StatsSetupConst;
@@ -82,9 +86,11 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
private static final String HIVE_ACQUIRE_LOCK_TIMEOUT_MS = "iceberg.hive.lock-timeout-ms";
private static final String HIVE_LOCK_CHECK_MIN_WAIT_MS = "iceberg.hive.lock-check-min-wait-ms";
private static final String HIVE_LOCK_CHECK_MAX_WAIT_MS = "iceberg.hive.lock-check-max-wait-ms";
+ // Configuration key: milliseconds after its last access before a table's JVM-level commit lock is evicted
+ // from the shared lock cache. The cache is static and created once, so only the first value used to
+ // initialize it takes effect in a given JVM; later values are ignored.
+ private static final String HIVE_TABLE_LEVEL_LOCK_EVICT_MS = "iceberg.hive.table-level-lock-evict-ms";
private static final long HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT = 3 * 60 * 1000; // 3 minutes
private static final long HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT = 50; // 50 milliseconds
private static final long HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT = 5 * 1000; // 5 seconds
+ // Default for HIVE_TABLE_LEVEL_LOCK_EVICT_MS.
+ private static final long HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT = TimeUnit.MINUTES.toMillis(10); // 10 minutes
private static final DynMethods.UnboundMethod ALTER_TABLE =
DynMethods.builder("alter_table")
.impl(HiveMetaStoreClient.class, "alter_table_with_environmentContext",
String.class, String.class, Table.class, EnvironmentContext.class)
@@ -96,6 +102,15 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
GC_ENABLED, "external.table.purge"
);
+  /**
+   * JVM-wide cache of per-table commit locks, keyed by the table's full name. An entry expires once it has
+   * not been read or written for the eviction timeout given to {@link #initTableLevelLockCache(long)}.
+   */
+  private static Cache<String, ReentrantLock> commitLockCache;
+
+  /**
+   * Lazily builds the shared commit-lock cache. Only the first call does any work; every later call
+   * returns immediately, so the eviction timeout supplied by that first caller is the one used for the
+   * lifetime of the JVM.
+   *
+   * <p>NOTE(review): with expire-after-access, a lock that is still held could be evicted if a commit
+   * outlasts the timeout while no other thread touches that table's entry. A later committer would then
+   * get a fresh lock; only the HMS lock would still serialize the commits. Confirm this is acceptable.
+   *
+   * @param evictionTimeout milliseconds since last access after which a table's lock is dropped
+   */
+  private static synchronized void initTableLevelLockCache(long evictionTimeout) {
+    if (commitLockCache != null) {
+      return;
+    }
+
+    commitLockCache = Caffeine.newBuilder()
+        .expireAfterAccess(evictionTimeout, TimeUnit.MILLISECONDS)
+        .build();
+  }
/**
* Provides key translation where necessary between Iceberg and HMS props.
This translation is needed because some
@@ -144,6 +159,9 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
conf.getLong(HIVE_LOCK_CHECK_MIN_WAIT_MS,
HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT);
this.lockCheckMaxWaitTime =
conf.getLong(HIVE_LOCK_CHECK_MAX_WAIT_MS,
HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT);
+ long tableLevelLockCacheEvictionTimeout =
+ conf.getLong(HIVE_TABLE_LEVEL_LOCK_EVICT_MS,
HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT);
+ initTableLevelLockCache(tableLevelLockCacheEvictionTimeout);
}
@Override
@@ -191,6 +209,10 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
CommitStatus commitStatus = CommitStatus.FAILURE;
boolean updateHiveTable = false;
Optional<Long> lockId = Optional.empty();
+ // getting a process-level lock per table to avoid concurrent commit
attempts to the same table from the same
+ // JVM process, which would result in unnecessary and costly HMS lock
acquisition requests
+ ReentrantLock tableLevelMutex = commitLockCache.get(fullName, t -> new
ReentrantLock(true));
+ tableLevelMutex.lock();
try {
lockId = Optional.of(acquireLock());
// TODO add lock heart beating for cases where default lock timeout is
too low.
@@ -267,6 +289,7 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
} finally {
cleanupMetadataAndUnlock(commitStatus, newMetadataLocation, lockId);
+ tableLevelMutex.unlock();
}
}
diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java
index 531e511..ada7fa6 100644
--- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java
+++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java
@@ -20,9 +20,14 @@
package org.apache.iceberg.hive;
import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.IntStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
import org.apache.hadoop.hive.metastore.api.LockResponse;
import org.apache.hadoop.hive.metastore.api.LockState;
import org.apache.iceberg.AssertHelpers;
@@ -42,7 +47,11 @@ import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class TestHiveCommitLocks extends HiveTableBaseTest {
@@ -210,4 +219,31 @@ public class TestHiveCommitLocks extends HiveTableBaseTest {
"Could not acquire the lock on",
() -> spyOps.doCommit(metadataV2, metadataV1));
}
+
+  /**
+   * Fires several concurrent commits against the same table from one JVM and checks that the table-level
+   * process lock serializes them: every commit requests an HMS lock exactly once, and none ever has to poll
+   * {@code checkLock} because HMS reported the lock as WAITING.
+   */
+  @Test
+  public void testTableLevelProcessLockBlocksConcurrentHMSRequestsForSameTable() throws Exception {
+    int numConcurrentCommits = 10;
+    // resetting the spy client to forget about prior call history
+    reset(spyClient);
+
+    // executor.submit() stores a task's exception in a Future that is never read here, which would hide real
+    // failures; keep the first unexpected one so the test can report it
+    AtomicReference<RuntimeException> unexpectedFailure = new AtomicReference<>();
+
+    // simulate several concurrent commit operations on the same table
+    ExecutorService executor = Executors.newFixedThreadPool(numConcurrentCommits);
+    IntStream.range(0, numConcurrentCommits).forEach(i ->
+        executor.submit(() -> {
+          try {
+            spyOps.doCommit(metadataV2, metadataV1);
+          } catch (CommitFailedException e) {
+            // failures are expected here when checking the base version
+            // it's no problem, we're not testing the actual commit success here,
+            // only the HMS lock acquisition attempts
+          } catch (RuntimeException e) {
+            unexpectedFailure.compareAndSet(null, e);
+          }
+        }));
+    executor.shutdown();
+
+    // verifying mock interactions while commits are still running would be racy, so fail fast on timeout
+    if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
+      executor.shutdownNow();
+      throw new AssertionError("Concurrent commits did not finish within 30 seconds");
+    }
+    if (unexpectedFailure.get() != null) {
+      throw new AssertionError("Unexpected failure during concurrent commit", unexpectedFailure.get());
+    }
+
+    // intra-process commits to the same table should be serialized now
+    // i.e. no thread should receive WAITING state from HMS and have to call checkLock periodically
+    verify(spyClient, never()).checkLock(any(Long.class));
+    // all threads eventually got their turn
+    verify(spyClient, times(numConcurrentCommits)).lock(any(LockRequest.class));
+  }
}