This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new c392d38c15e HIVE-28851: HiveIcebergMetaHook acquires an HMS lock,
regardless of the config and operations (owenmonn, reviewed by Denys Kuzmenko)
c392d38c15e is described below
commit c392d38c15e10673e133d34a6ea577fe18f709b6
Author: owenmonn <[email protected]>
AuthorDate: Mon Apr 7 02:50:58 2025 +0900
HIVE-28851: HiveIcebergMetaHook acquires an HMS lock, regardless of the
config and operations (owenmonn, reviewed by Denys Kuzmenko)
Closes #5722
---
.../java/org/apache/hadoop/hive/conf/HiveConf.java | 4 +--
.../java/org/apache/iceberg/hive/HiveLock.java | 2 +-
.../apache/iceberg/hive/HiveTableOperations.java | 13 +++++---
.../iceberg/mr/hive/HiveIcebergMetaHook.java | 25 ++++++++++++++--
.../iceberg/mr/hive/HiveIcebergStorageHandler.java | 16 ++++++++--
.../iceberg/mr/hive/TestHiveIcebergStatistics.java | 35 ++++++++++++++++++++++
.../org/apache/hadoop/hive/ql/TestTxnCommands.java | 1 +
7 files changed, 84 insertions(+), 12 deletions(-)
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 0b8b72ea704..4da48659e28 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3123,7 +3123,7 @@ public static enum ConfVars {
TXN_MERGE_INSERT_X_LOCK("hive.txn.xlock.mergeinsert", false,
"Ensures MERGE INSERT operations acquire EXCLUSIVE / EXCL_WRITE lock
for transactional tables.\n" +
"If enabled, prevents duplicates when MERGE statements are executed in
parallel transactions."),
- TXN_WRITE_X_LOCK("hive.txn.xlock.write", true,
+ TXN_WRITE_X_LOCK("hive.txn.xlock.write", false,
"Manages concurrency levels for ACID resources. Provides better level
of query parallelism by enabling " +
"shared writes and write-write conflict resolution at the commit
step." +
"- If true - exclusive writes are used:\n" +
@@ -3132,7 +3132,7 @@ public static enum ConfVars {
" - INSERT acquires SHARED_READ locks\n" +
"- If false - shared writes, transaction is aborted in case of
conflicting changes:\n" +
" - INSERT OVERWRITE acquires EXCL_WRITE locks\n" +
- " - INSERT/UPDATE/DELETE acquire SHARED_READ locks"),
+ " - INSERT/UPDATE/DELETE acquire SHARED_WRITE locks"),
HIVE_TXN_STATS_ENABLED("hive.txn.stats.enabled", true,
"Whether Hive supports transactional stats (accurate stats for
transactional tables)"),
HIVE_TXN_ACID_DIR_CACHE_DURATION("hive.txn.acid.dir.cache.duration",
diff --git
a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveLock.java
b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveLock.java
index 20517f3e905..442385dc8b4 100644
---
a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveLock.java
+++
b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveLock.java
@@ -19,7 +19,7 @@
package org.apache.iceberg.hive;
-interface HiveLock {
+public interface HiveLock {
void lock() throws LockException;
void ensureActive() throws LockException;
diff --git
a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
index 8a4d8663770..b7142a5f491 100644
---
a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
+++
b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
@@ -57,6 +57,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.JsonUtil;
+import org.apache.iceberg.util.PropertyUtil;
import org.apache.parquet.hadoop.ParquetOutputFormat;
import org.apache.thrift.TException;
import org.slf4j.Logger;
@@ -476,6 +477,10 @@ private static boolean hiveEngineEnabled(TableMetadata
metadata, Configuration c
ConfigProperties.ENGINE_HIVE_ENABLED, true);
}
+ private static boolean hiveLockEnabled(TableMetadata metadata, Configuration
conf) {
+ return hiveLockEnabled(metadata != null ? metadata.properties() : null,
conf);
+ }
+
/**
* Returns if the hive locking should be enabled on the table, or not.
*
@@ -489,14 +494,14 @@ private static boolean hiveEngineEnabled(TableMetadata
metadata, Configuration c
* TableProperties#HIVE_LOCK_ENABLED_DEFAULT}
* </ol>
*
- * @param metadata Table metadata to use
+ * @param properties Table properties to use
* @param conf The hive configuration to use
* @return if the hive engine related values should be enabled or not
*/
- private static boolean hiveLockEnabled(TableMetadata metadata, Configuration
conf) {
- if (metadata != null &&
metadata.properties().get(TableProperties.HIVE_LOCK_ENABLED) != null) {
+ public static boolean hiveLockEnabled(Map<String, String> properties,
Configuration conf) {
+ if (properties != null &&
properties.containsKey(TableProperties.HIVE_LOCK_ENABLED)) {
// We know that the property is set, so default value will not be used,
- return metadata.propertyAsBoolean(TableProperties.HIVE_LOCK_ENABLED,
false);
+ return PropertyUtil.propertyAsBoolean(properties,
TableProperties.HIVE_LOCK_ENABLED, false);
}
return conf.getBoolean(
diff --git
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
index 534afe11afe..6bb5f27e76e 100644
---
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
+++
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
@@ -55,6 +55,7 @@
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.ddl.misc.sortoder.SortFieldDesc;
import org.apache.hadoop.hive.ql.ddl.misc.sortoder.SortFields;
import org.apache.hadoop.hive.ql.ddl.table.AlterTableType;
@@ -70,6 +71,7 @@
import org.apache.hadoop.hive.ql.parse.TransformSpec;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.HiveOperation;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionStateUtil;
import org.apache.hadoop.hive.ql.util.NullOrdering;
@@ -116,9 +118,11 @@
import org.apache.iceberg.expressions.UnboundPredicate;
import org.apache.iceberg.expressions.UnboundTerm;
import org.apache.iceberg.hive.CachedClientPool;
+import org.apache.iceberg.hive.HiveLock;
import org.apache.iceberg.hive.HiveSchemaUtil;
import org.apache.iceberg.hive.HiveTableOperations;
import org.apache.iceberg.hive.MetastoreLock;
+import org.apache.iceberg.hive.NoLock;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.mapping.MappingUtil;
@@ -193,7 +197,7 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
private Transaction transaction;
private AlterTableType currentAlterTableOp;
private boolean createHMSTableInHook = false;
- private MetastoreLock commitLock;
+ private HiveLock commitLock;
private enum FileFormat {
ORC("orc"), PARQUET("parquet"), AVRO("avro");
@@ -409,8 +413,7 @@ public void
preAlterTable(org.apache.hadoop.hive.metastore.api.Table hmsTable, E
context.getProperties().get(OLD_TABLE_NAME)).toString());
}
if (commitLock == null) {
- commitLock = new MetastoreLock(conf, new CachedClientPool(conf,
Maps.fromProperties(catalogProperties)),
- catalogProperties.getProperty(Catalogs.NAME), hmsTable.getDbName(),
hmsTable.getTableName());
+ commitLock = lockObject(hmsTable);
}
try {
@@ -422,6 +425,22 @@ public void
preAlterTable(org.apache.hadoop.hive.metastore.api.Table hmsTable, E
}
}
+ private HiveLock lockObject(org.apache.hadoop.hive.metastore.api.Table
hmsTable) {
+ if (!HiveTableOperations.hiveLockEnabled(hmsTable.getParameters(), conf) ||
+ SessionStateUtil.getQueryState(conf)
+ .map(QueryState::getHiveOperation)
+ .filter(opType -> HiveOperation.QUERY == opType)
+ .isPresent()) {
+ return new NoLock();
+ } else {
+ return new MetastoreLock(
+ conf,
+ new CachedClientPool(conf, Maps.fromProperties(catalogProperties)),
+ catalogProperties.getProperty(Catalogs.NAME), hmsTable.getDbName(),
+ hmsTable.getTableName());
+ }
+ }
+
private void doPreAlterTable(org.apache.hadoop.hive.metastore.api.Table
hmsTable, EnvironmentContext context)
throws MetaException {
try {
diff --git
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index 54ce20e3a58..013bf168cf3 100644
---
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -178,6 +178,7 @@
import org.apache.iceberg.hadoop.ConfigProperties;
import org.apache.iceberg.hadoop.HadoopConfigurable;
import org.apache.iceberg.hive.HiveSchemaUtil;
+import org.apache.iceberg.hive.HiveTableOperations;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.mr.InputFormatConfig;
@@ -836,13 +837,24 @@ private void checkAndMergeColStats(List<ColumnStatistics>
statsNew, Table tbl) t
*/
@Override
public LockType getLockType(WriteEntity writeEntity) {
+ org.apache.hadoop.hive.ql.metadata.Table hmsTable = writeEntity.getTable();
+ boolean sharedWrite = !HiveConf.getBoolVar(conf,
ConfVars.TXN_WRITE_X_LOCK);
// Materialized views stored by Iceberg and the MV metadata is stored in
HMS doesn't need write locking because
// the locking is done by DbTxnManager.acquireMaterializationRebuildLock()
if (TableType.MATERIALIZED_VIEW == writeEntity.getTable().getTableType()) {
return LockType.SHARED_READ;
}
- if (WriteEntity.WriteType.INSERT_OVERWRITE == writeEntity.getWriteType()) {
- return LockType.EXCL_WRITE;
+ if (HiveTableOperations.hiveLockEnabled(hmsTable.getParameters(), conf)) {
throw new RuntimeException("Hive locking on table `" +
hmsTable.getFullTableName() +
"` cannot be enabled when `engine.hive.lock-enabled`=`true`. " +
"Disable `engine.hive.lock-enabled` to use Hive locking");
+ }
+ switch (writeEntity.getWriteType()) {
+ case INSERT_OVERWRITE:
+ return LockType.EXCL_WRITE;
+ case UPDATE:
+ case DELETE:
+ return sharedWrite ? LockType.SHARED_WRITE : LockType.EXCL_WRITE;
}
return LockType.SHARED_WRITE;
}
diff --git
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStatistics.java
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStatistics.java
index aaf7138056d..832676df55b 100644
---
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStatistics.java
+++
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStatistics.java
@@ -26,6 +26,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
@@ -130,6 +131,40 @@ public void testStatsWithInsert() {
checkColStatMinMaxValue(identifier.name(), "customer_id", 0, 5);
}
+ @Test
+ public void testStatsWithPessimisticLockInsert() {
+ Assume.assumeTrue(testTableType == TestTables.TestTableType.HIVE_CATALOG);
+ TableIdentifier identifier =
getTableIdentifierWithPessimisticLock("false");
+ String insert =
testTables.getInsertQuery(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS,
identifier, false);
+ shell.executeStatement(insert);
+
+ checkColStat(identifier.name(), "customer_id", true);
+ checkColStatMinMaxValue(identifier.name(), "customer_id", 0, 2);
+ }
+
+ @Test
+ public void testStatsWithPessimisticLockInsertWhenHiveLockEnabled() {
+ Assume.assumeTrue(testTableType == TestTables.TestTableType.HIVE_CATALOG);
+ TableIdentifier identifier = getTableIdentifierWithPessimisticLock("true");
+ String insert =
testTables.getInsertQuery(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS,
identifier, false);
+ AssertHelpers.assertThrows(
+ "Should throw RuntimeException when Hive locking is on with
'engine.hive.lock-enabled=true'",
+ RuntimeException.class,
+ () -> shell.executeStatement(insert)
+ );
+ }
+
+ private TableIdentifier getTableIdentifierWithPessimisticLock(String
hiveLockEnabled) {
+ TableIdentifier identifier = TableIdentifier.of("default", "customers");
+
+ shell.setHiveSessionValue(HiveConf.ConfVars.HIVE_STATS_AUTOGATHER.varname,
true);
+
shell.setHiveSessionValue(HiveConf.ConfVars.HIVE_TXN_EXT_LOCKING_ENABLED.varname,
true);
+ testTables.createTable(shell, identifier.name(),
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+ PartitionSpec.unpartitioned(), fileFormat, ImmutableList.of(),
formatVersion,
+ ImmutableMap.of(TableProperties.HIVE_LOCK_ENABLED, hiveLockEnabled));
+ return identifier;
+ }
+
@Test
public void testStatsWithInsertOverwrite() {
TableIdentifier identifier = TableIdentifier.of("default", "customers");
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index 266e0ef299e..2eaed2f1107 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -404,6 +404,7 @@ private IMetaStoreClient prepareParallelTest(String
tableName, int val)
throws Exception, MetaException, TException, NoSuchObjectException {
hiveConf.setBoolean("hive.stats.autogather", true);
hiveConf.setBoolean("hive.stats.column.autogather", true);
+ hiveConf.setBoolean("hive.txn.xlock.write", true);
// Need to close the thread local Hive object so that configuration change
is reflected to HMS.
Hive.closeCurrent();
runStatementOnDriver("drop table if exists " + tableName);