This is an automated email from the ASF dual-hosted git repository.

difin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 279996b0858 HIVE-29028: Iceberg: Implement auto compaction (#5886) 279996b0858 is described below commit 279996b0858896a5b953745ec793c89d82378ea6 Author: Dmitriy Fingerman <dmitriy.finger...@gmail.com> AuthorDate: Wed Jul 2 23:12:10 2025 -0400 HIVE-29028: Iceberg: Implement auto compaction (#5886) --- .../iceberg/mr/hive/HiveIcebergStorageHandler.java | 22 +- .../apache/iceberg/mr/hive/IcebergTableUtil.java | 10 + .../mr/hive/compaction/IcebergTableOptimizer.java | 191 +++++++++++++++++ .../hive/ql/txn/compactor/CompactorOnTezTest.java | 11 + .../txn/compactor/TestIcebergCompactorOnTez.java | 117 +++++++++-- .../hive/ql/txn/compactor/AcidTableOptimizer.java | 111 ++++++++++ .../hive/ql/txn/compactor/CompactionException.java | 6 + .../hive/ql/txn/compactor/CompactorThread.java | 32 --- .../hadoop/hive/ql/txn/compactor/Initiator.java | 226 +++++---------------- .../ql/txn/compactor/MetaStoreCompactorThread.java | 31 --- .../ql/txn/compactor/RemoteCompactorThread.java | 28 --- .../hive/ql/txn/compactor/TableOptimizer.java | 219 ++++++++++++++++++++ .../hadoop/hive/ql/TxnCommandsBaseForTests.java | 2 + .../hive/ql/txn/compactor/CompactorTest.java | 2 + .../hive/ql/txn/compactor/TestInitiator.java | 4 +- .../hadoop/hive/metastore/conf/MetastoreConf.java | 5 + 16 files changed, 712 insertions(+), 305 deletions(-) diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java index dc5ac290462..b4d9f0bea22 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java @@ -140,7 +140,6 @@ import org.apache.iceberg.FindFiles; import org.apache.iceberg.GenericBlobMetadata; import org.apache.iceberg.GenericStatisticsFile; -import org.apache.iceberg.ManifestFile; import org.apache.iceberg.NullOrder; import org.apache.iceberg.PartitionData; import org.apache.iceberg.PartitionField; @@ -459,7 +458,7 @@ public boolean supportsAppendData(org.apache.hadoop.hive.metastore.api.Table tab return true; } // If it is a table which has undergone partition evolution, return false; - if (hasUndergonePartitionEvolution(icebergTbl)) { + if (IcebergTableUtil.hasUndergonePartitionEvolution(icebergTbl)) { if (withPartClause) { throw new SemanticException("Can not Load into an iceberg table, which has undergone partition evolution " + "using the PARTITION clause"); @@ -1405,7 +1404,7 @@ public void validateSinkDesc(FileSinkDesc sinkDesc) throws SemanticException { if (IcebergTableUtil.isBucketed(table)) { throw new SemanticException("Cannot perform insert overwrite query on bucket partitioned Iceberg table."); } - if (hasUndergonePartitionEvolution(table)) { + if (IcebergTableUtil.hasUndergonePartitionEvolution(table)) { throw new SemanticException( "Cannot perform insert overwrite query on Iceberg table where partition evolution happened. 
In order " + "to successfully carry out any insert overwrite operation on this table, the data has to be rewritten " + @@ -2061,7 +2060,7 @@ public void validatePartSpec(org.apache.hadoop.hive.ql.metadata.Table hmsTable, private void validatePartSpecImpl(org.apache.hadoop.hive.ql.metadata.Table hmsTable, Map<String, String> partitionSpec, List<PartitionField> partitionFields) throws SemanticException { Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable()); - if (hmsTable.getSnapshotRef() != null && hasUndergonePartitionEvolution(table)) { + if (hmsTable.getSnapshotRef() != null && IcebergTableUtil.hasUndergonePartitionEvolution(table)) { // for this case we rewrite the query as delete query, so validations would be done as part of delete. return; } @@ -2108,7 +2107,7 @@ private void validatePartSpecImpl(org.apache.hadoop.hive.ql.metadata.Table hmsTa public boolean canUseTruncate(org.apache.hadoop.hive.ql.metadata.Table hmsTable, Map<String, String> partitionSpec) throws SemanticException { Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable()); - if (MapUtils.isEmpty(partitionSpec) || !hasUndergonePartitionEvolution(table)) { + if (MapUtils.isEmpty(partitionSpec) || !IcebergTableUtil.hasUndergonePartitionEvolution(table)) { return true; } else if (hmsTable.getSnapshotRef() != null) { return false; @@ -2130,15 +2129,6 @@ public boolean canUseTruncate(org.apache.hadoop.hive.ql.metadata.Table hmsTable, return result; } - private boolean hasUndergonePartitionEvolution(Table table) { - // The current spec is not necessary the latest which can happen when partition spec was changed to one of - // table's past specs. - return table.currentSnapshot() != null && - table.currentSnapshot().allManifests(table.io()).parallelStream() - .map(ManifestFile::partitionSpecId) - .anyMatch(id -> id != table.spec().specId()); - } - @Override public List<Partition> getPartitions(org.apache.hadoop.hive.ql.metadata.Table hmsTable, Map<String, String> partitionSpec, boolean latestSpecOnly) throws SemanticException { @@ -2159,7 +2149,7 @@ public boolean isPartitioned(org.apache.hadoop.hive.ql.metadata.Table hmsTable) Snapshot snapshot = IcebergTableUtil.getTableSnapshot(table, hmsTable); boolean readsNonCurrentSnapshot = snapshot != null && !snapshot.equals(table.currentSnapshot()); - if (readsNonCurrentSnapshot && hasUndergonePartitionEvolution(table)) { + if (readsNonCurrentSnapshot && IcebergTableUtil.hasUndergonePartitionEvolution(table)) { return false; } return table.spec().isPartitioned(); @@ -2368,7 +2358,7 @@ public void setMergeTaskDeleteProperties(TableDesc tableDesc) { @Override public boolean hasUndergonePartitionEvolution(org.apache.hadoop.hive.ql.metadata.Table hmsTable) { Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable()); - return hasUndergonePartitionEvolution(table); + return IcebergTableUtil.hasUndergonePartitionEvolution(table); } private static List<FieldSchema> schema(List<VirtualColumn> exprs) { diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java index 54b283527a1..2e4e5f0a094 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java @@ -59,6 +59,7 @@ import org.apache.iceberg.DeleteFiles; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.ManageSnapshots; +import 
org.apache.iceberg.ManifestFile; import org.apache.iceberg.MetadataTableType; import org.apache.iceberg.MetadataTableUtils; import org.apache.iceberg.PartitionData; @@ -562,4 +563,13 @@ public static ExecutorService newDeleteThreadPool(String completeName, int numTh return thread; }); } + + public static boolean hasUndergonePartitionEvolution(Table table) { + // The current spec is not necessary the latest which can happen when partition spec was changed to one of + // table's past specs. + return table.currentSnapshot() != null && + table.currentSnapshot().allManifests(table.io()).parallelStream() + .map(ManifestFile::partitionSpecId) + .anyMatch(id -> id != table.spec().specId()); + } } diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java new file mode 100644 index 00000000000..ab785844461 --- /dev/null +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.iceberg.mr.hive.compaction; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.CompactionType; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; +import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo; +import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil; +import org.apache.hadoop.hive.ql.txn.compactor.MetadataCache; +import org.apache.hadoop.hive.ql.txn.compactor.TableOptimizer; +import org.apache.hive.iceberg.org.apache.orc.storage.common.TableName; +import org.apache.iceberg.hive.RuntimeMetaException; +import org.apache.iceberg.mr.hive.IcebergTableUtil; +import org.apache.iceberg.mr.hive.compaction.evaluator.CompactionEvaluator; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.thrift.TException; + +public class IcebergTableOptimizer extends TableOptimizer { + private HiveMetaStoreClient client; + private Map<String, Long> snapshotIdCache; + + public IcebergTableOptimizer(HiveConf conf, TxnStore txnHandler, MetadataCache metadataCache) throws MetaException { + super(conf, txnHandler, metadataCache); + init(); + } + + /** + * Scans all databases and tables in the Hive Metastore to identify Iceberg tables + * that are potential candidates for compaction. + * <p> + * The method filters tables based on provided databases and tables skip lists and a snapshot ID cache to avoid + * re-processing tables that haven't changed. For eligible Iceberg tables, it determines + * the appropriate compaction targets (table or specific partitions) and adds them to + * the {@link CompactionInfo} set. + * </p> + * @param lastChecked A timestamp of previous auto compaction's invocation. + * @param currentCompactions A {@link ShowCompactResponse} containing currently active or pending + * compaction requests, used to avoid duplicates. + * @param skipDBs A {@link Set} of database names to explicitly skip during the scan. + * @param skipTables A {@link Set} of fully qualified table names to explicitly skip during the scan. + * @return A {@link Set} of {@link CompactionInfo} objects representing tables and/or partitions + * identified as eligible for compaction. + * @throws MetaException If an unrecoverable error occurs during Metastore communication or + * during {@link SessionState} initialization. 
+ */ + @Override + public Set<CompactionInfo> findPotentialCompactions(long lastChecked, ShowCompactResponse currentCompactions, + Set<String> skipDBs, Set<String> skipTables) throws MetaException { + Set<CompactionInfo> compactionTargets = Sets.newHashSet(); + try { + SessionState sessionState = SessionState.get(); + if (sessionState == null) { + sessionState = new SessionState(conf); + SessionState.start(sessionState); + } + } catch (Exception e) { + throw new MetaException(String.format("Error while finding compaction targets for Iceberg tables: %s", + e.getMessage())); + } + + getDatabases().stream() + .filter(dbName -> !skipDBs.contains(dbName)) + .flatMap(dbName -> getTables(dbName).stream() + .map(tableName -> TableName.getDbTable(dbName, tableName))) + .filter(qualifiedTableName -> !skipTables.contains(qualifiedTableName)) + .map(qualifiedTableName -> Pair.of(qualifiedTableName, resolveMetastoreTable(qualifiedTableName))) + .filter(tablePair -> MetaStoreUtils.isIcebergTable(tablePair.getValue().getParameters())) + .filter(tablePair -> { + long currentSnapshotId = Long.parseLong(tablePair.getValue().getParameters().get("current-snapshot-id")); + Long cachedSnapshotId = snapshotIdCache.get(tablePair.getKey()); + return cachedSnapshotId == null || cachedSnapshotId != currentSnapshotId; + }) + .forEach(tablePair -> { + org.apache.iceberg.Table icebergTable = IcebergTableUtil.getTable(conf, tablePair.getValue()); + + if (icebergTable.spec().isPartitioned()) { + List<String> partitions = getPartitions(icebergTable); + + partitions.forEach(partition -> addCompactionTargetIfEligible(tablePair.getValue(), icebergTable, + partition, compactionTargets, currentCompactions, skipDBs, skipTables)); + } + + if (icebergTable.spec().isUnpartitioned() || IcebergTableUtil.hasUndergonePartitionEvolution(icebergTable)) { + addCompactionTargetIfEligible(tablePair.getValue(), icebergTable, null, compactionTargets, + currentCompactions, skipDBs, skipTables); + } + + snapshotIdCache.put(tablePair.getKey(), icebergTable.currentSnapshot().snapshotId()); + }); + + return compactionTargets; + } + + private List<String> getDatabases() { + try { + return client.getAllDatabases(); + } catch (TException e) { + throw new RuntimeMetaException(e, "Error getting database names"); + } + } + + private List<String> getTables(String dbName) { + try { + return client.getAllTables(dbName); + } catch (TException e) { + throw new RuntimeMetaException(e, "Error getting table names of %s database", dbName); + } + } + + private List<String> getPartitions(org.apache.iceberg.Table icebergTable) { + try { + return IcebergTableUtil.getPartitionNames(icebergTable, Maps.newHashMap(), true); + } catch (SemanticException e) { + throw new RuntimeMetaException(e, "Error getting partition names for Iceberg table %s", icebergTable.name()); + } + } + + private Table resolveMetastoreTable(String qualifiedTableName) { + String[] dbTableName = TxnUtils.getDbTableName(qualifiedTableName); + try { + return metadataCache.computeIfAbsent(qualifiedTableName, + () -> CompactorUtil.resolveTable(conf, dbTableName[0], dbTableName[1])); + } catch (Exception e) { + throw new RuntimeMetaException(e, "Error resolving table %s", qualifiedTableName); + } + } + + public void init() throws MetaException { + client = new HiveMetaStoreClient(conf); + snapshotIdCache = Maps.newConcurrentMap(); + } + + private void addCompactionTargetIfEligible(Table table, org.apache.iceberg.Table icebergTable, String partitionName, + Set<CompactionInfo> compactions, 
ShowCompactResponse currentCompactions, Set<String> skipDBs, + Set<String> skipTables) { + + CompactionInfo ci = new CompactionInfo(table.getDbName(), table.getTableName(), partitionName, + CompactionType.SMART_OPTIMIZE); + + // Common Hive compaction eligibility checks + if (!isEligibleForCompaction(ci, currentCompactions, skipDBs, skipTables)) { + return; + } + + // Iceberg specific compaction checks: determine if compaction is needed and which type is needed + CompactionEvaluator compactionEvaluator; + try { + compactionEvaluator = new CompactionEvaluator(icebergTable, ci, table.getParameters()); + } catch (IOException e) { + throw new RuntimeMetaException(e, "Error construction compaction evaluator for table %s", table.getTableName()); + } + + if (!compactionEvaluator.isEligibleForCompaction()) { + return; + } + + ci.type = compactionEvaluator.determineCompactionType(); + compactions.add(ci); + } +} diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java index fdbb454de41..57a7fc442e5 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java @@ -52,6 +52,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -69,6 +70,7 @@ public abstract class CompactorOnTezTest { protected IMetaStoreClient msClient; protected IDriver driver; protected boolean mmCompaction = false; + private final AtomicBoolean stop = new AtomicBoolean(); @ClassRule public static TemporaryFolder folder = new TemporaryFolder(); @@ -541,4 +543,13 @@ protected void dropTable(String tblName) throws Exception { executeStatementOnDriver("drop table " + tblName, driver); } } + + protected void runSingleInitiatorCycle() throws Exception { + TestTxnDbUtil.setConfValues(conf); + CompactorThread t = new Initiator(); + t.setConf(conf); + stop.set(true); + t.init(stop); + t.run(); + } } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestIcebergCompactorOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestIcebergCompactorOnTez.java index 9ca0bd9bbe4..f6cc76572f1 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestIcebergCompactorOnTez.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestIcebergCompactorOnTez.java @@ -21,66 +21,139 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.api.CompactionType; import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; +import org.apache.hadoop.hive.metastore.txn.entities.CompactionState; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import java.util.ArrayList; import java.util.List; +import java.util.Objects; import static org.apache.hadoop.hive.ql.txn.compactor.TestCompactor.executeStatementOnDriver; public class TestIcebergCompactorOnTez extends CompactorOnTezTest { + + private static final String DB_NAME = "default"; + private static final String TABLE_NAME = "ice_orc"; + private static final String QUALIFIED_TABLE_NAME = TxnUtils.getFullTableName(DB_NAME, TABLE_NAME); 
+ + @Override + @Before + public void setup() throws Exception { + super.setup(); + executeStatementOnDriver("drop table if exists " + QUALIFIED_TABLE_NAME, driver); + } @Test public void testIcebergCompactorWithAllPartitionFieldTypes() throws Exception{ conf.setVar(HiveConf.ConfVars.COMPACTOR_JOB_QUEUE, CUSTOM_COMPACTION_QUEUE); msClient = new HiveMetaStoreClient(conf); - String dbName = "default"; - String tableName = "ice_orc"; - String qualifiedTableName = dbName + "." + tableName; - - executeStatementOnDriver("drop table if exists " + qualifiedTableName, driver); executeStatementOnDriver(String.format("create table %s " + "(id int, a string, b int, c bigint, d float, e double, f decimal(4, 2), g boolean, h date, i date, j date, k timestamp) " + "partitioned by spec(a, truncate(3, a), bucket(4, a), b, c, d, e, f, g, h, year(h), month(i), day(j), k, hour(k)) stored by iceberg stored as orc " + - "tblproperties ('compactor.threshold.min.input.files'='1')", qualifiedTableName), driver); + "tblproperties ('compactor.threshold.min.input.files'='1')", QUALIFIED_TABLE_NAME), driver); // 6 records, one records per file --> 3 partitions, 2 files per partition - executeStatementOnDriver(String.format("INSERT INTO %s VALUES (1, 'aaa111', 1, 100, 1.0, 2.0, 4.00, true, DATE '2024-05-01', DATE '2024-05-01', DATE '2024-05-01', TIMESTAMP '2024-05-02 10:00:00')", qualifiedTableName), driver); - executeStatementOnDriver(String.format("INSERT INTO %s VALUES (2, 'aaa111', 1, 100, 1.0, 2.0, 4.00, true, DATE '2024-05-01', DATE '2024-05-01', DATE '2024-05-01', TIMESTAMP '2024-05-02 10:00:00')", qualifiedTableName), driver); - executeStatementOnDriver(String.format("INSERT INTO %s VALUES (3, 'bbb222', 2, 200, 2.0, 3.0, 8.00, false, DATE '2024-05-03', DATE '2024-05-03', DATE '2024-05-03', TIMESTAMP '2024-05-04 13:00:00')", qualifiedTableName), driver); - executeStatementOnDriver(String.format("INSERT INTO %s VALUES (4, 'bbb222', 2, 200, 2.0, 3.0, 8.00, false, DATE '2024-05-03', DATE '2024-05-03', DATE '2024-05-03', TIMESTAMP '2024-05-04 13:00:00')", qualifiedTableName), driver); - executeStatementOnDriver(String.format("INSERT INTO %s VALUES (5, null, null, null, null, null, null, null, null, null, null, null)", qualifiedTableName), driver); - executeStatementOnDriver(String.format("INSERT INTO %s VALUES (6, null, null, null, null, null, null, null, null, null, null, null)", qualifiedTableName), driver); + executeStatementOnDriver(String.format("INSERT INTO %s VALUES (1, 'aaa111', 1, 100, 1.0, 2.0, 4.00, true, DATE '2024-05-01', DATE '2024-05-01', DATE '2024-05-01', TIMESTAMP '2024-05-02 10:00:00')", QUALIFIED_TABLE_NAME), driver); + executeStatementOnDriver(String.format("INSERT INTO %s VALUES (2, 'aaa111', 1, 100, 1.0, 2.0, 4.00, true, DATE '2024-05-01', DATE '2024-05-01', DATE '2024-05-01', TIMESTAMP '2024-05-02 10:00:00')", QUALIFIED_TABLE_NAME), driver); + executeStatementOnDriver(String.format("INSERT INTO %s VALUES (3, 'bbb222', 2, 200, 2.0, 3.0, 8.00, false, DATE '2024-05-03', DATE '2024-05-03', DATE '2024-05-03', TIMESTAMP '2024-05-04 13:00:00')", QUALIFIED_TABLE_NAME), driver); + executeStatementOnDriver(String.format("INSERT INTO %s VALUES (4, 'bbb222', 2, 200, 2.0, 3.0, 8.00, false, DATE '2024-05-03', DATE '2024-05-03', DATE '2024-05-03', TIMESTAMP '2024-05-04 13:00:00')", QUALIFIED_TABLE_NAME), driver); + executeStatementOnDriver(String.format("INSERT INTO %s VALUES (5, null, null, null, null, null, null, null, null, null, null, null)", QUALIFIED_TABLE_NAME), driver); + 
executeStatementOnDriver(String.format("INSERT INTO %s VALUES (6, null, null, null, null, null, null, null, null, null, null, null)", QUALIFIED_TABLE_NAME), driver); - Assert.assertEquals(6, getFilesCount(qualifiedTableName)); - List<String> recordsBefore = getAllRecords(qualifiedTableName); + Assert.assertEquals(6, getFilesCount()); + List<String> recordsBefore = getAllRecords(); - CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, false, + CompactorTestUtil.runCompaction(conf, DB_NAME, TABLE_NAME, CompactionType.MINOR, false, "a=aaa111/a_trunc=aaa/a_bucket=0/b=1/c=100/d=1.0/e=2.0/f=4.00/g=true/h=2024-05-01/h_year=2024/i_month=2024-05/j_day=2024-05-01/k=2024-05-02T10%3A00%3A00/k_hour=2024-05-02-10", "a=bbb222/a_trunc=bbb/a_bucket=3/b=2/c=200/d=2.0/e=3.0/f=8.00/g=false/h=2024-05-03/h_year=2024/i_month=2024-05/j_day=2024-05-03/k=2024-05-04T13%3A00%3A00/k_hour=2024-05-04-13", "a=null/a_trunc=null/a_bucket=null/b=null/c=null/d=null/e=null/f=null/g=null/h=null/h_year=null/i_month=null/j_day=null/k=null/k_hour=null" ); - Assert.assertEquals(3, getFilesCount(qualifiedTableName)); + Assert.assertEquals(3, getFilesCount()); verifySuccessfulCompaction(3); - List<String> recordsAfter = getAllRecords(qualifiedTableName); + List<String> recordsAfter = getAllRecords(); Assert.assertEquals(recordsBefore, recordsAfter); } - - private int getFilesCount(String qualifiedTableName) throws Exception { - driver.run(String.format("select count(*) from %s.files", qualifiedTableName)); + + @Test + public void testIcebergAutoCompactionPartitionEvolution() throws Exception { + executeStatementOnDriver(String.format("create table %s " + + "(id int, a string) " + + "partitioned by spec(id) stored by iceberg stored as orc " + + "tblproperties ('compactor.threshold.min.input.files'='1')", QUALIFIED_TABLE_NAME), driver); + + executeStatementOnDriver(String.format("INSERT INTO %s VALUES (1, 'a')", QUALIFIED_TABLE_NAME), driver); + executeStatementOnDriver(String.format("INSERT INTO %s VALUES (2, 'b')", QUALIFIED_TABLE_NAME), driver); + executeStatementOnDriver(String.format("INSERT INTO %s VALUES (3, 'c')", QUALIFIED_TABLE_NAME), driver); + executeStatementOnDriver(String.format("INSERT INTO %s VALUES (4, 'd')", QUALIFIED_TABLE_NAME), driver); + executeStatementOnDriver(String.format("INSERT INTO %s VALUES (5, 'e')", QUALIFIED_TABLE_NAME), driver); + executeStatementOnDriver(String.format("INSERT INTO %s VALUES (6, 'd')", QUALIFIED_TABLE_NAME), driver); + + executeStatementOnDriver(String.format("alter table %s set partition spec(truncate(3, a))", QUALIFIED_TABLE_NAME), driver); + + executeStatementOnDriver(String.format("INSERT INTO %s VALUES (7, 'aaa111')", QUALIFIED_TABLE_NAME), driver); + executeStatementOnDriver(String.format("INSERT INTO %s VALUES (8, 'aaa111')", QUALIFIED_TABLE_NAME), driver); + executeStatementOnDriver(String.format("INSERT INTO %s VALUES (9, 'bbb222')", QUALIFIED_TABLE_NAME), driver); + executeStatementOnDriver(String.format("INSERT INTO %s VALUES (10, 'bbb222')", QUALIFIED_TABLE_NAME), driver); + executeStatementOnDriver(String.format("INSERT INTO %s VALUES (11, null)", QUALIFIED_TABLE_NAME), driver); + executeStatementOnDriver(String.format("INSERT INTO %s VALUES (12, null)", QUALIFIED_TABLE_NAME), driver); + + runSingleInitiatorCycle(); + ShowCompactResponse rsp = msClient.showCompactions(); + Assert.assertEquals(4, rsp.getCompactsSize()); + + // Compaction should be initiated for each partition from the latest spec + Assert.assertTrue(isCompactExist(rsp, 
"a_trunc_3=aaa", CompactionType.MINOR, CompactionState.INITIATED)); + Assert.assertTrue(isCompactExist(rsp, "a_trunc_3=bbb", CompactionType.MINOR, CompactionState.INITIATED)); + Assert.assertTrue(isCompactExist(rsp, "a_trunc_3=null", CompactionType.MINOR, CompactionState.INITIATED)); + + // Additional compaction should be initiated for all partitions from past partition specs + Assert.assertTrue(isCompactExist(rsp, null, CompactionType.MINOR, CompactionState.INITIATED)); + } + + @Test + public void testIcebergAutoCompactionUnpartitioned() throws Exception { + executeStatementOnDriver(String.format("create table %s " + + "(id int, a string) " + + "stored by iceberg stored as orc " + + "tblproperties ('compactor.threshold.min.input.files'='1')", QUALIFIED_TABLE_NAME), driver); + + executeStatementOnDriver(String.format("INSERT INTO %s VALUES (7, 'aaa111')", QUALIFIED_TABLE_NAME), driver); + executeStatementOnDriver(String.format("INSERT INTO %s VALUES (8, 'aaa111')", QUALIFIED_TABLE_NAME), driver); + executeStatementOnDriver(String.format("INSERT INTO %s VALUES (9, 'bbb222')", QUALIFIED_TABLE_NAME), driver); + executeStatementOnDriver(String.format("INSERT INTO %s VALUES (10, 'bbb222')", QUALIFIED_TABLE_NAME), driver); + executeStatementOnDriver(String.format("INSERT INTO %s VALUES (11, null)", QUALIFIED_TABLE_NAME), driver); + executeStatementOnDriver(String.format("INSERT INTO %s VALUES (12, null)", QUALIFIED_TABLE_NAME), driver); + + runSingleInitiatorCycle(); + ShowCompactResponse rsp = msClient.showCompactions(); + Assert.assertEquals(1, rsp.getCompactsSize()); + Assert.assertTrue(isCompactExist(rsp, null, CompactionType.MINOR, CompactionState.INITIATED)); + } + + private int getFilesCount() throws Exception { + driver.run(String.format("select count(*) from %s.files", QUALIFIED_TABLE_NAME)); List<String> res = new ArrayList<>(); driver.getFetchTask().fetch(res); return Integer.parseInt(res.get(0)); } - private List<String> getAllRecords(String qualifiedTableName) throws Exception { - driver.run(String.format("select * from %s order by id", qualifiedTableName)); + private List<String> getAllRecords() throws Exception { + driver.run(String.format("select * from %s order by id", QUALIFIED_TABLE_NAME)); List<String> res = new ArrayList<>(); driver.getFetchTask().fetch(res); return res; } + + private boolean isCompactExist(ShowCompactResponse rsp, String partName, CompactionType type, CompactionState state) { + return rsp.getCompacts().stream().anyMatch(c -> + c.getDbname().equals(DB_NAME) && c.getTablename().equals(TABLE_NAME) && + Objects.equals(c.getPartitionname(), partName) && c.getType().equals(type) && + c.getState().equals(state.name().toLowerCase())); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/AcidTableOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/AcidTableOptimizer.java new file mode 100644 index 00000000000..424f50a2763 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/AcidTableOptimizer.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils; +import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo; +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +public class AcidTableOptimizer extends TableOptimizer { + private static final String CLASS_NAME = AcidTableOptimizer.class.getName(); + private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME); + + public AcidTableOptimizer(HiveConf conf, TxnStore txnHandler, MetadataCache metadataCache) { + super(conf, txnHandler, metadataCache); + } + + @Override + public Set<CompactionInfo> findPotentialCompactions(long lastChecked, ShowCompactResponse currentCompactions, + Set<String> skipDBs, Set<String> skipTables) throws MetaException { + + int abortedThreshold = HiveConf.getIntVar(conf, + HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD); + long abortedTimeThreshold = HiveConf + .getTimeVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD, + TimeUnit.MILLISECONDS); + + Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(abortedThreshold, abortedTimeThreshold, lastChecked) + .parallelStream() + .filter(ci -> isEligibleForCompaction(ci, currentCompactions, skipDBs, skipTables)) + .collect(Collectors.toSet()); + + if (!potentials.isEmpty()) { + ValidTxnList validTxnList = TxnCommonUtils.createValidReadTxnList( + txnHandler.getOpenTxns(), 0); + conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString()); + } + + return potentials; + } + + @Override + protected boolean isEligibleForCompaction(CompactionInfo ci, ShowCompactResponse currentCompactions, + Set<String> skipDBs, Set<String> skipTables) { + try { + if (!super.isEligibleForCompaction(ci, currentCompactions, skipDBs, skipTables)) { + return false; + } + String qualifiedTableName = ci.getFullTableName(); + Table t = metadataCache.computeIfAbsent(qualifiedTableName, () -> + CompactorUtil.resolveTable(conf, ci.dbname, ci.tableName)); + if (AcidUtils.isInsertOnlyTable(t.getParameters()) && !HiveConf + .getBoolVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_COMPACT_MM)) { + skipTables.add(ci.getFullTableName()); + LOG.info("Table {} is insert only and {}=false so we will not compact it.", qualifiedTableName, + HiveConf.ConfVars.HIVE_COMPACTOR_COMPACT_MM.varname); + return false; + } + if (isDynPartIngest(t, ci)) { + return false; + } + + } catch (Exception e) { + LOG.error("Caught exception while checking compaction eligibility.", e); + try { + ci.errorMessage = e.getMessage(); + txnHandler.markFailed(ci); + } catch (MetaException ex) { + LOG.error("Caught exception 
while marking compaction as failed.", e); + } + return false; + } + return true; + } + + // Check if it's a dynamic partitioning case. If so, do not initiate compaction for streaming ingest, only for aborts. + protected static boolean isDynPartIngest(Table t, CompactionInfo ci) { + if (t.getPartitionKeys() != null && !t.getPartitionKeys().isEmpty() && + ci.partName == null && !ci.hasOldAbort) { + LOG.info("Skipping entry for {} as it is from dynamic partitioning", ci.getFullTableName()); + return true; + } + return false; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionException.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionException.java index 302345f0284..498b3e55819 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionException.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionException.java @@ -91,4 +91,10 @@ public ErrorMsg getCanonicalErrorMsg() { } public String getRemoteErrorMsg() { return remoteErrorMsg; } + + public CompactionException(Throwable throwable, String message, Object... args) { + super(String.format(message, args), throwable); + canonicalErrorMsg = ErrorMsg.GENERIC_ERROR; + remoteErrorMsg = null; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java index ec0619f7997..b3f5d2a7eeb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java @@ -25,17 +25,13 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo; import org.apache.hadoop.hive.ql.ErrorMsg; -import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; -import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -89,25 +85,6 @@ public void init(AtomicBoolean stop) throws Exception { */ abstract Table resolveTable(CompactionInfo ci) throws MetaException; - abstract boolean replIsCompactionDisabledForDatabase(String dbName) throws TException; - - /** - * Get list of partitions by name. - * @param ci compaction info. - * @return list of partitions - * @throws MetaException if an error occurs. - */ - abstract List<Partition> getPartitionsByNames(CompactionInfo ci) throws MetaException; - - /** - * Get the partition being compacted. - * @param ci compaction info returned from the compaction queue - * @return metastore partition, or null if there is not partition in this compaction info - * @throws MetaException if underlying calls throw, or if the partition name resolves to more than - * one partition. - */ - abstract protected Partition resolvePartition(CompactionInfo ci) throws MetaException; - protected String tableName(Table t) { return Warehouse.getQualifiedName(t); } @@ -123,15 +100,6 @@ public static void initializeAndStartThread(CompactorThread thread, Configuratio thread.start(); } - protected boolean replIsCompactionDisabledForTable(Table tbl) { - // Compaction is disabled until after first successful incremental load. Check HIVE-21197 for more detail. 
- boolean isCompactDisabled = ReplUtils.isFirstIncPending(tbl.getParameters()); - if (isCompactDisabled) { - LOG.info("Compaction is disabled for table " + tbl.getTableName()); - } - return isCompactDisabled; - } - @VisibleForTesting protected String getRuntimeVersion() { return this.getClass().getPackage().getImplementationVersion(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java index f166d677e40..b72eea8ce43 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java @@ -20,7 +20,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Sets; import org.apache.hadoop.hive.common.ServerUtils; -import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.*; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; @@ -28,21 +27,20 @@ import org.apache.hadoop.hive.metastore.metrics.PerfLogger; import org.apache.hadoop.hive.metastore.txn.NoMutex; import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo; -import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; -import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.exec.Utilities; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.lang.reflect.Constructor; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.Collectors; import static org.apache.hadoop.hive.conf.Constants.COMPACTOR_INTIATOR_THREAD_NAME_FORMAT; @@ -51,13 +49,14 @@ * It's critical that there exactly 1 of these in a given warehouse. */ public class Initiator extends MetaStoreCompactorThread { - static final private String CLASS_NAME = Initiator.class.getName(); - static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME); + private static final String CLASS_NAME = Initiator.class.getName(); + private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME); private ExecutorService compactionExecutor; private boolean metricsEnabled; private boolean shouldUseMutex = true; + private List<TableOptimizer> optimizers; @Override public void run() { @@ -66,12 +65,6 @@ public void run() { // so wrap it in a big catch Throwable statement. try { recoverFailedCompactions(false); - - int abortedThreshold = HiveConf.getIntVar(conf, - HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD); - long abortedTimeThreshold = HiveConf - .getTimeVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD, - TimeUnit.MILLISECONDS); TxnStore.MutexAPI mutex = shouldUseMutex ? 
txnHandler.getMutexAPI() : new NoMutex(); // Make sure we run through the loop once before checking to stop as this makes testing @@ -113,11 +106,11 @@ public void run() { Set<String> skipDBs = Sets.newConcurrentHashSet(); Set<String> skipTables = Sets.newConcurrentHashSet(); - Set<CompactionInfo> potentials = compactionExecutor.submit(() -> - txnHandler.findPotentialCompactions(abortedThreshold, abortedTimeThreshold, prevStart) - .parallelStream() - .filter(ci -> isEligibleForCompaction(ci, currentCompactions, skipDBs, skipTables)) - .collect(Collectors.toSet())).get(); + Set<CompactionInfo> potentials = Sets.newHashSet(); + for (TableOptimizer optimizer : optimizers) { + potentials.addAll(compactionExecutor.submit(() -> + optimizer.findPotentialCompactions(prevStart, currentCompactions, skipDBs, skipTables)).get()); + } LOG.debug("Found {} potential compactions, checking to see if we should compact any of them", potentials.size()); CompactorUtil.checkInterrupt(CLASS_NAME); @@ -125,12 +118,6 @@ public void run() { Map<String, String> tblNameOwners = new HashMap<>(); List<CompletableFuture<Void>> compactionList = new ArrayList<>(); - if (!potentials.isEmpty()) { - ValidTxnList validTxnList = TxnCommonUtils.createValidReadTxnList( - txnHandler.getOpenTxns(), 0); - conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString()); - } - for (CompactionInfo ci : potentials) { try { //Check for interruption before scheduling each compactionInfo and return if necessary @@ -142,8 +129,8 @@ public void run() { ci.poolName = CompactorUtil.getPoolName(conf, t, metadataCache); Partition p = resolvePartition(ci); if (p == null && ci.partName != null) { - LOG.info("Can't find partition " + ci.getFullPartitionName() + - ", assuming it has been dropped and moving on."); + LOG.info("Can't find partition {}, assuming it has been dropped and moving on.", + ci.getFullPartitionName()); continue; } String runAs = resolveUserToRunAs(tblNameOwners, t, p); @@ -208,10 +195,6 @@ protected boolean isCacheEnabled() { MetastoreConf.ConfVars.COMPACTOR_INITIATOR_TABLECACHE_ON); } - private Database resolveDatabase(CompactionInfo ci) throws MetaException, NoSuchObjectException { - return CompactorUtil.resolveDatabase(conf, ci.dbname); - } - @VisibleForTesting protected String resolveUserToRunAs(Map<String, String> cache, Table t, Partition p) throws IOException, InterruptedException { @@ -236,158 +219,32 @@ public void init(AtomicBoolean stop) throws Exception { COMPACTOR_INTIATOR_THREAD_NAME_FORMAT); metricsEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED) && MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON); + optimizers = Arrays.stream(MetastoreConf.getTrimmedStringsVar(conf, + MetastoreConf.ConfVars.COMPACTOR_INITIATOR_TABLE_OPTIMIZERS)) + .map(this::instantiateTableOptimizer).toList(); } + + private TableOptimizer instantiateTableOptimizer(String className) { + try { + Class<? extends TableOptimizer> icebergInitiatorClazz = (Class<? 
extends TableOptimizer>) + Class.forName(className, true, + Utilities.getSessionSpecifiedClassLoader()); - private void recoverFailedCompactions(boolean remoteOnly) throws MetaException { - if (!remoteOnly) txnHandler.revokeFromLocalWorkers(ServerUtils.hostname()); - txnHandler.revokeTimedoutWorkers(HiveConf.getTimeVar(conf, - HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT, TimeUnit.MILLISECONDS)); - } - - private boolean foundCurrentOrFailedCompactions(ShowCompactResponse compactions, CompactionInfo ci) throws MetaException { - if (compactions.getCompacts() == null) { - return false; - } - - //In case of an aborted Dynamic partition insert, the created entry in the compaction queue does not contain - //a partition name even for partitioned tables. As a result it can happen that the ShowCompactResponse contains - //an element without partition name for partitioned tables. Therefore, it is necessary to null check the partition - //name of the ShowCompactResponseElement even if the CompactionInfo.partName is not null. These special compaction - //requests are skipped by the worker, and only cleaner will pick them up, so we should allow to schedule a 'normal' - //compaction for partitions of those tables which has special (DP abort) entry with undefined partition name. - List<ShowCompactResponseElement> filteredElements = compactions.getCompacts().stream() - .filter(e -> e.getDbname().equals(ci.dbname) - && e.getTablename().equals(ci.tableName) - && (e.getPartitionname() == null && ci.partName == null || - (Objects.equals(e.getPartitionname(),ci.partName)))) - .collect(Collectors.toList()); - - // Figure out if there are any currently running compactions on the same table or partition. - if (filteredElements.stream().anyMatch( - e -> TxnStore.WORKING_RESPONSE.equals(e.getState()) || TxnStore.INITIATED_RESPONSE.equals(e.getState()))) { - - LOG.info("Found currently initiated or working compaction for " + - ci.getFullPartitionName() + " so we will not initiate another compaction"); - return true; - } - - // Check if there is already sufficient number of consecutive failures for this table/partition - // so that no new automatic compactions needs to be scheduled. 
- int failedThreshold = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD); - - LongSummaryStatistics failedStats = filteredElements.stream() - .filter(e -> TxnStore.SUCCEEDED_RESPONSE.equals(e.getState()) || TxnStore.FAILED_RESPONSE.equals(e.getState())) - .sorted(Comparator.comparingLong(ShowCompactResponseElement::getId).reversed()) - .limit(failedThreshold) - - .filter(e -> TxnStore.FAILED_RESPONSE.equals(e.getState())) - .collect(Collectors.summarizingLong(ShowCompactResponseElement::getEnqueueTime)); - - // If the last attempt was too long ago, ignore the failed threshold and try compaction again - long retryTime = MetastoreConf.getTimeVar(conf, - MetastoreConf.ConfVars.COMPACTOR_INITIATOR_FAILED_RETRY_TIME, TimeUnit.MILLISECONDS); - - boolean needsRetry = (retryTime > 0) && (failedStats.getMax() + retryTime < System.currentTimeMillis()); - if (failedStats.getCount() == failedThreshold && !needsRetry) { - LOG.warn("Will not initiate compaction for " + ci.getFullPartitionName() + " since last " + - MetastoreConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD + " attempts to compact it failed."); - - ci.errorMessage = "Compaction is not initiated since last " + - MetastoreConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD + " consecutive compaction attempts failed)"; + Class<?>[] constructorParameterTypes = {HiveConf.class, TxnStore.class, MetadataCache.class}; + Constructor<?> constructor = icebergInitiatorClazz.getConstructor(constructorParameterTypes); - txnHandler.markFailed(ci); - return true; + Object[] constructorArgs = new Object[] {conf, txnHandler, metadataCache}; + return (TableOptimizer) constructor.newInstance(constructorArgs); } - return false; - } - - // Check if it's a dynamic partitioning case. If so, do not initiate compaction for streaming ingest, only for aborts. - private static boolean isDynPartIngest(Table t, CompactionInfo ci){ - if (t.getPartitionKeys() != null && t.getPartitionKeys().size() > 0 && - ci.partName == null && !ci.hasOldAbort) { - LOG.info("Skipping entry for " + ci.getFullTableName() + " as it is from dynamic" + - " partitioning"); - return true; + catch (Exception e) { + throw new CompactionException(e, "Failed instantiating and calling table optimizer %s", className); } - return false; } - private boolean isEligibleForCompaction(CompactionInfo ci, - ShowCompactResponse currentCompactions, Set<String> skipDBs, Set<String> skipTables) { - try { - if (skipDBs.contains(ci.dbname)) { - LOG.info("Skipping {}::{}, skipDBs::size:{}", ci.dbname, ci.tableName, skipDBs.size()); - return false; - } else { - if (replIsCompactionDisabledForDatabase(ci.dbname)) { - skipDBs.add(ci.dbname); - LOG.info("Skipping {} as compaction is disabled due to repl; skipDBs::size:{}", - ci.dbname, skipDBs.size()); - return false; - } - } - - if (skipTables.contains(ci.getFullTableName())) { - return false; - } - - LOG.info("Checking to see if we should compact " + ci.getFullPartitionName()); - - // Check if we have already initiated or are working on a compaction for this table/partition. - // Also make sure we haven't exceeded configured number of consecutive failures. - // If any of the above applies, skip it. - // Note: if we are just waiting on cleaning we can still check, as it may be time to compact again even though we haven't cleaned. 
- if (foundCurrentOrFailedCompactions(currentCompactions, ci)) { - return false; - } - - Table t = metadataCache.computeIfAbsent(ci.getFullTableName(), () -> resolveTable(ci)); - if (t == null) { - LOG.info("Can't find table " + ci.getFullTableName() + ", assuming it's a temp " + - "table or has been dropped and moving on."); - return false; - } - - if (replIsCompactionDisabledForTable(t)) { - skipTables.add(ci.getFullTableName()); - return false; - } - - Map<String, String> dbParams = metadataCache.computeIfAbsent(ci.dbname, () -> resolveDatabase(ci)).getParameters(); - if (MetaStoreUtils.isNoAutoCompactSet(dbParams, t.getParameters())) { - if (Boolean.parseBoolean(MetaStoreUtils.getNoAutoCompact(dbParams))) { - skipDBs.add(ci.dbname); - LOG.info("DB " + ci.dbname + " marked " + hive_metastoreConstants.NO_AUTO_COMPACT + - "=true so we will not compact it."); - } else { - skipTables.add(ci.getFullTableName()); - LOG.info("Table " + tableName(t) + " marked " + hive_metastoreConstants.NO_AUTO_COMPACT + - "=true so we will not compact it."); - } - return false; - } - if (AcidUtils.isInsertOnlyTable(t.getParameters()) && !HiveConf - .getBoolVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_COMPACT_MM)) { - skipTables.add(ci.getFullTableName()); - LOG.info("Table " + tableName(t) + " is insert only and " + HiveConf.ConfVars.HIVE_COMPACTOR_COMPACT_MM.varname - + "=false so we will not compact it."); - return false; - } - if (isDynPartIngest(t, ci)) { - return false; - } - - } catch (Throwable e) { - LOG.error("Caught exception while checking compaction eligibility.", e); - try { - ci.errorMessage = e.getMessage(); - txnHandler.markFailed(ci); - } catch (MetaException ex) { - LOG.error("Caught exception while marking compaction as failed.", e); - } - return false; - } - return true; + private void recoverFailedCompactions(boolean remoteOnly) throws MetaException { + if (!remoteOnly) txnHandler.revokeFromLocalWorkers(ServerUtils.hostname()); + txnHandler.revokeTimedoutWorkers(HiveConf.getTimeVar(conf, + HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT, TimeUnit.MILLISECONDS)); } private static class InitiatorCycleUpdater implements Runnable { @@ -428,4 +285,25 @@ public void run() { public void enforceMutex(boolean enableMutex) { this.shouldUseMutex = enableMutex; } + + @VisibleForTesting + protected Partition resolvePartition(CompactionInfo ci) throws Exception { + Table table = metadataCache.computeIfAbsent(ci.getFullTableName(), () -> + CompactorUtil.resolveTable(conf, ci.dbname, ci.tableName)); + + if (!MetaStoreUtils.isIcebergTable(table.getParameters())) { + return CompactorUtil.resolvePartition(conf, null, ci.dbname, ci.tableName, ci.partName, + CompactorUtil.METADATA_FETCH_MODE.LOCAL); + } else { + if (ci.partName == null) { + return null; + } + + org.apache.hadoop.hive.metastore.api.Partition partition = new org.apache.hadoop.hive.metastore.api.Partition(); + partition.setSd(table.getSd().deepCopy()); + partition.setParameters(com.google.common.collect.Maps.newHashMap()); + + return partition; + } + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java index 0878330cc3e..12662e3c6e9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java @@ -20,19 +20,13 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import 
org.apache.hadoop.hive.metastore.MetaStoreThread; import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; -import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.metrics.Metrics; import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; -import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; -import org.apache.thrift.TException; -import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -40,7 +34,6 @@ import static org.apache.hadoop.hive.metastore.HMSHandler.getMSForConf; import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.COMPACTOR_USE_CUSTOM_POOL; -import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog; /** * Compactor threads that runs in the metastore. It uses a {@link TxnStore} @@ -69,30 +62,6 @@ public void init(AtomicBoolean stop) throws Exception { return CompactorUtil.resolveTable(conf, ci.dbname, ci.tableName); } - @Override boolean replIsCompactionDisabledForDatabase(String dbName) throws TException { - try { - Database database = getMSForConf(conf).getDatabase(getDefaultCatalog(conf), dbName); - // Compaction is disabled until after first successful incremental load. Check HIVE-21197 for more detail. - boolean isReplCompactDisabled = ReplUtils.isFirstIncPending(database.getParameters()); - if (isReplCompactDisabled) { - LOG.info("Compaction is disabled for database " + dbName); - } - return isReplCompactDisabled; - } catch (NoSuchObjectException e) { - LOG.info("Unable to find database " + dbName); - return true; - } - } - - @Override List<Partition> getPartitionsByNames(CompactionInfo ci) throws MetaException { - return CompactorUtil.getPartitionsByNames(conf, ci.dbname, ci.tableName, ci.partName); - } - - protected Partition resolvePartition(CompactionInfo ci) throws MetaException { - return CompactorUtil.resolvePartition(conf, null, ci.dbname, ci.tableName, ci.partName, - CompactorUtil.METADATA_FETCH_MODE.LOCAL); - } - protected abstract boolean isCacheEnabled(); protected void startCycleUpdater(long updateInterval, Runnable taskToRun) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RemoteCompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RemoteCompactorThread.java index f95834ac23d..38493939b03 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RemoteCompactorThread.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RemoteCompactorThread.java @@ -19,20 +19,12 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils; import org.apache.hadoop.hive.metastore.IMetaStoreClient; -import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; -import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo; -import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; -import org.apache.thrift.TException; -import java.util.List; import 
java.util.concurrent.atomic.AtomicBoolean; -import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog; - /** * Compactor thread that can run outside the metastore. It will * use the metastore thrift API which will default to a remote connection @@ -60,24 +52,4 @@ public void init(AtomicBoolean stop) throws Exception { @Override Table resolveTable(CompactionInfo ci) throws MetaException { return RemoteCompactorUtil.resolveTable(conf, msc, ci); } - - @Override boolean replIsCompactionDisabledForDatabase(String dbName) throws TException { - try { - Database database = msc.getDatabase(getDefaultCatalog(conf), dbName); - // Compaction is disabled until after first successful incremental load. Check HIVE-21197 for more detail. - return ReplUtils.isFirstIncPending(database.getParameters()); - } catch (NoSuchObjectException e) { - LOG.info("Unable to find database " + dbName); - return true; - } - } - - @Override List<Partition> getPartitionsByNames(CompactionInfo ci) throws MetaException { - return RemoteCompactorUtil.getPartitionsByNames(msc, ci.dbname, ci.tableName, ci.tableName); - } - - protected Partition resolvePartition(CompactionInfo ci) throws MetaException { - return CompactorUtil.resolvePartition(conf, msc, ci.dbname, ci.tableName, ci.partName, - CompactorUtil.METADATA_FETCH_MODE.REMOTE); - } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/TableOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/TableOptimizer.java new file mode 100644 index 00000000000..3211939ea4d --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/TableOptimizer.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; +import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo; + +import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Comparator; +import java.util.List; +import java.util.LongSummaryStatistics; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.apache.hadoop.hive.metastore.HMSHandler.getMSForConf; +import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog; + +public abstract class TableOptimizer { + private static final String CLASS_NAME = TableOptimizer.class.getName(); + private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME); + + public abstract Set<CompactionInfo> findPotentialCompactions(long lastChecked, ShowCompactResponse currentCompactions, + Set<String> skipDBs, Set<String> skipTables) throws MetaException; + + protected final HiveConf conf; + protected final TxnStore txnHandler; + protected final MetadataCache metadataCache; + + protected TableOptimizer(HiveConf conf, TxnStore txnHandler, MetadataCache metadataCache) { + this.conf = conf; + this.txnHandler = txnHandler; + this.metadataCache = metadataCache; + } + + protected boolean isEligibleForCompaction(CompactionInfo ci, ShowCompactResponse currentCompactions, + Set<String> skipDBs, Set<String> skipTables) { + try { + if (skipDBs.contains(ci.dbname)) { + LOG.info("Skipping {}::{}, skipDBs::size:{}", ci.dbname, ci.tableName, skipDBs.size()); + return false; + } else { + if (replIsCompactionDisabledForDatabase(ci.dbname)) { + skipDBs.add(ci.dbname); + LOG.info("Skipping {} as compaction is disabled due to repl; skipDBs::size:{}", + ci.dbname, skipDBs.size()); + return false; + } + } + + String qualifiedTableName = ci.getFullTableName(); + if (skipTables.contains(qualifiedTableName)) { + return false; + } + + LOG.info("Checking to see if we should compact {}", ci.getFullPartitionName()); + + // Check if we have already initiated or are working on a compaction for this table/partition. + // Also make sure we haven't exceeded configured number of consecutive failures. + // If any of the above applies, skip it. + // Note: if we are just waiting on cleaning we can still check, as it may be time to compact again even though we haven't cleaned. 
+ if (foundCurrentOrFailedCompactions(currentCompactions, ci)) { + return false; + } + + Table t = metadataCache.computeIfAbsent(qualifiedTableName, () -> + CompactorUtil.resolveTable(conf, ci.dbname, ci.tableName)); + if (t == null) { + LOG.info("Can't find table {}, assuming it's a temp table or has been dropped and moving on.", + qualifiedTableName); + return false; + } + + if (replIsCompactionDisabledForTable(t)) { + skipTables.add(qualifiedTableName); + return false; + } + + Map<String, String> dbParams = metadataCache.computeIfAbsent(ci.dbname, () -> resolveDatabase(ci)).getParameters(); + if (MetaStoreUtils.isNoAutoCompactSet(dbParams, t.getParameters())) { + if (Boolean.parseBoolean(MetaStoreUtils.getNoAutoCompact(dbParams))) { + skipDBs.add(ci.dbname); + LOG.info("DB {} marked {}=true so we will not compact it.", ci.dbname, hive_metastoreConstants.NO_AUTO_COMPACT); + } else { + skipTables.add(qualifiedTableName); + LOG.info("Table {} marked {}=true so we will not compact it.", qualifiedTableName, + hive_metastoreConstants.NO_AUTO_COMPACT); + } + return false; + } + } catch (Exception e) { + LOG.error("Caught exception while checking compaction eligibility.", e); + try { + ci.errorMessage = e.getMessage(); + txnHandler.markFailed(ci); + } catch (MetaException ex) { + LOG.error("Caught exception while marking compaction as failed.", ex); + } + return false; + } + return true; + } + + private boolean replIsCompactionDisabledForTable(Table tbl) { + // Compaction is disabled until after the first successful incremental load. Check HIVE-21197 for more detail. + boolean isCompactDisabled = ReplUtils.isFirstIncPending(tbl.getParameters()); + if (isCompactDisabled) { + LOG.info("Compaction is disabled for table {}", tbl.getTableName()); + } + return isCompactDisabled; + } + + private boolean replIsCompactionDisabledForDatabase(String dbName) throws TException { + try { + Database database = getMSForConf(conf).getDatabase(getDefaultCatalog(conf), dbName); + // Compaction is disabled until after the first successful incremental load. Check HIVE-21197 for more detail. + boolean isReplCompactDisabled = ReplUtils.isFirstIncPending(database.getParameters()); + if (isReplCompactDisabled) { + LOG.info("Compaction is disabled for database {}", dbName); + } + return isReplCompactDisabled; + } catch (NoSuchObjectException e) { + LOG.info("Unable to find database {}", dbName); + return true; + } + } + + protected boolean foundCurrentOrFailedCompactions(ShowCompactResponse compactions, CompactionInfo ci) throws MetaException { + if (compactions.getCompacts() == null) { + return false; + } + + //In case of an aborted dynamic partition insert, the created entry in the compaction queue does not contain + //a partition name even for partitioned tables. As a result, the ShowCompactResponse can contain an element + //without a partition name for partitioned tables. Therefore, it is necessary to null-check the partition + //name of the ShowCompactResponseElement even if CompactionInfo.partName is not null. These special compaction + //requests are skipped by the worker, and only the cleaner will pick them up, so we should allow scheduling a 'normal' + //compaction for partitions of those tables which have a special (DP abort) entry with an undefined partition name.
+ List<ShowCompactResponseElement> filteredElements = compactions.getCompacts().stream() + .filter(e -> e.getDbname().equals(ci.dbname) + && e.getTablename().equals(ci.tableName) + && (e.getPartitionname() == null && ci.partName == null || + (Objects.equals(e.getPartitionname(), ci.partName)))) + .toList(); + + // Figure out if there are any currently running compactions on the same table or partition. + if (filteredElements.stream().anyMatch( + e -> TxnStore.WORKING_RESPONSE.equals(e.getState()) || TxnStore.INITIATED_RESPONSE.equals(e.getState()))) { + + LOG.info("Found currently initiated or working compaction for {} so we will not initiate another compaction", + ci.getFullPartitionName()); + return true; + } + + // Check if there is already a sufficient number of consecutive failures for this table/partition + // so that no new automatic compaction needs to be scheduled. + int failedThreshold = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD); + + LongSummaryStatistics failedStats = filteredElements.stream() + .filter(e -> TxnStore.SUCCEEDED_RESPONSE.equals(e.getState()) || TxnStore.FAILED_RESPONSE.equals(e.getState())) + .sorted(Comparator.comparingLong(ShowCompactResponseElement::getId).reversed()) + .limit(failedThreshold) + .filter(e -> TxnStore.FAILED_RESPONSE.equals(e.getState())) + .collect(Collectors.summarizingLong(ShowCompactResponseElement::getEnqueueTime)); + + // If the last attempt was too long ago, ignore the failed threshold and try compaction again. + long retryTime = MetastoreConf.getTimeVar(conf, + MetastoreConf.ConfVars.COMPACTOR_INITIATOR_FAILED_RETRY_TIME, TimeUnit.MILLISECONDS); + + boolean needsRetry = (retryTime > 0) && (failedStats.getMax() + retryTime < System.currentTimeMillis()); + if (failedStats.getCount() == failedThreshold && !needsRetry) { + LOG.warn("Will not initiate compaction for {} since last {} attempts to compact it failed.", + ci.getFullPartitionName(), MetastoreConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD); + + ci.errorMessage = "Compaction is not initiated since last " + + MetastoreConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD + " consecutive compaction attempts failed"; + + txnHandler.markFailed(ci); + return true; + } + return false; + } + + protected Database resolveDatabase(CompactionInfo ci) throws MetaException, NoSuchObjectException { + return CompactorUtil.resolveDatabase(conf, ci.dbname); + } +} diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java index 21f7ddfadf3..fe10a762bf9 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java @@ -123,6 +123,8 @@ void initHiveConf() { hiveConf.set("tez.grouping.max-size", "10"); hiveConf.set("tez.grouping.min-size", "1"); databaseProduct = determineDatabaseProduct(DatabaseProduct.DERBY_NAME, hiveConf); + MetastoreConf.setVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_TABLE_OPTIMIZERS, + "org.apache.hadoop.hive.ql.txn.compactor.AcidTableOptimizer"); } void setUpInternal() throws Exception { diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java index 38484534b77..ec2f5dacd75 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java @@ -142,6
+142,8 @@ protected final void setup(HiveConf conf) throws Exception { MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true); MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEANER_ON, true); MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_WRITE_ID, useMinHistoryWriteId()); + MetastoreConf.setVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_TABLE_OPTIMIZERS, + "org.apache.hadoop.hive.ql.txn.compactor.AcidTableOptimizer"); // Set this config to true in the base class, there are extended test classes which set this config to false. MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER, true); TestTxnDbUtil.setConfValues(conf); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java index c98b310ca25..636550aeaca 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java @@ -1001,7 +1001,7 @@ public void resolveUserToRunAs() throws Exception { initiator.setConf(conf); initiator.init(new AtomicBoolean(true)); doThrow(new RuntimeException("This was thrown on purpose by testInitiatorFailure")) - .when(initiator).resolveTable(any()); + .when(initiator).resolvePartition(any()); initiator.run(); // verify status of table compaction @@ -1134,7 +1134,7 @@ public void testMetaCache() throws Exception { ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List<ShowCompactResponseElement> compacts = rsp.getCompacts(); Assert.assertEquals(2, compacts.size()); - Mockito.verify(initiator, times(1)).resolveTable(Mockito.any()); + Mockito.verify(initiator, times(2)).resolvePartition(Mockito.any()); } private static FindNextCompactRequest aFindNextCompactRequest(String workerId, String workerVersion) { diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java index 9cf5361c4f8..3627517b441 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java @@ -653,6 +653,11 @@ public enum ConfVars { COMPACTOR_INITIATOR_TABLECACHE_ON("metastore.compactor.initiator.tablecache.on", "hive.compactor.initiator.tablecache.on", true, "Enable table caching in the initiator. Currently the cache is cleaned after each cycle."), + COMPACTOR_INITIATOR_TABLE_OPTIMIZERS("compactor.table.optimizers", + "hive.compactor.table.optimizers", + "org.apache.hadoop.hive.ql.txn.compactor.AcidTableOptimizer," + + "org.apache.iceberg.mr.hive.compaction.IcebergTableOptimizer", + "Comma-separated list of table optimizers executed by the compaction Initiator."), COMPACTOR_WORKER_THREADS("metastore.compactor.worker.threads", "hive.compactor.worker.threads", 0, "How many compactor worker threads to run on this metastore instance. Set this to a\n" +
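
Note on extensibility: with this change the Initiator discovers per-format compaction logic from the comma-separated class list in compactor.table.optimizers / hive.compactor.table.optimizers, defaulting to AcidTableOptimizer plus IcebergTableOptimizer as shown in the MetastoreConf hunk above. Below is a minimal, hypothetical sketch of what an additional optimizer could look like. The package and class name are invented for illustration, the exact way the Initiator constructs optimizers is not visible in this diff, and only the constructor arguments and the findPotentialCompactions signature are taken from the new TableOptimizer base class; treat it as a sketch rather than part of this patch.

// Hypothetical example only; not part of HIVE-29028.
package com.example.compaction;

import java.util.Collections;
import java.util.Set;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
// MetadataCache is assumed to live in org.apache.hadoop.hive.ql.txn.compactor,
// since TableOptimizer.java above uses it without an explicit import.
import org.apache.hadoop.hive.ql.txn.compactor.MetadataCache;
import org.apache.hadoop.hive.ql.txn.compactor.TableOptimizer;

public class NoOpTableOptimizer extends TableOptimizer {

  public NoOpTableOptimizer(HiveConf conf, TxnStore txnHandler, MetadataCache metadataCache) {
    super(conf, txnHandler, metadataCache);
  }

  @Override
  public Set<CompactionInfo> findPotentialCompactions(long lastChecked,
      ShowCompactResponse currentCompactions, Set<String> skipDBs, Set<String> skipTables)
      throws MetaException {
    // A real implementation would scan its table format for compaction candidates and
    // run each CompactionInfo through the shared isEligibleForCompaction(ci,
    // currentCompactions, skipDBs, skipTables) helper before returning it to the Initiator.
    return Collections.emptySet();
  }
}

Registering such a class would then follow the same pattern as the test changes above, for example appending com.example.compaction.NoOpTableOptimizer to the value set for MetastoreConf.ConfVars.COMPACTOR_INITIATOR_TABLE_OPTIMIZERS.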