This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
     new 20132819610 HIVE-28536: Iceberg: Add support for custom compaction pools (Dmitriy Fingerman, reviewed by Denys Kuzmenko)
20132819610 is described below

commit 2013281961060ffe648a87f43c1b7c933860c8dc
Author: Dmitriy Fingerman <dmitriy.finger...@gmail.com>
AuthorDate: Mon Oct 21 03:53:15 2024 -0400

    HIVE-28536: Iceberg: Add support for custom compaction pools (Dmitriy Fingerman, reviewed by Denys Kuzmenko)

    Closes #5503
---
 .../iceberg_major_compaction_partition_evolution.q |  2 +-
 ...iceberg_major_compaction_partition_evolution2.q |  3 +
 .../iceberg_major_compaction_partitioned.q         |  8 +-
 .../iceberg_major_compaction_unpartitioned.q       |  6 +-
 ...ceberg_major_compaction_unpartitioned_ordered.q |  2 +-
 ...berg_major_compaction_partition_evolution.q.out | 12 +--
 ...erg_major_compaction_partition_evolution2.q.out | 98 ++++++++++++----------
 .../iceberg_major_compaction_partitioned.q.out     | 30 +++----
 .../iceberg_major_compaction_unpartitioned.q.out   | 16 ++--
 ...rg_major_compaction_unpartitioned_ordered.q.out |  8 +-
 .../TestIcebergLlapLocalCompactorCliDriver.java    |  8 ++
 .../hadoop/hive/ql/parse/AlterClauseParser.g       |  4 +-
 .../hive/ql/parse/TestParseOptimizeTable.java      | 28 ++++++-
 .../compact/AlterTableCompactOperation.java        |  9 +-
 .../hive/ql/txn/compactor/CompactorUtil.java       | 12 +++
 .../hadoop/hive/ql/txn/compactor/Initiator.java    | 14 +---
 16 files changed, 159 insertions(+), 101 deletions(-)

diff --git a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partition_evolution.q b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partition_evolution.q
index 9451c8f2c3a..e6c40dd20cc 100644
--- a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partition_evolution.q
+++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partition_evolution.q
@@ -31,7 +31,7 @@ create table ice_orc (
 )
 partitioned by (company_id bigint)
 stored by iceberg stored as orc
-tblproperties ('format-version'='2');
+tblproperties ('format-version'='2', 'hive.compactor.worker.pool'='iceberg');
 
 insert into ice_orc VALUES ('fn1','ln1', 1, 10, 100);
 insert into ice_orc VALUES ('fn2','ln2', 1, 10, 100);
diff --git a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partition_evolution2.q b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partition_evolution2.q
index 723eeeefb7d..4d6d390d3cd 100644
--- a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partition_evolution2.q
+++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partition_evolution2.q
@@ -23,6 +23,9 @@ set hive.llap.io.enabled=true;
 set hive.vectorized.execution.enabled=true;
 set hive.optimize.shared.work.merge.ts.schema=true;
 
+create database ice_comp with dbproperties('hive.compactor.worker.pool'='iceberg');
+use ice_comp;
+
 create table ice_orc (
   first_name string,
   last_name string,
diff --git a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partitioned.q b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partitioned.q
index 6a89f81e233..7637c80555d 100644
--- a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partitioned.q
+++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partitioned.q
@@ -52,8 +52,8 @@ delete from ice_orc where last_name in ('ln1a', 'ln2a', 'ln7a');
 select * from ice_orc;
 describe formatted ice_orc;
 
-explain alter table ice_orc COMPACT 'major' and wait;
-alter table ice_orc COMPACT 'major' and wait;
+explain alter table ice_orc COMPACT 'major' and wait pool 'iceberg';
+alter table ice_orc COMPACT 'major' and wait pool 'iceberg';
 
 select * from ice_orc;
 describe formatted ice_orc;
@@ -85,8 +85,8 @@ delete from ice_orc where last_name in ('ln11a', 'ln12a', 'ln17a', 'ln18a');
 select * from ice_orc;
 describe formatted ice_orc;
 
-explain alter table ice_orc COMPACT 'major' and wait;
-alter table ice_orc COMPACT 'major' and wait;
+explain alter table ice_orc COMPACT 'major' and wait pool 'iceberg';
+alter table ice_orc COMPACT 'major' and wait pool 'iceberg';
 
 select * from ice_orc;
 describe formatted ice_orc;
diff --git a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_unpartitioned.q b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_unpartitioned.q
index 8268be8cd3f..d7f52364a91 100644
--- a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_unpartitioned.q
+++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_unpartitioned.q
@@ -52,10 +52,10 @@ delete from ice_orc where last_name in ('ln5a', 'ln6a', 'ln7a');
 select * from ice_orc;
 describe formatted ice_orc;
 
-explain alter table ice_orc COMPACT 'major' and wait;
-explain optimize table ice_orc rewrite data;
+explain alter table ice_orc COMPACT 'major' and wait pool 'iceberg';
+explain optimize table ice_orc rewrite data pool 'iceberg';
 
-alter table ice_orc COMPACT 'major' and wait;
+alter table ice_orc COMPACT 'major' and wait pool 'iceberg';
 
 select * from ice_orc;
 describe formatted ice_orc;
diff --git a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_unpartitioned_ordered.q b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_unpartitioned_ordered.q
index 26ccab22847..d6f422fa6a2 100644
--- a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_unpartitioned_ordered.q
+++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_unpartitioned_ordered.q
@@ -28,7 +28,7 @@ create table ice_orc (
 last_name string
 ) stored by iceberg stored as orc
-tblproperties ('format-version'='2');
+tblproperties ('format-version'='2', 'hive.compactor.worker.pool'='iceberg');
 
 insert into ice_orc VALUES
 ('fn1','ln1'),
diff --git a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution.q.out b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution.q.out
index bea4d098b09..970467e9d99 100644
--- a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution.q.out
+++ b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution.q.out
@@ -6,7 +6,7 @@ PREHOOK: query: create table ice_orc (
 )
 partitioned by (company_id bigint)
 stored by iceberg stored as orc
-tblproperties ('format-version'='2')
+tblproperties ('format-version'='2', 'hive.compactor.worker.pool'='iceberg')
 PREHOOK: type: CREATETABLE
 PREHOOK: Output: database:default
 PREHOOK: Output: default@ice_orc
@@ -18,7 +18,7 @@ POSTHOOK: query: create table ice_orc (
 )
 partitioned by (company_id bigint)
 stored by iceberg stored as orc
-tblproperties ('format-version'='2')
+tblproperties ('format-version'='2', 'hive.compactor.worker.pool'='iceberg')
 POSTHOOK: type: CREATETABLE
 POSTHOOK: Output: database:default
 POSTHOOK: Output: default@ice_orc
@@ -243,6 +243,7 @@ Table Parameters:
 	current-snapshot-timestamp-ms	#Masked#
 	default-partition-spec	{\"spec-id\":1,\"fields\":[{\"name\":\"company_id\",\"transform\":\"identity\",\"source-id\":5,\"field-id\":1000},{\"name\":\"dept_id\",\"transform\":\"identity\",\"source-id\":3,\"field-id\":1001}]}
 	format-version	2
+	hive.compactor.worker.pool	iceberg
 	iceberg.orc.files.only	true
 #### A masked pattern was here ####
 	numFiles	14
@@ -344,6 +345,7 @@ Table Parameters:
 	current-snapshot-timestamp-ms	#Masked#
 	default-partition-spec	{\"spec-id\":1,\"fields\":[{\"name\":\"company_id\",\"transform\":\"identity\",\"source-id\":5,\"field-id\":1000},{\"name\":\"dept_id\",\"transform\":\"identity\",\"source-id\":3,\"field-id\":1001}]}
 	format-version	2
+	hive.compactor.worker.pool	iceberg
 	iceberg.orc.files.only	true
 #### A masked pattern was here ####
 	numFiles	2
@@ -374,6 +376,6 @@ PREHOOK: type: SHOW COMPACTIONS
 POSTHOOK: query: show compactions order by 'partition'
 POSTHOOK: type: SHOW COMPACTIONS
 CompactionId	Database	Table	Partition	Type	State	Worker host	Worker	Enqueue Time	Start Time	Duration(ms)	HadoopJobId	Error message	Initiator host	Initiator	Pool name	TxnId	Next TxnId	Commit Time	Highest WriteId
-#Masked# default ice_orc company_id=100/dept_id=1 MAJOR succeeded #Masked# manual default 0 0 0 ---
-#Masked# default ice_orc company_id=100/dept_id=2 MAJOR succeeded #Masked# manual default 0 0 0 ---
-#Masked# default ice_orc --- MAJOR succeeded #Masked# manual default 0 0 0 ---
+#Masked# default ice_orc company_id=100/dept_id=1 MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
+#Masked# default ice_orc company_id=100/dept_id=2 MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
+#Masked# default ice_orc --- MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
diff --git a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution2.q.out b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution2.q.out
index ab7ab901642..c6ac22d4727 100644
--- a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution2.q.out
+++ b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution2.q.out
@@ -1,3 +1,15 @@
+PREHOOK: query: create database ice_comp with dbproperties('hive.compactor.worker.pool'='iceberg')
+PREHOOK: type: CREATEDATABASE
+PREHOOK: Output: database:ice_comp
+POSTHOOK: query: create database ice_comp with dbproperties('hive.compactor.worker.pool'='iceberg')
+POSTHOOK: type: CREATEDATABASE
+POSTHOOK: Output: database:ice_comp
+PREHOOK: query: use ice_comp
+PREHOOK: type: SWITCHDATABASE
+PREHOOK: Input: database:ice_comp
+POSTHOOK: query: use ice_comp
+POSTHOOK: type: SWITCHDATABASE
+POSTHOOK: Input: database:ice_comp
 PREHOOK: query: create table ice_orc (
   first_name string,
   last_name string,
@@ -6,8 +18,8 @@ PREHOOK: query: create table ice_orc (
 stored by iceberg stored as orc
 tblproperties ('format-version'='2')
 PREHOOK: type: CREATETABLE
-PREHOOK: Output: database:default
-PREHOOK: Output: default@ice_orc
+PREHOOK: Output: database:ice_comp
+PREHOOK: Output: ice_comp@ice_orc
 POSTHOOK: query: create table ice_orc (
   first_name string,
   last_name string,
@@ -16,102 +28,102 @@ POSTHOOK: query: create table ice_orc (
 stored by iceberg stored as orc
 tblproperties ('format-version'='2')
 POSTHOOK: type: CREATETABLE
-POSTHOOK: Output: database:default
-POSTHOOK: Output: default@ice_orc
+POSTHOOK: Output: database:ice_comp
+POSTHOOK: Output: ice_comp@ice_orc
 PREHOOK: query: insert into ice_orc VALUES ('fn1','ln1', 1)
 PREHOOK: type: QUERY
 PREHOOK: Input: _dummy_database@_dummy_table
-PREHOOK: Output: default@ice_orc
+PREHOOK: Output: ice_comp@ice_orc
 POSTHOOK: query: insert into ice_orc VALUES ('fn1','ln1', 1)
 POSTHOOK: type: QUERY
 POSTHOOK: Input: _dummy_database@_dummy_table
-POSTHOOK: Output: default@ice_orc
+POSTHOOK: Output: ice_comp@ice_orc
 PREHOOK: query: insert into ice_orc VALUES ('fn2','ln2', 1)
 PREHOOK: type: QUERY
 PREHOOK: Input: _dummy_database@_dummy_table
-PREHOOK: Output: default@ice_orc
+PREHOOK: Output: ice_comp@ice_orc
 POSTHOOK: query: insert into ice_orc VALUES ('fn2','ln2', 1)
 POSTHOOK: type: QUERY
 POSTHOOK: Input: _dummy_database@_dummy_table
-POSTHOOK: Output: default@ice_orc
+POSTHOOK: Output: ice_comp@ice_orc
 PREHOOK: query: insert into ice_orc VALUES ('fn3','ln3', 1)
 PREHOOK: type: QUERY
 PREHOOK: Input: _dummy_database@_dummy_table
-PREHOOK: Output: default@ice_orc
+PREHOOK: Output: ice_comp@ice_orc
 POSTHOOK: query: insert into ice_orc VALUES ('fn3','ln3', 1)
 POSTHOOK: type: QUERY
 POSTHOOK: Input: _dummy_database@_dummy_table
-POSTHOOK: Output: default@ice_orc
+POSTHOOK: Output: ice_comp@ice_orc
 PREHOOK: query: insert into ice_orc VALUES ('fn4','ln4', 1)
 PREHOOK: type: QUERY
 PREHOOK: Input: _dummy_database@_dummy_table
-PREHOOK: Output: default@ice_orc
+PREHOOK: Output: ice_comp@ice_orc
 POSTHOOK: query: insert into ice_orc VALUES ('fn4','ln4', 1)
 POSTHOOK: type: QUERY
 POSTHOOK: Input: _dummy_database@_dummy_table
-POSTHOOK: Output: default@ice_orc
+POSTHOOK: Output: ice_comp@ice_orc
 PREHOOK: query: delete from ice_orc where last_name in ('ln3', 'ln4')
 PREHOOK: type: QUERY
-PREHOOK: Input: default@ice_orc
+PREHOOK: Input: ice_comp@ice_orc
 #### A masked pattern was here ####
 POSTHOOK: query: delete from ice_orc where last_name in ('ln3', 'ln4')
 POSTHOOK: type: QUERY
-POSTHOOK: Input: default@ice_orc
+POSTHOOK: Input: ice_comp@ice_orc
 #### A masked pattern was here ####
 PREHOOK: query: alter table ice_orc set partition spec(dept_id)
 PREHOOK: type: ALTERTABLE_SETPARTSPEC
-PREHOOK: Input: default@ice_orc
+PREHOOK: Input: ice_comp@ice_orc
 POSTHOOK: query: alter table ice_orc set partition spec(dept_id)
 POSTHOOK: type: ALTERTABLE_SETPARTSPEC
-POSTHOOK: Input: default@ice_orc
-POSTHOOK: Output: default@ice_orc
+POSTHOOK: Input: ice_comp@ice_orc
+POSTHOOK: Output: ice_comp@ice_orc
 PREHOOK: query: insert into ice_orc PARTITION(dept_id=2) VALUES ('fn5','ln5')
 PREHOOK: type: QUERY
 PREHOOK: Input: _dummy_database@_dummy_table
-PREHOOK: Output: default@ice_orc@dept_id=2
+PREHOOK: Output: ice_comp@ice_orc@dept_id=2
 POSTHOOK: query: insert into ice_orc PARTITION(dept_id=2) VALUES ('fn5','ln5')
 POSTHOOK: type: QUERY
 POSTHOOK: Input: _dummy_database@_dummy_table
-POSTHOOK: Output: default@ice_orc@dept_id=2
+POSTHOOK: Output: ice_comp@ice_orc@dept_id=2
 PREHOOK: query: insert into ice_orc PARTITION(dept_id=2) VALUES ('fn6','ln6')
 PREHOOK: type: QUERY
 PREHOOK: Input: _dummy_database@_dummy_table
-PREHOOK: Output: default@ice_orc@dept_id=2
+PREHOOK: Output: ice_comp@ice_orc@dept_id=2
 POSTHOOK: query: insert into ice_orc PARTITION(dept_id=2) VALUES ('fn6','ln6')
 POSTHOOK: type: QUERY
 POSTHOOK: Input: _dummy_database@_dummy_table
-POSTHOOK: Output: default@ice_orc@dept_id=2
+POSTHOOK: Output: ice_comp@ice_orc@dept_id=2
 PREHOOK: query: insert into ice_orc PARTITION(dept_id=2) VALUES ('fn7','ln7')
 PREHOOK: type: QUERY
 PREHOOK: Input: _dummy_database@_dummy_table
-PREHOOK: Output: default@ice_orc@dept_id=2
+PREHOOK: Output: ice_comp@ice_orc@dept_id=2
 POSTHOOK: query: insert into ice_orc PARTITION(dept_id=2) VALUES ('fn7','ln7')
 POSTHOOK: type: QUERY
 POSTHOOK: Input: _dummy_database@_dummy_table
-POSTHOOK: Output: default@ice_orc@dept_id=2
+POSTHOOK: Output: ice_comp@ice_orc@dept_id=2
 PREHOOK: query: insert into ice_orc PARTITION(dept_id=2) VALUES ('fn8','ln8')
 PREHOOK: type: QUERY
 PREHOOK: Input: _dummy_database@_dummy_table
-PREHOOK: Output: default@ice_orc@dept_id=2
+PREHOOK: Output: ice_comp@ice_orc@dept_id=2
 POSTHOOK: query: insert into ice_orc PARTITION(dept_id=2) VALUES ('fn8','ln8')
 POSTHOOK: type: QUERY
 POSTHOOK: Input: _dummy_database@_dummy_table
-POSTHOOK: Output: default@ice_orc@dept_id=2
+POSTHOOK: Output: ice_comp@ice_orc@dept_id=2
 PREHOOK: query: delete from ice_orc where last_name in ('ln7', 'ln8')
 PREHOOK: type: QUERY
-PREHOOK: Input: default@ice_orc
+PREHOOK: Input: ice_comp@ice_orc
 #### A masked pattern was here ####
 POSTHOOK: query: delete from ice_orc where last_name in ('ln7', 'ln8')
 POSTHOOK: type: QUERY
-POSTHOOK: Input: default@ice_orc
+POSTHOOK: Input: ice_comp@ice_orc
 #### A masked pattern was here ####
 PREHOOK: query: select * from ice_orc
 PREHOOK: type: QUERY
-PREHOOK: Input: default@ice_orc
+PREHOOK: Input: ice_comp@ice_orc
 #### A masked pattern was here ####
 POSTHOOK: query: select * from ice_orc
 POSTHOOK: type: QUERY
-POSTHOOK: Input: default@ice_orc
+POSTHOOK: Input: ice_comp@ice_orc
 #### A masked pattern was here ####
 fn1	ln1	1
 fn2	ln2	1
@@ -119,10 +131,10 @@ fn5	ln5	2
 fn6	ln6	2
 PREHOOK: query: describe formatted ice_orc
 PREHOOK: type: DESCTABLE
-PREHOOK: Input: default@ice_orc
+PREHOOK: Input: ice_comp@ice_orc
 POSTHOOK: query: describe formatted ice_orc
 POSTHOOK: type: DESCTABLE
-POSTHOOK: Input: default@ice_orc
+POSTHOOK: Input: ice_comp@ice_orc
 # col_name	data_type	comment
 first_name	string
 last_name	string
@@ -133,7 +145,7 @@ dept_id	bigint
 dept_id	IDENTITY
 
 # Detailed Table Information
-Database:	default
+Database:	ice_comp
 #### A masked pattern was here ####
 Retention:	0
 #### A masked pattern was here ####
@@ -180,19 +192,19 @@ POSTHOOK: type: SHOW COMPACTIONS
 CompactionId	Database	Table	Partition	Type	State	Worker host	Worker	Enqueue Time	Start Time	Duration(ms)	HadoopJobId	Error message	Initiator host	Initiator	Pool name	TxnId	Next TxnId	Commit Time	Highest WriteId
 PREHOOK: query: alter table ice_orc COMPACT 'major' and wait
 PREHOOK: type: ALTERTABLE_COMPACT
-PREHOOK: Input: default@ice_orc
-PREHOOK: Output: default@ice_orc
+PREHOOK: Input: ice_comp@ice_orc
+PREHOOK: Output: ice_comp@ice_orc
 POSTHOOK: query: alter table ice_orc COMPACT 'major' and wait
 POSTHOOK: type: ALTERTABLE_COMPACT
-POSTHOOK: Input: default@ice_orc
-POSTHOOK: Output: default@ice_orc
+POSTHOOK: Input: ice_comp@ice_orc
+POSTHOOK: Output: ice_comp@ice_orc
 PREHOOK: query: select * from ice_orc
 PREHOOK: type: QUERY
-PREHOOK: Input: default@ice_orc
+PREHOOK: Input: ice_comp@ice_orc
 #### A masked pattern was here ####
 POSTHOOK: query: select * from ice_orc
 POSTHOOK: type: QUERY
-POSTHOOK: Input: default@ice_orc
+POSTHOOK: Input: ice_comp@ice_orc
 #### A masked pattern was here ####
 fn1	ln1	1
 fn2	ln2	1
@@ -200,10 +212,10 @@ fn5	ln5	2
 fn6	ln6	2
 PREHOOK: query: describe formatted ice_orc
 PREHOOK: type: DESCTABLE
-PREHOOK: Input: default@ice_orc
+PREHOOK: Input: ice_comp@ice_orc
 POSTHOOK: query: describe formatted ice_orc
 POSTHOOK: type: DESCTABLE
-POSTHOOK: Input: default@ice_orc
+POSTHOOK: Input: ice_comp@ice_orc
 # col_name	data_type	comment
 first_name	string
 last_name	string
@@ -214,7 +226,7 @@ dept_id	bigint
 dept_id	IDENTITY
 
 # Detailed Table Information
-Database:	default
+Database:	ice_comp
 #### A masked pattern was here ####
 Retention:	0
 #### A masked pattern was here ####
@@ -259,5 +271,5 @@ PREHOOK: type: SHOW COMPACTIONS
 POSTHOOK: query: show compactions order by 'partition'
 POSTHOOK: type: SHOW COMPACTIONS
 CompactionId	Database	Table	Partition	Type	State	Worker host	Worker	Enqueue Time	Start Time	Duration(ms)	HadoopJobId	Error message	Initiator host	Initiator	Pool name	TxnId	Next TxnId	Commit Time	Highest WriteId
-#Masked# default ice_orc dept_id=2 MAJOR succeeded #Masked# manual default 0 0 0 ---
-#Masked# default ice_orc --- MAJOR succeeded #Masked# manual default 0 0 0 ---
+#Masked# ice_comp ice_orc dept_id=2 MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
+#Masked# ice_comp ice_orc --- MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
diff --git a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partitioned.q.out b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partitioned.q.out
index b980e2deaa1..9f98bd79aad 100644
--- a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partitioned.q.out
+++ b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partitioned.q.out
@@ -220,11 +220,11 @@ InputFormat:	org.apache.iceberg.mr.hive.HiveIcebergInputFormat
 OutputFormat:	org.apache.iceberg.mr.hive.HiveIcebergOutputFormat
 Compressed:	No
 Sort Columns:	[]
-PREHOOK: query: explain alter table ice_orc COMPACT 'major' and wait
+PREHOOK: query: explain alter table ice_orc COMPACT 'major' and wait pool 'iceberg'
 PREHOOK: type: ALTERTABLE_COMPACT
 PREHOOK: Input: default@ice_orc
 PREHOOK: Output: default@ice_orc
-POSTHOOK: query: explain alter table ice_orc COMPACT 'major' and wait
+POSTHOOK: query: explain alter table ice_orc COMPACT 'major' and wait pool 'iceberg'
 POSTHOOK: type: ALTERTABLE_COMPACT
 POSTHOOK: Input: default@ice_orc
 POSTHOOK: Output: default@ice_orc
@@ -237,14 +237,15 @@ STAGE PLANS:
       compaction type: major
       table name: default.ice_orc
       numberOfBuckets: 0
+      pool: iceberg
       table name: default.ice_orc
      blocking: true
 
-PREHOOK: query: alter table ice_orc COMPACT 'major' and wait
+PREHOOK: query: alter table ice_orc COMPACT 'major' and wait pool 'iceberg'
 PREHOOK: type: ALTERTABLE_COMPACT
 PREHOOK: Input: default@ice_orc
 PREHOOK: Output: default@ice_orc
-POSTHOOK: query: alter table ice_orc COMPACT 'major' and wait
+POSTHOOK: query: alter table ice_orc COMPACT 'major' and wait pool 'iceberg'
 POSTHOOK: type: ALTERTABLE_COMPACT
 POSTHOOK: Input: default@ice_orc
 POSTHOOK: Output: default@ice_orc
@@ -321,8 +322,8 @@ PREHOOK: type: SHOW COMPACTIONS
 POSTHOOK: query: show compactions order by 'partition'
 POSTHOOK: type: SHOW COMPACTIONS
 CompactionId	Database	Table	Partition	Type	State	Worker host	Worker	Enqueue Time	Start Time	Duration(ms)	HadoopJobId	Error message	Initiator host	Initiator	Pool name	TxnId	Next TxnId	Commit Time	Highest WriteId
-#Masked# default ice_orc dept_id=1 MAJOR succeeded #Masked# manual default 0 0 0 ---
-#Masked# default ice_orc dept_id=2 MAJOR succeeded #Masked# manual default 0 0 0 ---
+#Masked# default ice_orc dept_id=1 MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
+#Masked# default ice_orc dept_id=2 MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
 PREHOOK: query: insert into ice_orc VALUES ('fn11','ln11', 1)
 PREHOOK: type: QUERY
 PREHOOK: Input: _dummy_database@_dummy_table
@@ -547,11 +548,11 @@ InputFormat:	org.apache.iceberg.mr.hive.HiveIcebergInputFormat
 OutputFormat:	org.apache.iceberg.mr.hive.HiveIcebergOutputFormat
 Compressed:	No
 Sort Columns:	[]
-PREHOOK: query: explain alter table ice_orc COMPACT 'major' and wait
+PREHOOK: query: explain alter table ice_orc COMPACT 'major' and wait pool 'iceberg'
 PREHOOK: type: ALTERTABLE_COMPACT
 PREHOOK: Input: default@ice_orc
 PREHOOK: Output: default@ice_orc
-POSTHOOK: query: explain alter table ice_orc COMPACT 'major' and wait
+POSTHOOK: query: explain alter table ice_orc COMPACT 'major' and wait pool 'iceberg'
 POSTHOOK: type: ALTERTABLE_COMPACT
 POSTHOOK: Input: default@ice_orc
 POSTHOOK: Output: default@ice_orc
@@ -564,14 +565,15 @@ STAGE PLANS:
       compaction type: major
       table name: default.ice_orc
       numberOfBuckets: 0
+      pool: iceberg
       table name: default.ice_orc
      blocking: true
 
-PREHOOK: query: alter table ice_orc COMPACT 'major' and wait
+PREHOOK: query: alter table ice_orc COMPACT 'major' and wait pool 'iceberg'
 PREHOOK: type: ALTERTABLE_COMPACT
 PREHOOK: Input: default@ice_orc
 PREHOOK: Output: default@ice_orc
-POSTHOOK: query: alter table ice_orc COMPACT 'major' and wait
+POSTHOOK: query: alter table ice_orc COMPACT 'major' and wait pool 'iceberg'
 POSTHOOK: type: ALTERTABLE_COMPACT
 POSTHOOK: Input: default@ice_orc
 POSTHOOK: Output: default@ice_orc
@@ -652,7 +654,7 @@ PREHOOK: type: SHOW COMPACTIONS
 POSTHOOK: query: show compactions order by 'partition'
 POSTHOOK: type: SHOW COMPACTIONS
 CompactionId	Database	Table	Partition	Type	State	Worker host	Worker	Enqueue Time	Start Time	Duration(ms)	HadoopJobId	Error message	Initiator host	Initiator	Pool name	TxnId	Next TxnId	Commit Time	Highest WriteId
-#Masked# default ice_orc dept_id=1 MAJOR succeeded #Masked# manual default 0 0 0 ---
-#Masked# default ice_orc dept_id=1 MAJOR succeeded #Masked# manual default 0 0 0 ---
-#Masked# default ice_orc dept_id=2 MAJOR succeeded #Masked# manual default 0 0 0 ---
-#Masked# default ice_orc dept_id=2 MAJOR succeeded #Masked# manual default 0 0 0 ---
+#Masked# default ice_orc dept_id=1 MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
+#Masked# default ice_orc dept_id=1 MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
+#Masked# default ice_orc dept_id=2 MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
+#Masked# default ice_orc dept_id=2 MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
diff --git a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_unpartitioned.q.out b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_unpartitioned.q.out
index 5e25dfd1ca3..6ebc90fe71a 100644
--- a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_unpartitioned.q.out
+++ b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_unpartitioned.q.out
@@ -212,11 +212,11 @@ InputFormat:	org.apache.iceberg.mr.hive.HiveIcebergInputFormat
 OutputFormat:	org.apache.iceberg.mr.hive.HiveIcebergOutputFormat
 Compressed:	No
 Sort Columns:	[]
-PREHOOK: query: explain alter table ice_orc COMPACT 'major' and wait
+PREHOOK: query: explain alter table ice_orc COMPACT 'major' and wait pool 'iceberg'
 PREHOOK: type: ALTERTABLE_COMPACT
 PREHOOK: Input: default@ice_orc
 PREHOOK: Output: default@ice_orc
-POSTHOOK: query: explain alter table ice_orc COMPACT 'major' and wait
+POSTHOOK: query: explain alter table ice_orc COMPACT 'major' and wait pool 'iceberg'
 POSTHOOK: type: ALTERTABLE_COMPACT
 POSTHOOK: Input: default@ice_orc
 POSTHOOK: Output: default@ice_orc
@@ -229,14 +229,15 @@ STAGE PLANS:
       compaction type: major
       table name: default.ice_orc
       numberOfBuckets: 0
+      pool: iceberg
       table name: default.ice_orc
      blocking: true
 
-PREHOOK: query: explain optimize table ice_orc rewrite data
+PREHOOK: query: explain optimize table ice_orc rewrite data pool 'iceberg'
 PREHOOK: type: ALTERTABLE_COMPACT
 PREHOOK: Input: default@ice_orc
 PREHOOK: Output: default@ice_orc
-POSTHOOK: query: explain optimize table ice_orc rewrite data
+POSTHOOK: query: explain optimize table ice_orc rewrite data pool 'iceberg'
 POSTHOOK: type: ALTERTABLE_COMPACT
 POSTHOOK: Input: default@ice_orc
 POSTHOOK: Output: default@ice_orc
@@ -249,14 +250,15 @@ STAGE PLANS:
       compaction type: major
       table name: default.ice_orc
       numberOfBuckets: 0
+      pool: iceberg
       table name: default.ice_orc
      blocking: true
 
-PREHOOK: query: alter table ice_orc COMPACT 'major' and wait
+PREHOOK: query: alter table ice_orc COMPACT 'major' and wait pool 'iceberg'
 PREHOOK: type: ALTERTABLE_COMPACT
 PREHOOK: Input: default@ice_orc
 PREHOOK: Output: default@ice_orc
-POSTHOOK: query: alter table ice_orc COMPACT 'major' and wait
+POSTHOOK: query: alter table ice_orc COMPACT 'major' and wait pool 'iceberg'
 POSTHOOK: type: ALTERTABLE_COMPACT
 POSTHOOK: Input: default@ice_orc
 POSTHOOK: Output: default@ice_orc
@@ -327,4 +329,4 @@ PREHOOK: type: SHOW COMPACTIONS
 POSTHOOK: query: show compactions
 POSTHOOK: type: SHOW COMPACTIONS
 CompactionId	Database	Table	Partition	Type	State	Worker host	Worker	Enqueue Time	Start Time	Duration(ms)	HadoopJobId	Error message	Initiator host	Initiator	Pool name	TxnId	Next TxnId	Commit Time	Highest WriteId
-#Masked# default ice_orc --- MAJOR succeeded #Masked# manual default 0 0 0 ---
+#Masked# default ice_orc --- MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
diff --git a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_unpartitioned_ordered.q.out b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_unpartitioned_ordered.q.out
index 5c3edbc8dd1..73eecc3d621 100644
--- a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_unpartitioned_ordered.q.out
+++ b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_unpartitioned_ordered.q.out
@@ -3,7 +3,7 @@ PREHOOK: query: create table ice_orc (
 last_name string
 ) stored by iceberg stored as orc
-tblproperties ('format-version'='2')
+tblproperties ('format-version'='2', 'hive.compactor.worker.pool'='iceberg')
 PREHOOK: type: CREATETABLE
 PREHOOK: Output: database:default
 PREHOOK: Output: default@ice_orc
@@ -12,7 +12,7 @@ POSTHOOK: query: create table ice_orc (
 last_name string
 ) stored by iceberg stored as orc
-tblproperties ('format-version'='2')
+tblproperties ('format-version'='2', 'hive.compactor.worker.pool'='iceberg')
 POSTHOOK: type: CREATETABLE
 POSTHOOK: Output: database:default
 POSTHOOK: Output: default@ice_orc
@@ -83,6 +83,7 @@ Table Parameters:
 	current-snapshot-summary	{\"added-position-delete-files\":\"1\",\"added-delete-files\":\"1\",\"added-files-size\":\"#Masked#\",\"added-position-deletes\":\"3\",\"changed-partition-count\":\"1\",\"total-records\":\"7\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"1\",\"total-delete-files\":\"1\",\"total-position-deletes\":\"3\",\"total-equality-deletes\":\"0\",\"iceberg-version\":\"#Masked#\"}
 	current-snapshot-timestamp-ms	#Masked#
 	format-version	2
+	hive.compactor.worker.pool	iceberg
 	iceberg.orc.files.only	true
 #### A masked pattern was here ####
 	numFiles	1
@@ -195,6 +196,7 @@ Table Parameters:
 	current-snapshot-summary	{\"added-data-files\":\"1\",\"deleted-data-files\":\"1\",\"removed-position-delete-files\":\"1\",\"removed-delete-files\":\"1\",\"added-records\":\"4\",\"deleted-records\":\"7\",\"added-files-size\":\"#Masked#\",\"removed-files-size\":\"#Masked#\",\"removed-position-deletes\":\"3\",\"changed-partition-count\":\"1\",\"total-records\":\"4\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"1\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\" [...]
 	current-snapshot-timestamp-ms	#Masked#
 	format-version	2
+	hive.compactor.worker.pool	iceberg
 	iceberg.orc.files.only	true
 #### A masked pattern was here ####
 	numFiles	1
@@ -225,4 +227,4 @@ PREHOOK: type: SHOW COMPACTIONS
 POSTHOOK: query: show compactions
 POSTHOOK: type: SHOW COMPACTIONS
 CompactionId	Database	Table	Partition	Type	State	Worker host	Worker	Enqueue Time	Start Time	Duration(ms)	HadoopJobId	Error message	Initiator host	Initiator	Pool name	TxnId	Next TxnId	Commit Time	Highest WriteId
-#Masked# default ice_orc --- MAJOR succeeded #Masked# manual default 0 0 0 ---
+#Masked# default ice_orc --- MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
diff --git a/itests/qtest-iceberg/src/test/java/org/apache/hadoop/hive/cli/TestIcebergLlapLocalCompactorCliDriver.java b/itests/qtest-iceberg/src/test/java/org/apache/hadoop/hive/cli/TestIcebergLlapLocalCompactorCliDriver.java
index 795cc3cc09f..3b41b2eb430 100644
--- a/itests/qtest-iceberg/src/test/java/org/apache/hadoop/hive/cli/TestIcebergLlapLocalCompactorCliDriver.java
+++ b/itests/qtest-iceberg/src/test/java/org/apache/hadoop/hive/cli/TestIcebergLlapLocalCompactorCliDriver.java
@@ -41,6 +41,8 @@ public class TestIcebergLlapLocalCompactorCliDriver {
 
   static CliAdapter adapter = new CliConfigs.IcebergLlapLocalCompactorCliConfig().getCliAdapter();
   private static final AtomicBoolean stop = new AtomicBoolean();
+  private static final String DEFAULT_POOL_NAME = null;
+  private static final String ICEBERG_POOL_NAME = "iceberg";
   private static Worker worker;
 
   @Parameters(name ="{0}")
@@ -50,8 +52,14 @@ public class TestIcebergLlapLocalCompactorCliDriver {
 
   @BeforeClass
   public static void setup() throws Exception {
+    setupWorker(DEFAULT_POOL_NAME);
+    setupWorker(ICEBERG_POOL_NAME);
+  }
+
+  private static void setupWorker(String poolName) throws Exception {
     worker = new Worker();
     worker.setConf(SessionState.get().getConf());
+    worker.setPoolName(poolName);
     stop.set(false);
     worker.init(stop);
     worker.start();
diff --git a/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g b/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g
index 5a5b51673cd..b896e3d35b1 100644
--- a/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g
+++ b/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g
@@ -114,8 +114,8 @@ optimizeTableStatementSuffix
 optimizeTblRewriteDataSuffix
 @init { gParent.msgs.push("compaction request"); }
 @after { gParent.msgs.pop(); }
-    : KW_REWRITE KW_DATA orderByClause? whereClause?
-    -> ^(TOK_ALTERTABLE_COMPACT Identifier["'MAJOR'"] TOK_BLOCKING orderByClause? whereClause?)
+    : KW_REWRITE KW_DATA orderByClause? whereClause? compactPool?
+    -> ^(TOK_ALTERTABLE_COMPACT Identifier["'MAJOR'"] TOK_BLOCKING orderByClause? whereClause? compactPool?)
     ;
 
 alterStatementPartitionKeyType
diff --git a/parser/src/test/org/apache/hadoop/hive/ql/parse/TestParseOptimizeTable.java b/parser/src/test/org/apache/hadoop/hive/ql/parse/TestParseOptimizeTable.java
index 744d603104c..db5c2a06070 100644
--- a/parser/src/test/org/apache/hadoop/hive/ql/parse/TestParseOptimizeTable.java
+++ b/parser/src/test/org/apache/hadoop/hive/ql/parse/TestParseOptimizeTable.java
@@ -28,7 +28,7 @@ public class TestParseOptimizeTable {
 
   @Test
   public void testOptimizeTableWithWhere() throws Exception {
-    String EXPECTED_WHERE_FILTER = "\n" +
+    String expectedWhereFilter = "\n" +
         "nil\n" +
         "   TOK_ALTERTABLE\n" +
         "      TOK_TABNAME\n" +
@@ -52,13 +52,13 @@ public class TestParseOptimizeTable {
 
     ASTNode tree = parseDriver.parse(
         " optimize table tbl0 rewrite data where (col01 in ('A', 'B') and col02 < '2024-09-17 00:00:00')", null).getTree();
-    assertThat(tree.dump(), is(EXPECTED_WHERE_FILTER));
+    assertThat(tree.dump(), is(expectedWhereFilter));
   }
 
   @Test
   public void testOptimizeTableWithOrderBy() throws Exception {
 
-    String EXPECTED_ORDER_BY = "\n" +
+    String expectedOrderBy = "\n" +
         "nil\n" +
         "   TOK_ALTERTABLE\n" +
         "      TOK_TABNAME\n" +
@@ -75,6 +75,26 @@ public class TestParseOptimizeTable {
 
     ASTNode tree = parseDriver.parse(
         " optimize table tbl0 rewrite data order by col01 desc", null).getTree();
-    assertThat(tree.dump(), is(EXPECTED_ORDER_BY));
+    assertThat(tree.dump(), is(expectedOrderBy));
+  }
+
+  @Test
+  public void testOptimizeTableWithPool() throws Exception {
+
+    String expectedWithCompactPool = "\n" +
+        "nil\n" +
+        "   TOK_ALTERTABLE\n" +
+        "      TOK_TABNAME\n" +
+        "         tbl0\n" +
+        "      TOK_ALTERTABLE_COMPACT\n" +
+        "         'MAJOR'\n" +
+        "         TOK_BLOCKING\n" +
+        "         TOK_COMPACT_POOL\n" +
+        "            'iceberg'\n" +
+        "   <EOF>\n";
+
+    ASTNode tree = parseDriver.parse(
+        " optimize table tbl0 rewrite data pool 'iceberg'", null).getTree();
+    assertThat(tree.dump(), is(expectedWithCompactPool));
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java
index 11afd462638..842ac8cc878 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.ddl.table.storage.compact;
 
+import org.apache.commons.lang3.ObjectUtils;
 import org.apache.hadoop.hive.common.ServerUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
@@ -39,6 +40,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil;
+import org.apache.hadoop.hive.ql.txn.compactor.MetadataCache;
 
 import java.util.List;
 import java.util.ArrayList;
@@ -54,6 +56,8 @@ import static org.apache.hadoop.hive.ql.io.AcidUtils.compactionTypeStr2ThriftTyp
  */
 public class AlterTableCompactOperation extends DDLOperation<AlterTableCompactDesc> {
 
+  private static MetadataCache metadataCache = new MetadataCache(true);
+
   public AlterTableCompactOperation(DDLOperationContext context, AlterTableCompactDesc desc) {
     super(context, desc);
   }
@@ -88,7 +92,10 @@ public class AlterTableCompactOperation extends DDLOperation<AlterTableCompactDe
     CompactionRequest compactionRequest = new CompactionRequest(table.getDbName(),
         table.getTableName(), compactionTypeStr2ThriftType(desc.getCompactionType()));
 
-    compactionRequest.setPoolName(desc.getPoolName());
+    String poolName = ObjectUtils.defaultIfNull(desc.getPoolName(),
+        CompactorUtil.getPoolName(context.getConf(), table.getTTable(), metadataCache));
+
+    compactionRequest.setPoolName(poolName);
     compactionRequest.setProperties(desc.getProperties());
     compactionRequest.setInitiatorId(JavaUtils.hostname() + "-" + HiveMetaStoreClient.MANUALLY_INITIATED_COMPACTION);
     compactionRequest.setInitiatorVersion(HiveMetaStoreClient.class.getPackage().getImplementationVersion());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java
index b97d67015c2..39d89b0dc11 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.txn.compactor;
 
 import com.google.common.collect.Maps;
+import org.apache.commons.lang3.ObjectUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -565,4 +566,15 @@ public class CompactorUtil {
     }
     return poolConf;
   }
+
+  public static String getPoolName(HiveConf conf, Table t, MetadataCache metadataCache) throws Exception {
+    Map<String, String> params = ObjectUtils.defaultIfNull(t.getParameters(), Collections.emptyMap());
+    String poolName = params.get(Constants.HIVE_COMPACTOR_WORKER_POOL);
+    if (StringUtils.isBlank(poolName)) {
+      params = ObjectUtils.defaultIfNull(metadataCache.computeIfAbsent(t.getDbName(),
+          () -> resolveDatabase(conf, t.getDbName())).getParameters(), Collections.emptyMap());
+      poolName = params.get(Constants.HIVE_COMPACTOR_WORKER_POOL);
+    }
+    return poolName;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 75c435a3123..8a1bcb98733 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -19,10 +19,8 @@ package org.apache.hadoop.hive.ql.txn.compactor;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Sets;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hive.common.ServerUtils;
 import org.apache.hadoop.hive.common.ValidTxnList;
-import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.*;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
@@ -141,7 +139,7 @@ public class Initiator extends MetaStoreCompactorThread {
           }
 
           Table t = metadataCache.computeIfAbsent(ci.getFullTableName(), () -> resolveTable(ci));
-          ci.poolName = getPoolName(ci, t);
+          ci.poolName = CompactorUtil.getPoolName(conf, t, metadataCache);
           Partition p = resolvePartition(ci);
           if (p == null && ci.partName != null) {
             LOG.info("Can't find partition " + ci.getFullPartitionName() +
@@ -213,16 +211,6 @@ public class Initiator extends MetaStoreCompactorThread {
         MetastoreConf.ConfVars.COMPACTOR_INITIATOR_TABLECACHE_ON);
   }
 
-  private String getPoolName(CompactionInfo ci, Table t) throws Exception {
-    Map<String, String> params = t.getParameters();
-    String poolName = params == null ? null : params.get(Constants.HIVE_COMPACTOR_WORKER_POOL);
-    if (StringUtils.isBlank(poolName)) {
-      params = metadataCache.computeIfAbsent(ci.dbname, () -> resolveDatabase(ci)).getParameters();
-      poolName = params == null ? null : params.get(Constants.HIVE_COMPACTOR_WORKER_POOL);
-    }
-    return poolName;
-  }
-
   private Database resolveDatabase(CompactionInfo ci) throws MetaException, NoSuchObjectException {
     return CompactorUtil.resolveDatabase(conf, ci.dbname);
   }
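
For reference, a minimal usage sketch of the pool routing this commit adds. The
statements and names are taken from the test .q files above; 'iceberg' is only an
example pool name, and a compactor Worker must be running with that pool name
(as the updated TestIcebergLlapLocalCompactorCliDriver does) for the request to be
picked up:

    -- route a manual major compaction of an Iceberg table to the 'iceberg' worker pool
    alter table ice_orc compact 'major' and wait pool 'iceberg';

    -- equivalent OPTIMIZE TABLE form enabled by the AlterClauseParser.g change above
    optimize table ice_orc rewrite data pool 'iceberg';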
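A pool can also be bound declaratively, as the tests do, so that manual requests
without a POOL clause and initiator-scheduled compactions land in the same pool
(sketch, same assumed 'iceberg' pool):

    -- per-table binding, as in iceberg_major_compaction_partition_evolution.q
    create table ice_orc (first_name string, last_name string)
    stored by iceberg stored as orc
    tblproperties ('format-version'='2', 'hive.compactor.worker.pool'='iceberg');

    -- per-database binding, as in iceberg_major_compaction_partition_evolution2.q,
    -- used when the table itself sets no pool
    create database ice_comp with dbproperties ('hive.compactor.worker.pool'='iceberg');

Per CompactorUtil.getPoolName and AlterTableCompactOperation above, an explicit POOL
clause takes precedence over the table property, which in turn takes precedence over
the database property; when none is set, the request stays in the default pool.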