zabetak commented on code in PR #4859: URL: https://github.com/apache/hive/pull/4859#discussion_r1482918569
########## itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java: ########## @@ -648,6 +646,52 @@ public void secondCompactionShouldBeRefusedBeforeEnqueueing() throws Exception { Assert.assertEquals("ready for cleaning", compacts.get(0).getState()); } + @Test + public void secondCompactionShouldBeRefusedBeforeEnqueueingForPartition() throws Exception { + conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true); + + final String dbName = "default"; + final String tableName = "compaction_test"; + executeStatementOnDriver("drop table if exists " + tableName, driver); + executeStatementOnDriver("CREATE TABLE " + tableName + "(id string, value string) partitioned by(pt string) CLUSTERED BY(id) " + + "INTO 10 BUCKETS STORED AS ORC TBLPROPERTIES('transactional'='true')", driver); + executeStatementOnDriver("alter table " + tableName + " add partition(pt='test')",driver); + executeStatementOnDriver("INSERT INTO TABLE " + tableName + " partition(pt='test') values ('1','one'),('2','two'),('3','three')," + + "('4','four'),('5','five'),('6','six'),('7','seven'),('8','eight'),('9','nine'),('10','ten')," + + "('11','eleven'),('12','twelve'),('13','thirteen'),('14','fourteen'),('15','fifteen'),('16','sixteen')," + + "('17','seventeen'),('18','eighteen'),('19','nineteen'),('20','twenty')", driver); + + executeStatementOnDriver("insert into " + tableName + " partition(pt='test') values ('21', 'value21'),('84', 'value84')," + + "('66', 'value66'),('54', 'value54')", driver); + executeStatementOnDriver( + "insert into " + tableName + " partition(pt='test') values ('22', 'value22'),('34', 'value34')," + "('35', 'value35')", driver); + executeStatementOnDriver("insert into " + tableName + " partition(pt='test') values ('75', 'value75'),('99', 'value99')", driver); + + TxnStore txnHandler = TxnUtils.getTxnStore(conf); + + //Do a compaction directly and wait for it to finish + CompactionRequest rqst = new CompactionRequest(dbName, 
tableName, CompactionType.MAJOR); + rqst.setPartitionname("pt=test"); + CompactionResponse resp = txnHandler.compact(rqst); + runWorker(conf); + + //Try to do a second compaction on the same table before the cleaner runs. + try { + driver.run("ALTER TABLE " + tableName + " partition(pt='test') COMPACT 'major'"); Review Comment: This test is mostly a copy of the previous one, and I am not sure we need all of it. The main thing we want to test is the exception message when a partition is set in the request. For that, it would suffice to add this try-catch block to the previous test, or to rework this test so that it keeps only the necessary parts. ########## ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/InitiatorBase.java: ########## @@ -50,59 +49,53 @@ import java.io.IOException; import java.security.PrivilegedExceptionAction; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; public class InitiatorBase extends MetaStoreCompactorThread { static final private String COMPACTOR_THRESHOLD_PREFIX = "compactorthreshold."; - private List<CompactionResponse> initiateCompactionForMultiplePartitions(Table table, - Map<String, Partition> partitions, CompactionRequest request) { - List<CompactionResponse> compactionResponses = new ArrayList<>(); - partitions.entrySet().parallelStream().forEach(entry -> { - try { - StorageDescriptor sd = CompactorUtil.resolveStorageDescriptor(table, entry.getValue()); - String runAs = TxnUtils.findUserToRunAs(sd.getLocation(), table, conf); - CompactionInfo ci = - new CompactionInfo(table.getDbName(), table.getTableName(), entry.getKey(), request.getType()); - ci.initiatorId = request.getInitiatorId(); - ci.orderByClause = request.getOrderByClause(); - ci.initiatorVersion = request.getInitiatorVersion(); - if (request.getNumberOfBuckets() > 0) { - ci.numberOfBuckets = 
request.getNumberOfBuckets(); - } - ci.poolName = request.getPoolName(); - LOG.info( - "Checking to see if we should compact partition " + entry.getKey() + " of table " + table.getDbName() + "." - + table.getTableName()); - CollectionUtils.addIgnoreNull(compactionResponses, - scheduleCompactionIfRequired(ci, table, entry.getValue(), runAs, false)); - } catch (IOException | InterruptedException | MetaException e) { - LOG.error( - "Error occurred while Checking if we should compact partition " + entry.getKey() + " of table " + table.getDbName() + "." - + table.getTableName() + " Exception: " + e.getMessage()); - throw new RuntimeException(e); - } - }); - return compactionResponses; - } - - public List<CompactionResponse> initiateCompactionForTable(CompactionRequest request, Table table, Map<String, Partition> partitions) throws Exception { + public void initialize() throws Exception { + super.init(new AtomicBoolean()); Review Comment: It's quite strange to introduce a new initializer method whose name is almost the same as that of a superclass method (`init`), which it calls but does not override. For callers of `InitiatorBase` it would be hard to figure out when they should call `init` and when they should call `initialize`. Moreover, it is unclear how subclasses should deal with these two methods. Is it possible to make this an override, or would we risk breaking the subclasses? 
########## ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java: ########## @@ -69,40 +79,48 @@ public AlterTableCompactOperation(DDLOperationContext context, AlterTableCompact compactionRequest.setNumberOfBuckets(desc.getNumberOfBuckets()); } - InitiatorBase initiatorBase = new InitiatorBase(); - initiatorBase.setConf(context.getConf()); - initiatorBase.init(new AtomicBoolean()); - - Map<String, org.apache.hadoop.hive.metastore.api.Partition> partitionMap = - convertPartitionsFromThriftToDB(getPartitions(table, desc, context)); - - if(desc.getPartitionSpec() != null){ - Optional<String> partitionName = partitionMap.keySet().stream().findFirst(); - partitionName.ifPresent(compactionRequest::setPartitionname); - } - List<CompactionResponse> compactionResponses = - initiatorBase.initiateCompactionForTable(compactionRequest, table.getTTable(), partitionMap); - for (CompactionResponse compactionResponse : compactionResponses) { - if (!compactionResponse.isAccepted()) { - String message; - if (compactionResponse.isSetErrormessage()) { - message = compactionResponse.getErrormessage(); - throw new HiveException(ErrorMsg.COMPACTION_REFUSED, table.getDbName(), table.getTableName(), - "CompactionId: " + compactionResponse.getId(), message); - } - context.getConsole().printInfo( - "Compaction already enqueued with id " + compactionResponse.getId() + "; State is " - + compactionResponse.getState()); - continue; + //Will directly initiate compaction if an un-partitioned table/a partition is specified in the request + if (desc.getPartitionSpec() != null || !table.isPartitioned()) { + if (desc.getPartitionSpec() != null) { + Optional<String> partitionName = partitionMap.keySet().stream().findFirst(); + partitionName.ifPresent(compactionRequest::setPartitionname); } - context.getConsole().printInfo("Compaction enqueued with id " + compactionResponse.getId()); - if (desc.isBlocking() && compactionResponse.isAccepted()) { - 
waitForCompactionToFinish(compactionResponse, context); + CompactionResponse compactionResponse = initiatorBase.initiateCompactionForTable(compactionRequest); Review Comment: OK, thanks for the explanation! It makes sense to keep the current behavior. If for whatever reason we decide to change this, it can be done in another PR. ########## ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java: ########## @@ -56,6 +59,13 @@ public AlterTableCompactOperation(DDLOperationContext context, AlterTableCompact throw new HiveException(ErrorMsg.NONACID_COMPACTION_NOT_SUPPORTED, table.getDbName(), table.getTableName()); } + Map<String, org.apache.hadoop.hive.metastore.api.Partition> partitionMap = + convertPartitionsFromThriftToDB(getPartitions(table, desc, context)); + + InitiatorBase initiatorBase = new InitiatorBase(); + initiatorBase.setConf(context.getConf()); + initiatorBase.initialize(); + Review Comment: Is there any particular reason why this block was moved further up in the class? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org For additional commands, e-mail: gitbox-h...@hive.apache.org