This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new c392d38c15e HIVE-28851: HiveIcebergMetaHook acquires an HMS lock, regardless of the config and operations (owenmonn, reviewed by Denys Kuzmenko)
c392d38c15e is described below

commit c392d38c15e10673e133d34a6ea577fe18f709b6
Author: owenmonn <[email protected]>
AuthorDate: Mon Apr 7 02:50:58 2025 +0900

    HIVE-28851: HiveIcebergMetaHook acquires an HMS lock, regardless of the config and operations (owenmonn, reviewed by Denys Kuzmenko)
    
    Closes #5722
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |  4 +--
 .../java/org/apache/iceberg/hive/HiveLock.java     |  2 +-
 .../apache/iceberg/hive/HiveTableOperations.java   | 13 +++++---
 .../iceberg/mr/hive/HiveIcebergMetaHook.java       | 25 ++++++++++++++--
 .../iceberg/mr/hive/HiveIcebergStorageHandler.java | 16 ++++++++--
 .../iceberg/mr/hive/TestHiveIcebergStatistics.java | 35 ++++++++++++++++++++++
 .../org/apache/hadoop/hive/ql/TestTxnCommands.java |  1 +
 7 files changed, 84 insertions(+), 12 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java 
b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 0b8b72ea704..4da48659e28 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3123,7 +3123,7 @@ public static enum ConfVars {
     TXN_MERGE_INSERT_X_LOCK("hive.txn.xlock.mergeinsert", false,
         "Ensures MERGE INSERT operations acquire EXCLUSIVE / EXCL_WRITE lock 
for transactional tables.\n" +
         "If enabled, prevents duplicates when MERGE statements are executed in 
parallel transactions."),
-    TXN_WRITE_X_LOCK("hive.txn.xlock.write", true,
+    TXN_WRITE_X_LOCK("hive.txn.xlock.write", false,
         "Manages concurrency levels for ACID resources. Provides better level 
of query parallelism by enabling " +
         "shared writes and write-write conflict resolution at the commit 
step." +
         "- If true - exclusive writes are used:\n" +
@@ -3132,7 +3132,7 @@ public static enum ConfVars {
         "  - INSERT acquires SHARED_READ locks\n" +
         "- If false - shared writes, transaction is aborted in case of 
conflicting changes:\n" +
         "  - INSERT OVERWRITE acquires EXCL_WRITE locks\n" +
-        "  - INSERT/UPDATE/DELETE acquire SHARED_READ locks"),
+        "  - INSERT/UPDATE/DELETE acquire SHARED_WRITE locks"),
     HIVE_TXN_STATS_ENABLED("hive.txn.stats.enabled", true,
         "Whether Hive supports transactional stats (accurate stats for 
transactional tables)"),
     HIVE_TXN_ACID_DIR_CACHE_DURATION("hive.txn.acid.dir.cache.duration",
diff --git 
a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveLock.java 
b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveLock.java
index 20517f3e905..442385dc8b4 100644
--- 
a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveLock.java
+++ 
b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveLock.java
@@ -19,7 +19,7 @@
 
 package org.apache.iceberg.hive;
 
-interface HiveLock {
+public interface HiveLock {
   void lock() throws LockException;
 
   void ensureActive() throws LockException;
diff --git 
a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
 
b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
index 8a4d8663770..b7142a5f491 100644
--- 
a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
+++ 
b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
@@ -57,6 +57,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.util.JsonUtil;
+import org.apache.iceberg.util.PropertyUtil;
 import org.apache.parquet.hadoop.ParquetOutputFormat;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
@@ -476,6 +477,10 @@ private static boolean hiveEngineEnabled(TableMetadata 
metadata, Configuration c
         ConfigProperties.ENGINE_HIVE_ENABLED, true);
   }
 
+  private static boolean hiveLockEnabled(TableMetadata metadata, Configuration 
conf) {
+    return hiveLockEnabled(metadata != null ? metadata.properties() : null, 
conf);
+  }
+
   /**
    * Returns if the hive locking should be enabled on the table, or not.
    *
@@ -489,14 +494,14 @@ private static boolean hiveEngineEnabled(TableMetadata 
metadata, Configuration c
    *       TableProperties#HIVE_LOCK_ENABLED_DEFAULT}
    * </ol>
    *
-   * @param metadata Table metadata to use
+   * @param properties Table properties to use
    * @param conf The hive configuration to use
    * @return if the hive engine related values should be enabled or not
    */
-  private static boolean hiveLockEnabled(TableMetadata metadata, Configuration 
conf) {
-    if (metadata != null && 
metadata.properties().get(TableProperties.HIVE_LOCK_ENABLED) != null) {
+  public static boolean hiveLockEnabled(Map<String, String> properties, 
Configuration conf) {
+    if (properties != null && 
properties.containsKey(TableProperties.HIVE_LOCK_ENABLED)) {
       // We know that the property is set, so default value will not be used,
-      return metadata.propertyAsBoolean(TableProperties.HIVE_LOCK_ENABLED, 
false);
+      return PropertyUtil.propertyAsBoolean(properties, 
TableProperties.HIVE_LOCK_ENABLED, false);
     }
 
     return conf.getBoolean(
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
index 534afe11afe..6bb5f27e76e 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
@@ -55,6 +55,7 @@
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.ddl.misc.sortoder.SortFieldDesc;
 import org.apache.hadoop.hive.ql.ddl.misc.sortoder.SortFields;
 import org.apache.hadoop.hive.ql.ddl.table.AlterTableType;
@@ -70,6 +71,7 @@
 import org.apache.hadoop.hive.ql.parse.TransformSpec;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.HiveOperation;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionStateUtil;
 import org.apache.hadoop.hive.ql.util.NullOrdering;
@@ -116,9 +118,11 @@
 import org.apache.iceberg.expressions.UnboundPredicate;
 import org.apache.iceberg.expressions.UnboundTerm;
 import org.apache.iceberg.hive.CachedClientPool;
+import org.apache.iceberg.hive.HiveLock;
 import org.apache.iceberg.hive.HiveSchemaUtil;
 import org.apache.iceberg.hive.HiveTableOperations;
 import org.apache.iceberg.hive.MetastoreLock;
+import org.apache.iceberg.hive.NoLock;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.mapping.MappingUtil;
@@ -193,7 +197,7 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
   private Transaction transaction;
   private AlterTableType currentAlterTableOp;
   private boolean createHMSTableInHook = false;
-  private MetastoreLock commitLock;
+  private HiveLock commitLock;
 
   private enum FileFormat {
     ORC("orc"), PARQUET("parquet"), AVRO("avro");
@@ -409,8 +413,7 @@ public void 
preAlterTable(org.apache.hadoop.hive.metastore.api.Table hmsTable, E
           context.getProperties().get(OLD_TABLE_NAME)).toString());
     }
     if (commitLock == null) {
-      commitLock = new MetastoreLock(conf, new CachedClientPool(conf, 
Maps.fromProperties(catalogProperties)),
-          catalogProperties.getProperty(Catalogs.NAME), hmsTable.getDbName(), 
hmsTable.getTableName());
+      commitLock = lockObject(hmsTable);
     }
 
     try {
@@ -422,6 +425,22 @@ public void 
preAlterTable(org.apache.hadoop.hive.metastore.api.Table hmsTable, E
     }
   }
 
+  private HiveLock lockObject(org.apache.hadoop.hive.metastore.api.Table 
hmsTable) {
+    if (!HiveTableOperations.hiveLockEnabled(hmsTable.getParameters(), conf) ||
+        SessionStateUtil.getQueryState(conf)
+            .map(QueryState::getHiveOperation)
+            .filter(opType -> HiveOperation.QUERY == opType)
+            .isPresent()) {
+      return new NoLock();
+    } else {
+      return new MetastoreLock(
+          conf,
+          new CachedClientPool(conf, Maps.fromProperties(catalogProperties)),
+          catalogProperties.getProperty(Catalogs.NAME), hmsTable.getDbName(),
+          hmsTable.getTableName());
+    }
+  }
+
   private void doPreAlterTable(org.apache.hadoop.hive.metastore.api.Table 
hmsTable, EnvironmentContext context)
       throws MetaException {
     try {
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index 54ce20e3a58..013bf168cf3 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -178,6 +178,7 @@
 import org.apache.iceberg.hadoop.ConfigProperties;
 import org.apache.iceberg.hadoop.HadoopConfigurable;
 import org.apache.iceberg.hive.HiveSchemaUtil;
+import org.apache.iceberg.hive.HiveTableOperations;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.mr.Catalogs;
 import org.apache.iceberg.mr.InputFormatConfig;
@@ -836,13 +837,24 @@ private void checkAndMergeColStats(List<ColumnStatistics> 
statsNew, Table tbl) t
    */
   @Override
   public LockType getLockType(WriteEntity writeEntity) {
+    org.apache.hadoop.hive.ql.metadata.Table hmsTable = writeEntity.getTable();
+    boolean sharedWrite = !HiveConf.getBoolVar(conf, 
ConfVars.TXN_WRITE_X_LOCK);
     // Materialized views stored by Iceberg and the MV metadata is stored in 
HMS doesn't need write locking because
     // the locking is done by DbTxnManager.acquireMaterializationRebuildLock()
     if (TableType.MATERIALIZED_VIEW == writeEntity.getTable().getTableType()) {
       return LockType.SHARED_READ;
     }
-    if (WriteEntity.WriteType.INSERT_OVERWRITE == writeEntity.getWriteType()) {
-      return LockType.EXCL_WRITE;
+    if (HiveTableOperations.hiveLockEnabled(hmsTable.getParameters(), conf)) {
+      throw new RuntimeException("Hive locking on table `" + 
hmsTable.getFullTableName() +
+          "`cannot be enabled when `engine.hive.lock-enabled`=`true`. " +
+          "Disable `engine.hive.lock-enabled` to use Hive locking");
+    }
+    switch (writeEntity.getWriteType()) {
+      case INSERT_OVERWRITE:
+        return LockType.EXCL_WRITE;
+      case UPDATE:
+      case DELETE:
+        return sharedWrite ? LockType.SHARED_WRITE : LockType.EXCL_WRITE;
     }
     return LockType.SHARED_WRITE;
   }
diff --git 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStatistics.java
 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStatistics.java
index aaf7138056d..832676df55b 100644
--- 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStatistics.java
+++ 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStatistics.java
@@ -26,6 +26,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
@@ -130,6 +131,40 @@ public void testStatsWithInsert() {
     checkColStatMinMaxValue(identifier.name(), "customer_id", 0, 5);
   }
 
+  @Test
+  public void testStatsWithPessimisticLockInsert() {
+    Assume.assumeTrue(testTableType == TestTables.TestTableType.HIVE_CATALOG);
+    TableIdentifier identifier = 
getTableIdentifierWithPessimisticLock("false");
+    String insert = 
testTables.getInsertQuery(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 
identifier, false);
+    shell.executeStatement(insert);
+
+    checkColStat(identifier.name(), "customer_id", true);
+    checkColStatMinMaxValue(identifier.name(), "customer_id", 0, 2);
+  }
+
+  @Test
+  public void testStatsWithPessimisticLockInsertWhenHiveLockEnabled() {
+    Assume.assumeTrue(testTableType == TestTables.TestTableType.HIVE_CATALOG);
+    TableIdentifier identifier = getTableIdentifierWithPessimisticLock("true");
+    String insert = 
testTables.getInsertQuery(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 
identifier, false);
+    AssertHelpers.assertThrows(
+        "Should throw RuntimeException when Hive locking is on with 
'engine.hive.lock-enabled=true'",
+        RuntimeException.class,
+        () -> shell.executeStatement(insert)
+    );
+  }
+
+  private TableIdentifier getTableIdentifierWithPessimisticLock(String 
hiveLockEnabled) {
+    TableIdentifier identifier = TableIdentifier.of("default", "customers");
+
+    shell.setHiveSessionValue(HiveConf.ConfVars.HIVE_STATS_AUTOGATHER.varname, 
true);
+    
shell.setHiveSessionValue(HiveConf.ConfVars.HIVE_TXN_EXT_LOCKING_ENABLED.varname,
 true);
+    testTables.createTable(shell, identifier.name(), 
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        PartitionSpec.unpartitioned(), fileFormat, ImmutableList.of(), 
formatVersion,
+        ImmutableMap.of(TableProperties.HIVE_LOCK_ENABLED, hiveLockEnabled));
+    return identifier;
+  }
+
   @Test
   public void testStatsWithInsertOverwrite() {
     TableIdentifier identifier = TableIdentifier.of("default", "customers");
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java 
b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index 266e0ef299e..2eaed2f1107 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -404,6 +404,7 @@ private IMetaStoreClient prepareParallelTest(String 
tableName, int val)
       throws Exception, MetaException, TException, NoSuchObjectException {
     hiveConf.setBoolean("hive.stats.autogather", true);
     hiveConf.setBoolean("hive.stats.column.autogather", true);
+    hiveConf.setBoolean("hive.txn.xlock.write", true);
     // Need to close the thread local Hive object so that configuration change 
is reflected to HMS.
     Hive.closeCurrent();
     runStatementOnDriver("drop table if exists " + tableName);

Reply via email to