This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new d5443e3  Hive: Add table-level JVM lock on commits (#2547)
d5443e3 is described below

commit d5443e3a34a4288441a015ab616d965557d78202
Author: Marton Bod <[email protected]>
AuthorDate: Fri May 7 21:37:02 2021 +0200

    Hive: Add table-level JVM lock on commits (#2547)
    
    Co-authored-by: Marton Bod <[email protected]>
---
 .../apache/iceberg/hive/HiveTableOperations.java   | 23 ++++++++++++++
 .../apache/iceberg/hive/TestHiveCommitLocks.java   | 36 ++++++++++++++++++++++
 2 files changed, 59 insertions(+)

diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
index 7c45882..647c626 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
@@ -19,6 +19,8 @@
 
 package org.apache.iceberg.hive;
 
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.Collections;
@@ -28,7 +30,9 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.StatsSetupConst;
@@ -82,9 +86,11 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
   private static final String HIVE_ACQUIRE_LOCK_TIMEOUT_MS = 
"iceberg.hive.lock-timeout-ms";
   private static final String HIVE_LOCK_CHECK_MIN_WAIT_MS = 
"iceberg.hive.lock-check-min-wait-ms";
   private static final String HIVE_LOCK_CHECK_MAX_WAIT_MS = 
"iceberg.hive.lock-check-max-wait-ms";
+  private static final String HIVE_TABLE_LEVEL_LOCK_EVICT_MS = 
"iceberg.hive.table-level-lock-evict-ms";
   private static final long HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT = 3 * 60 * 
1000; // 3 minutes
   private static final long HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT = 50; // 50 
milliseconds
   private static final long HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT = 5 * 1000; // 
5 seconds
+  private static final long HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT = 
TimeUnit.MINUTES.toMillis(10);
   private static final DynMethods.UnboundMethod ALTER_TABLE = 
DynMethods.builder("alter_table")
       .impl(HiveMetaStoreClient.class, "alter_table_with_environmentContext",
           String.class, String.class, Table.class, EnvironmentContext.class)
@@ -96,6 +102,15 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
       GC_ENABLED, "external.table.purge"
   );
 
+  private static Cache<String, ReentrantLock> commitLockCache;
+
+  private static synchronized void initTableLevelLockCache(long 
evictionTimeout) {
+    if (commitLockCache == null) {
+      commitLockCache = Caffeine.newBuilder()
+          .expireAfterAccess(evictionTimeout, TimeUnit.MILLISECONDS)
+          .build();
+    }
+  }
 
   /**
    * Provides key translation where necessary between Iceberg and HMS props. 
This translation is needed because some
@@ -144,6 +159,9 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
         conf.getLong(HIVE_LOCK_CHECK_MIN_WAIT_MS, 
HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT);
     this.lockCheckMaxWaitTime =
         conf.getLong(HIVE_LOCK_CHECK_MAX_WAIT_MS, 
HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT);
+    long tableLevelLockCacheEvictionTimeout =
+        conf.getLong(HIVE_TABLE_LEVEL_LOCK_EVICT_MS, 
HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT);
+    initTableLevelLockCache(tableLevelLockCacheEvictionTimeout);
   }
 
   @Override
@@ -191,6 +209,10 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
     CommitStatus commitStatus = CommitStatus.FAILURE;
     boolean updateHiveTable = false;
     Optional<Long> lockId = Optional.empty();
+    // getting a process-level lock per table to avoid concurrent commit 
attempts to the same table from the same
+    // JVM process, which would result in unnecessary and costly HMS lock 
acquisition requests
+    ReentrantLock tableLevelMutex = commitLockCache.get(fullName, t -> new 
ReentrantLock(true));
+    tableLevelMutex.lock();
     try {
       lockId = Optional.of(acquireLock());
       // TODO add lock heart beating for cases where default lock timeout is 
too low.
@@ -267,6 +289,7 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
 
     } finally {
       cleanupMetadataAndUnlock(commitStatus, newMetadataLocation, lockId);
+      tableLevelMutex.unlock();
     }
   }
 
diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java
index 531e511..ada7fa6 100644
--- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java
+++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java
@@ -20,9 +20,14 @@
 package org.apache.iceberg.hive;
 
 import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.IntStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
 import org.apache.hadoop.hive.metastore.api.LockResponse;
 import org.apache.hadoop.hive.metastore.api.LockState;
 import org.apache.iceberg.AssertHelpers;
@@ -42,7 +47,11 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class TestHiveCommitLocks extends HiveTableBaseTest {
@@ -210,4 +219,31 @@ public class TestHiveCommitLocks extends HiveTableBaseTest {
         "Could not acquire the lock on",
         () -> spyOps.doCommit(metadataV2, metadataV1));
   }
+
+  @Test
+  public void 
testTableLevelProcessLockBlocksConcurrentHMSRequestsForSameTable() throws 
Exception {
+    int numConcurrentCommits = 10;
+    // resetting the spy client to forget about prior call history
+    reset(spyClient);
+
+    // simulate several concurrent commit operations on the same table
+    ExecutorService executor = 
Executors.newFixedThreadPool(numConcurrentCommits);
+    IntStream.range(0, numConcurrentCommits).forEach(i ->
+        executor.submit(() -> {
+          try {
+            spyOps.doCommit(metadataV2, metadataV1);
+          } catch (CommitFailedException e) {
+            // failures are expected here when checking the base version
+            // it's no problem, we're not testing the actual commit success 
here, only the HMS lock acquisition attempts
+          }
+        }));
+    executor.shutdown();
+    executor.awaitTermination(30, TimeUnit.SECONDS);
+
+    // intra-process commits to the same table should be serialized now
+    // i.e. no thread should receive WAITING state from HMS and have to call 
checkLock periodically
+    verify(spyClient, never()).checkLock(any(Long.class));
+    // all threads eventually got their turn
+    verify(spyClient, 
times(numConcurrentCommits)).lock(any(LockRequest.class));
+  }
 }

Reply via email to