This is an automated email from the ASF dual-hosted git repository.

difin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git

The following commit(s) were added to refs/heads/master by this push:
     new 2495898ae93 HIVE-28644: Iceberg: Add support for SMART OPTIMIZE feature (#5621)
2495898ae93 is described below

commit 2495898ae937de2bfd8fe72c63eec2ae905c908c
Author: Dmitriy Fingerman <dmitriy.finger...@gmail.com>
AuthorDate: Mon May 19 09:17:41 2025 -0400

    HIVE-28644: Iceberg: Add support for SMART OPTIMIZE feature (#5621)
---
 .../hive/compaction/IcebergCompactionService.java  |  22 ++-
 .../compaction/evaluator/CompactionEvaluator.java  |  17 +++
 ..._major_compaction_partition_evolution_ordered.q |   5 +-
 ...or_compaction_partition_evolution_ordered.q.out |  17 +--
 .../iceberg_major_compaction_unpartitioned.q.out   |   3 +-
 ...rg_major_compaction_unpartitioned_ordered.q.out |   3 +-
 .../hadoop/hive/ql/parse/AlterClauseParser.g       |   2 +-
 .../hive/ql/parse/TestParseOptimizeTable.java      |   9 +-
 .../metastore/txn/TestCompactionTxnHandler.java    |  18 ++-
 .../gen/thrift/gen-cpp/hive_metastore_types.cpp    |   8 +-
 .../src/gen/thrift/gen-cpp/hive_metastore_types.h  |   3 +-
 .../hadoop/hive/metastore/api/CompactionType.java  |   5 +-
 .../thrift/gen-php/metastore/CompactionType.php    |   3 +
 .../src/gen/thrift/gen-py/hive_metastore/ttypes.py |   3 +
 .../src/gen/thrift/gen-rb/hive_metastore_types.rb  |   5 +-
 .../src/main/thrift/hive_metastore.thrift          |   1 +
 .../hive/metastore/txn/CompactionTxnHandler.java   |  12 +-
 .../apache/hadoop/hive/metastore/txn/TxnStore.java |   1 +
 .../apache/hadoop/hive/metastore/txn/TxnUtils.java |   4 +
 .../metastore/txn/entities/CompactionInfo.java     |   5 +
 .../src/main/sql/hive/hive-schema-4.1.0.hive.sql   |   2 +-
 .../main/sql/hive/upgrade-4.0.0-to-4.1.0.hive.sql  | 161 +++++++++++++++++++++
 22 files changed, 263 insertions(+), 46 deletions(-)

diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergCompactionService.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergCompactionService.java
index 32913882fc3..42d395eb311 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergCompactionService.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergCompactionService.java
@@ -18,6 +18,7 @@
 
 package org.apache.iceberg.mr.hive.compaction;
 
+import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
@@ -40,15 +41,10 @@ public IcebergCompactionService() {
 
   public Boolean compact(Table table, CompactionInfo ci) throws Exception {
 
-    if (!ci.isMajorCompaction() && !ci.isMinorCompaction()) {
-      ci.errorMessage = String.format(
-          "Iceberg tables do not support %s compaction type, supported types are ['MINOR', 'MAJOR']", ci.type.name());
+    if (!ci.isMajorCompaction() && !ci.isMinorCompaction() && !ci.isSmartOptimize()) {
+      ci.errorMessage = String.format("Iceberg tables do not support %s compaction type", ci.type.name());
       LOG.error(ci.errorMessage + " Compaction info: {}", ci);
-      try {
-        msc.markRefused(CompactionInfo.compactionInfoToStruct(ci));
-      } catch (Throwable tr) {
-        LOG.error("Caught an exception while trying to mark compaction {} as failed: {}", ci, tr);
-      }
+      msc.markRefused(CompactionInfo.compactionInfoToStruct(ci));
       return false;
     }
     CompactorUtil.checkInterrupt(CLASS_NAME);
@@ -63,6 +59,16 @@ public Boolean compact(Table table, CompactionInfo ci) throws Exception {
       return false;
     }
 
+    if (ci.type == CompactionType.SMART_OPTIMIZE) {
+      CompactionType type = compactionEvaluator.determineCompactionType();
+      if (type == null) {
+        msc.markRefused(CompactionInfo.compactionInfoToStruct(ci));
+        return false;
+      }
+      ci.type = type;
+    }
+    msc.updateCompactorState(CompactionInfo.compactionInfoToStruct(ci), 0);
+
     if (ci.runAs == null) {
       ci.runAs = TxnUtils.findUserToRunAs(table.getSd().getLocation(), table, conf);
     }
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/CompactionEvaluator.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/CompactionEvaluator.java
index ddbcdd509e5..4fd1e076a3b 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/CompactionEvaluator.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/CompactionEvaluator.java
@@ -23,6 +23,7 @@
 import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
 import org.apache.hadoop.hive.ql.txn.compactor.CompactorContext;
 import org.apache.iceberg.DataFile;
@@ -89,11 +90,27 @@ public boolean isEligibleForCompaction() {
         return isMinorNecessary();
       case MAJOR:
         return isMajorNecessary();
+      case SMART_OPTIMIZE:
+        return isMinorNecessary() || isMajorNecessary();
       default:
         return false;
     }
   }
 
+  public CompactionType determineCompactionType() {
+    if (ci.type == CompactionType.SMART_OPTIMIZE) {
+      if (isMajorNecessary()) {
+        return CompactionType.MAJOR;
+      } else if (isMinorNecessary()) {
+        return CompactionType.MINOR;
+      } else {
+        return null;
+      }
+    } else {
+      return ci.type;
+    }
+  }
+
   private static TableRuntime createTableRuntime(Table icebergTable, Map<String, String> parameters) {
     OptimizingConfig optimizingConfig = OptimizingConfig.parse(Collections.emptyMap());
     optimizingConfig.setTargetSize(getTargetSizeBytes(parameters));
diff --git a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partition_evolution_ordered.q b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partition_evolution_ordered.q
index 8c15ed861f6..0dfe2814042 100644
--- a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partition_evolution_ordered.q
+++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partition_evolution_ordered.q
@@ -12,6 +12,7 @@
 --! qt:replace:/(\s+current-snapshot-timestamp-ms\s+)\S+(\s*)/$1#Masked#$2/
 --! qt:replace:/(MAJOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
 --! qt:replace:/(MAJOR\s+refused\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
+--! qt:replace:/(SMART_OPTIMIZE\s+refused\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
 -- Mask compaction id as they will be allocated in parallel threads
--! qt:replace:/^[0-9]/#Masked#/
 -- Mask removed file size
@@ -60,10 +61,10 @@ select * from ice_orc where dept_id = 2 order by first_name;
 select * from ice_orc where dept_id = 3 order by first_name;
 describe formatted ice_orc;
 
-explain alter table ice_orc COMPACT 'major' and wait order by first_name desc;
+explain alter table ice_orc COMPACT 'smart_optimize' and wait order by first_name desc;
 explain optimize table ice_orc rewrite data order by first_name desc;
 
-alter table ice_orc COMPACT 'major' and wait order by first_name desc;
+alter table ice_orc COMPACT 'smart_optimize' and wait order by first_name desc;
 
 select * from ice_orc where company_id = 100;
 select * from ice_orc where dept_id = 2;
diff --git a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution_ordered.q.out b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution_ordered.q.out
index c1e7ef38431..a4d270f0aa2 100644
--- a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution_ordered.q.out
+++ b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution_ordered.q.out
@@ -181,11 +181,11 @@ InputFormat:            org.apache.iceberg.mr.hive.HiveIcebergInputFormat
 OutputFormat:           org.apache.iceberg.mr.hive.HiveIcebergOutputFormat
 Compressed:             No
 Sort Columns:           []
-PREHOOK: query: explain alter table ice_orc COMPACT 'major' and wait order by first_name desc
+PREHOOK: query: explain alter table ice_orc COMPACT 'smart_optimize' and wait order by first_name desc
 PREHOOK: type: ALTERTABLE_COMPACT
 PREHOOK: Input: default@ice_orc
 PREHOOK: Output: default@ice_orc
-POSTHOOK: query: explain alter table ice_orc COMPACT 'major' and wait order by first_name desc
+POSTHOOK: query: explain alter table ice_orc COMPACT 'smart_optimize' and wait order by first_name desc
 POSTHOOK: type: ALTERTABLE_COMPACT
 POSTHOOK: Input: default@ice_orc
 POSTHOOK: Output: default@ice_orc
@@ -195,7 +195,7 @@ STAGE DEPENDENCIES:
 STAGE PLANS:
   Stage: Stage-0
     Compact
-      compaction type: major
+      compaction type: smart_optimize
       table name: default.ice_orc
       numberOfBuckets: 0
       order by: order by first_name desc
@@ -216,18 +216,17 @@ STAGE DEPENDENCIES:
 STAGE PLANS:
   Stage: Stage-0
     Compact
-      compaction type: major
+      compaction type: smart_optimize
       table name: default.ice_orc
       numberOfBuckets: 0
       order by: order by first_name desc
       table name: default.ice_orc
-      blocking: true
 
-PREHOOK: query: alter table ice_orc COMPACT 'major' and wait order by first_name desc
+PREHOOK: query: alter table ice_orc COMPACT 'smart_optimize' and wait order by first_name desc
 PREHOOK: type: ALTERTABLE_COMPACT
 PREHOOK: Input: default@ice_orc
 PREHOOK: Output: default@ice_orc
-POSTHOOK: query: alter table ice_orc COMPACT 'major' and wait order by first_name desc
+POSTHOOK: query: alter table ice_orc COMPACT 'smart_optimize' and wait order by first_name desc
 POSTHOOK: type: ALTERTABLE_COMPACT
 POSTHOOK: Input: default@ice_orc
 POSTHOOK: Output: default@ice_orc
@@ -333,5 +332,5 @@ POSTHOOK: query: show compactions order by 'partition'
 POSTHOOK: type: SHOW COMPACTIONS
 CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteId
 #Masked# default ice_orc dept_id=2 MAJOR succeeded #Masked# manual default 0 0 0 ---
-#Masked# default ice_orc dept_id=3 MAJOR refused #Masked# manual default 0 0 0 ---
-#Masked# default ice_orc --- MAJOR refused #Masked# manual default 0 0 0 ---
+#Masked# default ice_orc dept_id=3 SMART_OPTIMIZE refused #Masked# manual default 0 0 0 ---
+#Masked# default ice_orc --- SMART_OPTIMIZE refused #Masked# manual default 0 0 0 ---
diff --git a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_unpartitioned.q.out b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_unpartitioned.q.out
index e65eb47fb09..0d7ed2d6113 100644
--- a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_unpartitioned.q.out
+++ b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_unpartitioned.q.out
@@ -248,12 +248,11 @@ STAGE DEPENDENCIES:
 STAGE PLANS:
   Stage: Stage-0
     Compact
-      compaction type: major
+      compaction type: smart_optimize
       table name: default.ice_orc
       numberOfBuckets: 0
       pool: iceberg
       table name: default.ice_orc
-      blocking: true
 
 PREHOOK: query: alter table ice_orc COMPACT 'major' and wait pool 'iceberg'
 PREHOOK: type: ALTERTABLE_COMPACT
diff --git a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_unpartitioned_ordered.q.out b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_unpartitioned_ordered.q.out
index 41f11451853..81ccad496e3 100644
--- a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_unpartitioned_ordered.q.out
+++ b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_unpartitioned_ordered.q.out
@@ -145,12 +145,11 @@ STAGE DEPENDENCIES:
 STAGE PLANS:
   Stage: Stage-0
     Compact
-      compaction type: major
+      compaction type: smart_optimize
       table name: default.ice_orc
       numberOfBuckets: 0
       order by: order by last_name desc
       table name: default.ice_orc
-      blocking: true
 
 PREHOOK: query: alter table ice_orc COMPACT 'major' and wait order by last_name desc
 PREHOOK: type: ALTERTABLE_COMPACT
diff --git a/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g b/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g
index 4f37ce7bca5..67bd88d9b75 100644
--- a/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g
+++ b/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g
@@ -117,7 +117,7 @@ optimizeTblRewriteDataSuffix
 @init { gParent.msgs.push("compaction request"); }
 @after { gParent.msgs.pop(); }
     : KW_REWRITE KW_DATA compactPool? whereClause? orderByClause?
-    -> ^(TOK_ALTERTABLE_COMPACT Identifier["'MAJOR'"] TOK_BLOCKING compactPool? whereClause? orderByClause?)
+    -> ^(TOK_ALTERTABLE_COMPACT Identifier["'SMART_OPTIMIZE'"] compactPool? whereClause? orderByClause?)
     ;
 
 alterStatementPartitionKeyType
diff --git a/parser/src/test/org/apache/hadoop/hive/ql/parse/TestParseOptimizeTable.java b/parser/src/test/org/apache/hadoop/hive/ql/parse/TestParseOptimizeTable.java
index db5c2a06070..0aba86c9909 100644
--- a/parser/src/test/org/apache/hadoop/hive/ql/parse/TestParseOptimizeTable.java
+++ b/parser/src/test/org/apache/hadoop/hive/ql/parse/TestParseOptimizeTable.java
@@ -34,8 +34,7 @@ public void testOptimizeTableWithWhere() throws Exception {
         "   TOK_TABNAME\n" +
         "      tbl0\n" +
         "   TOK_ALTERTABLE_COMPACT\n" +
-        "      'MAJOR'\n" +
-        "      TOK_BLOCKING\n" +
+        "      'SMART_OPTIMIZE'\n" +
         "      TOK_WHERE\n" +
         "         and\n" +
         "            TOK_FUNCTION\n" +
@@ -64,8 +63,7 @@ public void testOptimizeTableWithOrderBy() throws Exception {
         "   TOK_TABNAME\n" +
         "      tbl0\n" +
         "   TOK_ALTERTABLE_COMPACT\n" +
-        "      'MAJOR'\n" +
-        "      TOK_BLOCKING\n" +
+        "      'SMART_OPTIMIZE'\n" +
         "      TOK_ORDERBY\n" +
         "         TOK_TABSORTCOLNAMEDESC\n" +
         "            TOK_NULLS_FIRST\n" +
@@ -87,8 +85,7 @@ public void testOptimizeTableWithPool() throws Exception {
         "   TOK_TABNAME\n" +
         "      tbl0\n" +
         "   TOK_ALTERTABLE_COMPACT\n" +
-        "      'MAJOR'\n" +
-        "      TOK_BLOCKING\n" +
+        "      'SMART_OPTIMIZE'\n" +
         "      TOK_COMPACT_POOL\n" +
         "         'iceberg'\n" +
         "   <EOF>\n";
diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
index d26f3774af7..11040823495 100644
--- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
+++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
@@ -204,7 +204,9 @@ public void testFindNextToClean() throws Exception {
     assertNotNull(ci);
 
     ci.highestWriteId = 41;
-    txnHandler.updateCompactorState(ci, 0);
+    long txnid = openTxn();
+    ci.txnId = txnid;
+    txnHandler.updateCompactorState(ci, txnid);
     txnHandler.markCompacted(ci);
     assertNull(txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION)));
 
@@ -234,7 +236,9 @@ public void testMarkCleaned() throws Exception {
     assertNotNull(ci);
 
     ci.highestWriteId = 41;
-    txnHandler.updateCompactorState(ci, 0);
+    long txnid = openTxn();
+    ci.txnId = txnid;
+    txnHandler.updateCompactorState(ci, txnid);
     txnHandler.markCompacted(ci);
     assertNull(txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION)));
 
@@ -857,7 +861,8 @@ public void testMarkCleanedCleansTxnsAndTxnComponents()
     assertNotNull(ci);
 
     ci.highestWriteId = mytableWriteId;
-    txnHandler.updateCompactorState(ci, 0);
+    ci.txnId = 1;
+    txnHandler.updateCompactorState(ci, 1);
     txnHandler.markCompacted(ci);
 
     Thread.sleep(txnHandler.getOpenTxnTimeOutMillis());
@@ -884,7 +889,8 @@ public void testMarkCleanedCleansTxnsAndTxnComponents()
     assertNotNull(ci);
 
     ci.highestWriteId = fooWriteId;
-    txnHandler.updateCompactorState(ci, 0);
+    txnHandler.updateCompactorState(ci, 2);
+    ci.txnId = 2;
     txnHandler.markCompacted(ci);
 
     toClean = txnHandler.findReadyToClean(0, 0);
@@ -1101,7 +1107,9 @@ private void createAReadyToCleanCompaction(String dbName, String tableName, Stri
     assertNotNull(ci);
 
     ci.highestWriteId = 41;
-    txnHandler.updateCompactorState(ci, 0);
+    long txnId = openTxn();
+    ci.txnId = txnId;
+    txnHandler.updateCompactorState(ci, txnId);
     txnHandler.markCompacted(ci);
   }
 
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
index 4995120de8f..604db9179f0 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
@@ -240,15 +240,17 @@ int _kCompactionTypeValues[] = {
   CompactionType::MINOR,
   CompactionType::MAJOR,
   CompactionType::REBALANCE,
-  CompactionType::ABORT_TXN_CLEANUP
+  CompactionType::ABORT_TXN_CLEANUP,
+  CompactionType::SMART_OPTIMIZE
 };
 const char* _kCompactionTypeNames[] = {
   "MINOR",
   "MAJOR",
   "REBALANCE",
-  "ABORT_TXN_CLEANUP"
+  "ABORT_TXN_CLEANUP",
+  "SMART_OPTIMIZE"
 };
-const std::map<int, const char*> _CompactionType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(4, _kCompactionTypeValues, _kCompactionTypeNames), ::apache::thrift::TEnumIterator(-1, nullptr, nullptr));
+const std::map<int, const char*> _CompactionType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(5, _kCompactionTypeValues, _kCompactionTypeNames), ::apache::thrift::TEnumIterator(-1, nullptr, nullptr));
 
 std::ostream& operator<<(std::ostream& out, const CompactionType::type& val) {
   std::map<int, const char*>::const_iterator it = _CompactionType_VALUES_TO_NAMES.find(val);
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h
index 01e39aee48f..08860167e50 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h
@@ -128,7 +128,8 @@ struct CompactionType {
     MINOR = 1,
     MAJOR = 2,
     REBALANCE = 3,
-    ABORT_TXN_CLEANUP = 4
+    ABORT_TXN_CLEANUP = 4,
+    SMART_OPTIMIZE = 5
   };
 };
 
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionType.java b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionType.java
index 0dabc2868c7..3394aca64e0 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionType.java
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionType.java
@@ -12,7 +12,8 @@ public enum CompactionType implements org.apache.thrift.TEnum {
   MINOR(1),
   MAJOR(2),
   REBALANCE(3),
-  ABORT_TXN_CLEANUP(4);
+  ABORT_TXN_CLEANUP(4),
+  SMART_OPTIMIZE(5);
 
   private final int value;
 
@@ -42,6 +43,8 @@ public static CompactionType findByValue(int value) {
         return REBALANCE;
       case 4:
         return ABORT_TXN_CLEANUP;
+      case 5:
+        return SMART_OPTIMIZE;
       default:
         return null;
     }
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/CompactionType.php b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/CompactionType.php
index 083d81ee061..45982da0e8b 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/CompactionType.php
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/CompactionType.php
@@ -26,11 +26,14 @@ final class CompactionType
 
     const ABORT_TXN_CLEANUP = 4;
 
+    const SMART_OPTIMIZE = 5;
+
     static public $__names = array(
         1 => 'MINOR',
         2 => 'MAJOR',
         3 => 'REBALANCE',
         4 => 'ABORT_TXN_CLEANUP',
+        5 => 'SMART_OPTIMIZE',
     );
 }
 
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
index c2cae0abb72..9e0ad04d65d 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
@@ -157,12 +157,14 @@ class CompactionType(object):
     MAJOR = 2
     REBALANCE = 3
     ABORT_TXN_CLEANUP = 4
+    SMART_OPTIMIZE = 5
 
     _VALUES_TO_NAMES = {
         1: "MINOR",
         2: "MAJOR",
         3: "REBALANCE",
         4: "ABORT_TXN_CLEANUP",
+        5: "SMART_OPTIMIZE",
     }
 
     _NAMES_TO_VALUES = {
@@ -170,6 +172,7 @@ class CompactionType(object):
         "MAJOR": 2,
         "REBALANCE": 3,
         "ABORT_TXN_CLEANUP": 4,
+        "SMART_OPTIMIZE": 5,
     }
 
 
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
index 83dbc802a78..e2e396b690c 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
@@ -72,8 +72,9 @@ module CompactionType
   MAJOR = 2
   REBALANCE = 3
   ABORT_TXN_CLEANUP = 4
-  VALUE_MAP = {1 => "MINOR", 2 => "MAJOR", 3 => "REBALANCE", 4 => "ABORT_TXN_CLEANUP"}
-  VALID_VALUES = Set.new([MINOR, MAJOR, REBALANCE, ABORT_TXN_CLEANUP]).freeze
+  SMART_OPTIMIZE = 5
+  VALUE_MAP = {1 => "MINOR", 2 => "MAJOR", 3 => "REBALANCE", 4 => "ABORT_TXN_CLEANUP", 5 => "SMART_OPTIMIZE"}
+  VALID_VALUES = Set.new([MINOR, MAJOR, REBALANCE, ABORT_TXN_CLEANUP, SMART_OPTIMIZE]).freeze
 end
 
 module GrantRevokeType
diff --git a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
index b342d69d2f0..06bf2b01841 100644
--- a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
+++ b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
@@ -219,6 +219,7 @@ enum CompactionType {
   MAJOR = 2,
   REBALANCE = 3,
   ABORT_TXN_CLEANUP = 4,
+  SMART_OPTIMIZE = 5,
 }
 
 enum GrantRevokeType {
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index d3b6091574a..91cfe313033 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -317,16 +317,22 @@ public List<String> findColumnsWithStats(CompactionInfo ci) throws MetaException
 
   @Override
   public void updateCompactorState(CompactionInfo ci, long compactionTxnId) throws MetaException {
+    boolean runInTxn = compactionTxnId > 0;
     jdbcResource.execute(
         "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_HIGHEST_WRITE_ID\" = :highestWriteId, " +
-            "\"CQ_RUN_AS\" = :runAs, \"CQ_TXN_ID\" = :txnId WHERE \"CQ_ID\" = :id",
+            "\"CQ_RUN_AS\" = :runAs, \"CQ_TXN_ID\" = :txnId, \"CQ_TYPE\" = :type WHERE \"CQ_ID\" = :id",
         new MapSqlParameterSource()
-            .addValue("highestWriteId", ci.highestWriteId)
+            .addValue("highestWriteId", runInTxn ? ci.highestWriteId : null, Types.BIGINT)
             .addValue("runAs", ci.runAs)
-            .addValue("txnId", compactionTxnId)
+            .addValue("txnId", runInTxn ? compactionTxnId : null, Types.BIGINT)
+            .addValue("type", Character.toString(thriftCompactionType2DbType(ci.type)))
            .addValue("id", ci.id),
         ParameterizedCommand.EXACTLY_ONE_ROW);
 
+    if (!runInTxn) {
+      return;
+    }
+
     MapSqlParameterSource parameterSource = new MapSqlParameterSource()
         .addValue("txnId", compactionTxnId)
         .addValue("dbName", ci.dbname)
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 2cce4966cd1..d5349e6c706 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -132,6 +132,7 @@ enum MUTEX_KEY {
   char MINOR_TYPE = 'i';
   char REBALANCE_TYPE = 'r';
   char ABORT_TXN_CLEANUP_TYPE = 'c';
+  char SMART_OPTIMIZE_TYPE = '*';
 
   String[] COMPACTION_STATES = new String[] {INITIATED_RESPONSE, WORKING_RESPONSE, CLEANING_RESPONSE, FAILED_RESPONSE,
       SUCCEEDED_RESPONSE, DID_NOT_INITIATE_RESPONSE, REFUSED_RESPONSE };
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
index f490798be56..d98afb7376a 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -595,6 +595,8 @@ public static CompactionType dbCompactionType2ThriftType(char dbValue) throws SQ
         return CompactionType.REBALANCE;
       case TxnStore.ABORT_TXN_CLEANUP_TYPE:
         return CompactionType.ABORT_TXN_CLEANUP;
+      case TxnStore.SMART_OPTIMIZE_TYPE:
+        return CompactionType.SMART_OPTIMIZE;
       default:
         throw new SQLException("Unexpected compaction type " + dbValue);
     }
@@ -610,6 +612,8 @@ public static Character thriftCompactionType2DbType(CompactionType ct) throws Me
         return TxnStore.REBALANCE_TYPE;
       case ABORT_TXN_CLEANUP:
         return TxnStore.ABORT_TXN_CLEANUP_TYPE;
+      case SMART_OPTIMIZE:
+        return TxnStore.SMART_OPTIMIZE_TYPE;
       default:
         throw new MetaException("Unexpected compaction type " + ct);
     }
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/entities/CompactionInfo.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/entities/CompactionInfo.java
index 4d24f1d23ed..e1c1f492bab 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/entities/CompactionInfo.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/entities/CompactionInfo.java
@@ -149,6 +149,10 @@ public boolean isMinorCompaction() {
     return CompactionType.MINOR == type;
   }
 
+  public boolean isSmartOptimize() {
+    return CompactionType.SMART_OPTIMIZE == type;
+  }
+
   public boolean isRebalanceCompaction() {
     return CompactionType.REBALANCE == type;
   }
@@ -329,6 +333,7 @@ public static CompactionInfoStruct compactionInfoToStruct(CompactionInfo ci) {
     cr.setHasoldabort(ci.hasOldAbort);
     cr.setStart(ci.start);
     cr.setState(Character.toString(ci.state));
+    cr.setType(ci.type);
     cr.setWorkerId(ci.workerId);
     cr.setHighestWriteId(ci.highestWriteId);
     cr.setErrorMessage(ci.errorMessage);
diff --git a/standalone-metastore/metastore-server/src/main/sql/hive/hive-schema-4.1.0.hive.sql b/standalone-metastore/metastore-server/src/main/sql/hive/hive-schema-4.1.0.hive.sql
index 4049ce32d53..0b29816de26 100644
--- a/standalone-metastore/metastore-server/src/main/sql/hive/hive-schema-4.1.0.hive.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/hive/hive-schema-4.1.0.hive.sql
@@ -1215,7 +1215,7 @@ SELECT
   CC_DATABASE,
   CC_TABLE,
   CC_PARTITION,
-  CASE WHEN CC_TYPE = 'i' THEN 'minor' WHEN CC_TYPE = 'a' THEN 'major' ELSE 'UNKNOWN' END,
+  CASE WHEN CC_TYPE = 'i' THEN 'minor' WHEN CC_TYPE = 'a' THEN 'major' WHEN CC_TYPE = '*' THEN 'smart-optimize' ELSE 'UNKNOWN' END,
   CASE WHEN CC_STATE = 'f' THEN 'failed' WHEN CC_STATE = 's' THEN 'succeeded'
     WHEN CC_STATE = 'a' THEN 'did not initiate' WHEN CC_STATE = 'c' THEN 'refused' ELSE 'UNKNOWN' END,
   CASE WHEN CC_WORKER_ID IS NULL THEN cast (null as string) ELSE split(CC_WORKER_ID,"-")[0] END,
diff --git a/standalone-metastore/metastore-server/src/main/sql/hive/upgrade-4.0.0-to-4.1.0.hive.sql b/standalone-metastore/metastore-server/src/main/sql/hive/upgrade-4.0.0-to-4.1.0.hive.sql
index 1d42ffb242b..c7ccfdc2a31 100644
--- a/standalone-metastore/metastore-server/src/main/sql/hive/upgrade-4.0.0-to-4.1.0.hive.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/hive/upgrade-4.0.0-to-4.1.0.hive.sql
@@ -226,5 +226,166 @@ TBLPROPERTIES (
 
 CREATE OR REPLACE VIEW `VERSION` AS SELECT 1 AS `VER_ID`, '4.1.0' AS `SCHEMA_VERSION`, 'Hive release version 4.1.0' AS `VERSION_COMMENT`;
+
+CREATE OR REPLACE VIEW `COMPACTIONS`
+(
+  `C_ID`,
+  `C_CATALOG`,
+  `C_DATABASE`,
+  `C_TABLE`,
+  `C_PARTITION`,
+  `C_TYPE`,
+  `C_STATE`,
+  `C_WORKER_HOST`,
+  `C_WORKER_ID`,
+  `C_WORKER_VERSION`,
+  `C_ENQUEUE_TIME`,
+  `C_START`,
+  `C_DURATION`,
+  `C_HADOOP_JOB_ID`,
+  `C_RUN_AS`,
+  `C_ERROR_MESSAGE`,
+  `C_NEXT_TXN_ID`,
+  `C_TXN_ID`,
+  `C_COMMIT_TIME`,
+  `C_HIGHEST_WRITE_ID`,
+  `C_INITIATOR_HOST`,
+  `C_INITIATOR_ID`,
+  `C_INITIATOR_VERSION`,
+  `C_CLEANER_START`,
+  `C_POOL_NAME`,
+  `C_NUMBER_OF_BUCKETS`,
+  `C_TBLPROPERTIES`
+) AS
+SELECT
+  CC_ID,
+  'default',
+  CC_DATABASE,
+  CC_TABLE,
+  CC_PARTITION,
+  CASE WHEN CC_TYPE = 'i' THEN 'minor' WHEN CC_TYPE = 'a' THEN 'major' WHEN CC_TYPE = '*' THEN 'smart-optimize' ELSE 'UNKNOWN' END,
+  CASE WHEN CC_STATE = 'f' THEN 'failed' WHEN CC_STATE = 's' THEN 'succeeded'
+    WHEN CC_STATE = 'a' THEN 'did not initiate' WHEN CC_STATE = 'c' THEN 'refused' ELSE 'UNKNOWN' END,
+  CASE WHEN CC_WORKER_ID IS NULL THEN cast (null as string) ELSE split(CC_WORKER_ID,"-")[0] END,
+  CASE WHEN CC_WORKER_ID IS NULL THEN cast (null as string) ELSE split(CC_WORKER_ID,"-")[size(split(CC_WORKER_ID,"-"))-1] END,
+  CC_WORKER_VERSION,
+  FROM_UNIXTIME(CC_ENQUEUE_TIME DIV 1000),
+  FROM_UNIXTIME(CC_START DIV 1000),
+  CASE WHEN CC_END IS NULL THEN cast (null as string) ELSE CC_END-CC_START END,
+  CC_HADOOP_JOB_ID,
+  CC_RUN_AS,
+  CC_ERROR_MESSAGE,
+  CC_NEXT_TXN_ID,
+  CC_TXN_ID,
+  FROM_UNIXTIME(CC_COMMIT_TIME DIV 1000),
+  CC_HIGHEST_WRITE_ID,
+  CASE WHEN CC_INITIATOR_ID IS NULL THEN cast (null as string) ELSE split(CC_INITIATOR_ID,"-")[0] END,
+  CASE WHEN CC_INITIATOR_ID IS NULL THEN cast (null as string) ELSE split(CC_INITIATOR_ID,"-")[size(split(CC_INITIATOR_ID,"-"))-1] END,
+  CC_INITIATOR_VERSION,
+  NULL,
+  NVL(CC_POOL_NAME, 'default'),
+  CC_NUMBER_OF_BUCKETS,
+  CC_TBLPROPERTIES
+FROM COMPLETED_COMPACTIONS
+UNION ALL
+SELECT
+  CQ_ID,
+  'default',
+  CQ_DATABASE,
+  CQ_TABLE,
+  CQ_PARTITION,
+  CASE WHEN CQ_TYPE = 'i' THEN 'minor' WHEN CQ_TYPE = 'a' THEN 'major' ELSE 'UNKNOWN' END,
+  CASE WHEN CQ_STATE = 'i' THEN 'initiated' WHEN CQ_STATE = 'w' THEN 'working' WHEN CQ_STATE = 'r' THEN 'ready for cleaning' ELSE 'UNKNOWN' END,
+  CASE WHEN CQ_WORKER_ID IS NULL THEN NULL ELSE split(CQ_WORKER_ID,"-")[0] END,
+  CASE WHEN CQ_WORKER_ID IS NULL THEN NULL ELSE split(CQ_WORKER_ID,"-")[size(split(CQ_WORKER_ID,"-"))-1] END,
+  CQ_WORKER_VERSION,
+  FROM_UNIXTIME(CQ_ENQUEUE_TIME DIV 1000),
+  FROM_UNIXTIME(CQ_START DIV 1000),
+  cast (null as string),
+  CQ_HADOOP_JOB_ID,
+  CQ_RUN_AS,
+  CQ_ERROR_MESSAGE,
+  CQ_NEXT_TXN_ID,
+  CQ_TXN_ID,
+  FROM_UNIXTIME(CQ_COMMIT_TIME DIV 1000),
+  CQ_HIGHEST_WRITE_ID,
+  CASE WHEN CQ_INITIATOR_ID IS NULL THEN NULL ELSE split(CQ_INITIATOR_ID,"-")[0] END,
+  CASE WHEN CQ_INITIATOR_ID IS NULL THEN NULL ELSE split(CQ_INITIATOR_ID,"-")[size(split(CQ_INITIATOR_ID,"-"))-1] END,
+  CQ_INITIATOR_VERSION,
+  FROM_UNIXTIME(CQ_CLEANER_START DIV 1000),
+  NVL(CQ_POOL_NAME, 'default'),
+  CQ_NUMBER_OF_BUCKETS,
+  CQ_TBLPROPERTIES
+FROM COMPACTION_QUEUE;
+
+USE INFORMATION_SCHEMA;
+
+CREATE OR REPLACE VIEW `COMPACTIONS`
+(
+  `C_ID`,
+  `C_CATALOG`,
+  `C_DATABASE`,
+  `C_TABLE`,
+  `C_PARTITION`,
+  `C_TYPE`,
+  `C_STATE`,
+  `C_WORKER_HOST`,
+  `C_WORKER_ID`,
+  `C_WORKER_VERSION`,
+  `C_ENQUEUE_TIME`,
+  `C_START`,
+  `C_DURATION`,
+  `C_HADOOP_JOB_ID`,
+  `C_RUN_AS`,
+  `C_ERROR_MESSAGE`,
+  `C_NEXT_TXN_ID`,
+  `C_TXN_ID`,
+  `C_COMMIT_TIME`,
+  `C_HIGHEST_WRITE_ID`,
+  `C_INITIATOR_HOST`,
+  `C_INITIATOR_ID`,
+  `C_INITIATOR_VERSION`,
+  `C_CLEANER_START`,
+  `C_POOL_NAME`,
+  `C_NUMBER_OF_BUCKETS`,
+  `C_TBLPROPERTIES`
+) AS
+SELECT DISTINCT
+  C_ID,
+  C_CATALOG,
+  C_DATABASE,
+  C_TABLE,
+  C_PARTITION,
+  C_TYPE,
+  C_STATE,
+  C_WORKER_HOST,
+  C_WORKER_ID,
+  C_WORKER_VERSION,
+  C_ENQUEUE_TIME,
+  C_START,
+  C_DURATION,
+  C_HADOOP_JOB_ID,
+  C_RUN_AS,
+  C_ERROR_MESSAGE,
+  C_NEXT_TXN_ID,
+  C_TXN_ID,
+  C_COMMIT_TIME,
+  C_HIGHEST_WRITE_ID,
+  C_INITIATOR_HOST,
+  C_INITIATOR_ID,
+  C_INITIATOR_VERSION,
+  C_CLEANER_START,
+  C_POOL_NAME,
+  C_NUMBER_OF_BUCKETS,
+  C_TBLPROPERTIES
+FROM
+  `sys`.`COMPACTIONS` C JOIN `sys`.`TBLS` T ON (C.`C_TABLE` = T.`TBL_NAME`)
+                        JOIN `sys`.`DBS` D ON (C.`C_DATABASE` = D.`NAME`)
+                        LEFT JOIN `sys`.`TBL_PRIVS` P ON (T.`TBL_ID` = P.`TBL_ID`)
+WHERE
+  (NOT restrict_information_schema() OR P.`TBL_ID` IS NOT NULL
+  AND (P.`PRINCIPAL_NAME`=current_user() AND P.`PRINCIPAL_TYPE`='USER'
+    OR ((array_contains(current_groups(), P.`PRINCIPAL_NAME`) OR P.`PRINCIPAL_NAME` = 'public') AND P.`PRINCIPAL_TYPE`='GROUP'))
+  AND P.`TBL_PRIV`='SELECT' AND P.`AUTHORIZER`=current_authorizer());
 
 SELECT 'Finished upgrading MetaStore schema from 4.0.0 to 4.1.0';
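
A short usage sketch, drawn from the qtests touched by this patch (table and column names such as ice_orc and first_name come from those tests and are illustrative only): with the new type, the compaction evaluator resolves SMART_OPTIMIZE to MAJOR when a major rewrite is warranted, otherwise to MINOR, and marks the request refused when neither is needed.

    -- explicit types keep working as before
    alter table ice_orc COMPACT 'major' and wait order by first_name desc;

    -- new: let the evaluator pick MAJOR or MINOR (or refuse the request)
    alter table ice_orc COMPACT 'smart_optimize' and wait order by first_name desc;

    -- OPTIMIZE TABLE ... REWRITE DATA now parses to SMART_OPTIMIZE instead of MAJOR
    explain optimize table ice_orc rewrite data order by first_name desc;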