ayushtkn commented on code in PR #4855: URL: https://github.com/apache/hive/pull/4855#discussion_r1384205514
########## iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergMajorCompaction.java: ########## @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.mr.hive.compaction; + +import java.io.IOException; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.DriverUtils; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.ql.txn.compactor.CompactorContext; +import org.apache.hadoop.hive.ql.txn.compactor.QueryCompactor; +import org.apache.iceberg.mr.hive.HiveIcebergMetaHook; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class IcebergMajorCompaction extends QueryCompactor { + + private static final Logger LOG = LoggerFactory.getLogger(IcebergMajorCompaction.class.getName()); + + @Override + public boolean run(CompactorContext context) throws IOException, HiveException, InterruptedException { + + String tableName = context.getTable().getTableName(); + LOG.debug("Executing compaction query for table {}", tableName); + + String insertIntoTempIcebergTable = String.format("insert into %s select * from %s", 
tableName, tableName); + + return runQuery(context.getConf(), insertIntoTempIcebergTable, context.getCompactionInfo().runAs); + } + + private boolean runQuery(HiveConf conf, String query, String user) { + + SessionState sessionState = DriverUtils.setUpSessionState(conf, user, true); + conf.setBoolean(HiveIcebergMetaHook.ICEBERG_COMPACTION, true); Review Comment: If someone sets this conf via beeline, the normal insert into will behave as compaction? Should we set compaction/not compaction via queryState? or do ``` sessionState.setCompaction(true); ``` ########## iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java: ########## @@ -437,6 +439,8 @@ private void commitTable(FileIO io, ExecutorService executor, OutputTable output dataFiles.addAll(writeResults.dataFiles()); deleteFiles.addAll(writeResults.deleteFiles()); referencedDataFiles.addAll(writeResults.referencedDataFiles()); + + isCompaction = isCompaction || jobContext.getJobConf().getBoolean(HiveIcebergMetaHook.ICEBERG_COMPACTION, false); Review Comment: can we just use ``conf``, there is a line above which refactors the confs into a variable ``` JobConf conf = jobContext.getJobConf(); ``` ########## ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorFactory.java: ########## @@ -108,6 +110,23 @@ public CompactorPipeline getCompactorPipeline(Table table, HiveConf configuratio throw new HiveException( compactionInfo.type.name() + " compaction is not supported on insert only tables."); } + } else if (DDLUtils.isIcebergTable(table)) { + switch (compactionInfo.type) { + case MAJOR: + + try { + Class<? extends QueryCompactor> inputFormatClass = (Class<? 
extends QueryCompactor>) + Class.forName("org.apache.iceberg.mr.hive.compaction.IcebergMajorCompaction", true, Utilities.getSessionSpecifiedClassLoader()); Review Comment: Hard-coding the class name doesn't look too fancy :-( ########## itests/qtest-iceberg/src/test/java/org/apache/hadoop/hive/cli/TestIcebergLlapLocalCompactorCliDriver.java: ########## @@ -0,0 +1,83 @@ +/* Review Comment: Why do we need a new CliDriver? Why can't we use the existing IcebergCli driver? ########## iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction.q: ########## @@ -0,0 +1,33 @@ +--test against vectorized LLAP execution mode +-- SORT_QUERY_RESULTS +-- Mask neededVirtualColumns due to non-strict order +--! qt:replace:/(\s+neededVirtualColumns:\s)(.*)/$1#Masked#/ + +-- set hive.llap.io.enabled=true; +-- set hive.vectorized.execution.enabled=true; +-- set hive.optimize.shared.work.merge.ts.schema=true; + Review Comment: we should test for tables that have undergone partition/schema evolution as well ########## iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction.q: ########## @@ -0,0 +1,33 @@ +--test against vectorized LLAP execution mode +-- SORT_QUERY_RESULTS +-- Mask neededVirtualColumns due to non-strict order +--! qt:replace:/(\s+neededVirtualColumns:\s)(.*)/$1#Masked#/ + +-- set hive.llap.io.enabled=true; +-- set hive.vectorized.execution.enabled=true; +-- set hive.optimize.shared.work.merge.ts.schema=true; Review Comment: why are these commented? ########## iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction.q: ########## @@ -0,0 +1,33 @@ +--test against vectorized LLAP execution mode +-- SORT_QUERY_RESULTS +-- Mask neededVirtualColumns due to non-strict order +--! 
qt:replace:/(\s+neededVirtualColumns:\s)(.*)/$1#Masked#/ + +-- set hive.llap.io.enabled=true; +-- set hive.vectorized.execution.enabled=true; +-- set hive.optimize.shared.work.merge.ts.schema=true; + +CREATE TABLE ice_orc ( + first_name STRING, + last_name STRING + ) +STORED BY ICEBERG STORED AS ORC +TBLPROPERTIES ('engine.hive.enabled'='true','format-version'='2'); + Review Comment: put a case for partitioned table as well ########## iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java: ########## @@ -469,12 +473,29 @@ private Long getSnapshotId(Table table, String branchName) { /** * Creates and commits an Iceberg change with the provided data and delete files. * If there are no delete files then an Iceberg 'append' is created, otherwise Iceberg 'overwrite' is created. - * @param table The table we are changing - * @param startTime The start time of the commit - used only for logging - * @param results The object containing the new files we would like to add to the table + * + * @param table The table we are changing + * @param startTime The start time of the commit - used only for logging + * @param results The object containing the new files we would like to add to the table + * @param isCompaction Flag that indicates whether or not the operation is done as part of compaction */ private void commitWrite(Table table, String branchName, Long snapshotId, long startTime, - FilesForCommit results, Operation operation) { + FilesForCommit results, Operation operation, boolean isCompaction) { + + if (isCompaction) { + Transaction transaction = table.newTransaction(); + + DeleteFiles delete = transaction.newDelete(); + delete.deleteFromRowFilter(Expressions.alwaysTrue()); + delete.commit(); + + RowDelta write = transaction.newRowDelta(); + results.dataFiles().forEach(write::addRows); + write.commit(); + + transaction.commitTransaction(); + return; Review Comment: Would be good if we put some log here, about compaction around 
the num files or so ########## iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergMajorCompaction.java: ########## @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.mr.hive.compaction; + +import java.io.IOException; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.DriverUtils; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.ql.txn.compactor.CompactorContext; +import org.apache.hadoop.hive.ql.txn.compactor.QueryCompactor; +import org.apache.iceberg.mr.hive.HiveIcebergMetaHook; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class IcebergMajorCompaction extends QueryCompactor { + + private static final Logger LOG = LoggerFactory.getLogger(IcebergMajorCompaction.class.getName()); + + @Override + public boolean run(CompactorContext context) throws IOException, HiveException, InterruptedException { + + String tableName = context.getTable().getTableName(); + LOG.debug("Executing compaction query for table {}", tableName); + + String insertIntoTempIcebergTable = String.format("insert into %s 
select * from %s", tableName, tableName); Review Comment: The variable name seems confusing — this is an insert into the same table, right? ########## iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergMajorCompaction.java: ########## @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.mr.hive.compaction; + +import java.io.IOException; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.DriverUtils; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.ql.txn.compactor.CompactorContext; +import org.apache.hadoop.hive.ql.txn.compactor.QueryCompactor; +import org.apache.iceberg.mr.hive.HiveIcebergMetaHook; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class IcebergMajorCompaction extends QueryCompactor { Review Comment: Can you change the name to be in line with other ``QueryCompactor`` child classes, like ``IcebergMajorQueryCompactor`` ########## iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction.q: ########## @@ -0,0 +1,33 @@ +--test against vectorized LLAP execution mode +-- SORT_QUERY_RESULTS +-- Mask neededVirtualColumns due to non-strict order +--! qt:replace:/(\s+neededVirtualColumns:\s)(.*)/$1#Masked#/ + +-- set hive.llap.io.enabled=true; +-- set hive.vectorized.execution.enabled=true; +-- set hive.optimize.shared.work.merge.ts.schema=true; + +CREATE TABLE ice_orc ( + first_name STRING, + last_name STRING + ) +STORED BY ICEBERG STORED AS ORC +TBLPROPERTIES ('engine.hive.enabled'='true','format-version'='2'); Review Comment: Why is this needed: ```'engine.hive.enabled'='true'```? It should be true by default, and no other test does that ########## iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction.q: ########## @@ -0,0 +1,33 @@ +--test against vectorized LLAP execution mode +-- SORT_QUERY_RESULTS +-- Mask neededVirtualColumns due to non-strict order +--! 
qt:replace:/(\s+neededVirtualColumns:\s)(.*)/$1#Masked#/ + +-- set hive.llap.io.enabled=true; +-- set hive.vectorized.execution.enabled=true; +-- set hive.optimize.shared.work.merge.ts.schema=true; + +CREATE TABLE ice_orc ( + first_name STRING, + last_name STRING + ) +STORED BY ICEBERG STORED AS ORC +TBLPROPERTIES ('engine.hive.enabled'='true','format-version'='2'); + +Insert into ice_orc VALUES ('fn1','ln1'); +Insert into ice_orc VALUES ('fn2','ln2'); +Insert into ice_orc VALUES ('fn3','ln3'); +Insert into ice_orc VALUES ('fn4','ln4'); +Insert into ice_orc VALUES ('fn5','ln5'); +Insert into ice_orc VALUES ('fn6','ln6'); +Insert into ice_orc VALUES ('fn7','ln7'); + +Update ice_orc set last_name = 'ln1a' where first_name='fn1'; +Update ice_orc set last_name = 'ln2a' where first_name='fn2'; +Update ice_orc set last_name = 'ln3a' where first_name='fn3'; +Update ice_orc set last_name = 'ln4a' where first_name='fn4'; +Update ice_orc set last_name = 'ln5a' where first_name='fn5'; +Update ice_orc set last_name = 'ln6a' where first_name='fn6'; +Update ice_orc set last_name = 'ln7a' where first_name='fn7'; + +alter table ice_orc COMPACT 'major' and wait; Review Comment: do an explain of this query as well ########## iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction.q: ########## @@ -0,0 +1,33 @@ +--test against vectorized LLAP execution mode +-- SORT_QUERY_RESULTS +-- Mask neededVirtualColumns due to non-strict order +--! 
qt:replace:/(\s+neededVirtualColumns:\s)(.*)/$1#Masked#/ + +-- set hive.llap.io.enabled=true; +-- set hive.vectorized.execution.enabled=true; +-- set hive.optimize.shared.work.merge.ts.schema=true; + +CREATE TABLE ice_orc ( + first_name STRING, + last_name STRING + ) +STORED BY ICEBERG STORED AS ORC +TBLPROPERTIES ('engine.hive.enabled'='true','format-version'='2'); + +Insert into ice_orc VALUES ('fn1','ln1'); +Insert into ice_orc VALUES ('fn2','ln2'); +Insert into ice_orc VALUES ('fn3','ln3'); +Insert into ice_orc VALUES ('fn4','ln4'); +Insert into ice_orc VALUES ('fn5','ln5'); +Insert into ice_orc VALUES ('fn6','ln6'); +Insert into ice_orc VALUES ('fn7','ln7'); + +Update ice_orc set last_name = 'ln1a' where first_name='fn1'; +Update ice_orc set last_name = 'ln2a' where first_name='fn2'; +Update ice_orc set last_name = 'ln3a' where first_name='fn3'; +Update ice_orc set last_name = 'ln4a' where first_name='fn4'; +Update ice_orc set last_name = 'ln5a' where first_name='fn5'; +Update ice_orc set last_name = 'ln6a' where first_name='fn6'; +Update ice_orc set last_name = 'ln7a' where first_name='fn7'; Review Comment: should have delete statements as well ########## ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorFactory.java: ########## @@ -108,6 +110,23 @@ public CompactorPipeline getCompactorPipeline(Table table, HiveConf configuratio throw new HiveException( compactionInfo.type.name() + " compaction is not supported on insert only tables."); } + } else if (DDLUtils.isIcebergTable(table)) { + switch (compactionInfo.type) { + case MAJOR: + + try { + Class<? extends QueryCompactor> inputFormatClass = (Class<? extends QueryCompactor>) Review Comment: this is not ``inputFormatClass``, it is ``QueryCompactor`` class -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
