zabetak commented on code in PR #4859:
URL: https://github.com/apache/hive/pull/4859#discussion_r1482918569


##########
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java:
##########
@@ -648,6 +646,52 @@ public void secondCompactionShouldBeRefusedBeforeEnqueueing() throws Exception {
     Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
   }
 
+  @Test
+  public void secondCompactionShouldBeRefusedBeforeEnqueueingForPartition() throws Exception {
+    conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
+
+    final String dbName = "default";
+    final String tableName = "compaction_test";
+    executeStatementOnDriver("drop table if exists " + tableName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tableName + "(id string, value string) partitioned by(pt string) CLUSTERED BY(id) "
+        + "INTO 10 BUCKETS STORED AS ORC TBLPROPERTIES('transactional'='true')", driver);
+    executeStatementOnDriver("alter table " + tableName + " add partition(pt='test')",driver);
+    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " partition(pt='test') values ('1','one'),('2','two'),('3','three'),"
+        + "('4','four'),('5','five'),('6','six'),('7','seven'),('8','eight'),('9','nine'),('10','ten'),"
+        + "('11','eleven'),('12','twelve'),('13','thirteen'),('14','fourteen'),('15','fifteen'),('16','sixteen'),"
+        + "('17','seventeen'),('18','eighteen'),('19','nineteen'),('20','twenty')", driver);
+
+    executeStatementOnDriver("insert into " + tableName + " partition(pt='test') values ('21', 'value21'),('84', 'value84'),"
+        + "('66', 'value66'),('54', 'value54')", driver);
+    executeStatementOnDriver(
+        "insert into " + tableName + " partition(pt='test') values ('22', 'value22'),('34', 'value34')," + "('35', 'value35')", driver);
+    executeStatementOnDriver("insert into " + tableName + " partition(pt='test') values ('75', 'value75'),('99', 'value99')", driver);
+
+    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+
+    //Do a compaction directly and wait for it to finish
+    CompactionRequest rqst = new CompactionRequest(dbName, tableName, CompactionType.MAJOR);
+    rqst.setPartitionname("pt=test");
+    CompactionResponse resp = txnHandler.compact(rqst);
+    runWorker(conf);
+
+    //Try to do a second compaction on the same table before the cleaner runs.
+    try {
+      driver.run("ALTER TABLE " + tableName + " partition(pt='test') COMPACT 'major'");

Review Comment:
   This test is mostly a copy of the previous one, but I'm not sure we need all 
of it. The main thing we want to test is the exception message when a partition 
is set in the request. For that, it would suffice to add this try-catch block to 
the previous test, or to rework this test and keep only what is necessary.
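   
   A minimal sketch of the shape such a reduced check could take, in plain Java. 
`CompactionQueueSketch`, `RefusedCompactionCheck`, and the refusal message are 
hypothetical stand-ins, not Hive's `TxnStore`, driver, or actual error text. The 
only point is that one request plus one refused repeat on the same partition is 
enough to assert on the message.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the compaction queue; NOT Hive's TxnStore.
class CompactionQueueSketch {
  private final Set<String> pending = new HashSet<>();

  // Enqueues a request and refuses a duplicate for the same table/partition.
  void request(String table, String partition) {
    String key = partition == null ? table : table + "/" + partition;
    if (!pending.add(key)) {
      // Hypothetical message; the real Hive error text is different.
      throw new IllegalStateException("Compaction refused for " + key);
    }
  }
}

class RefusedCompactionCheck {
  // Issues the same partition-level request twice and returns the
  // message carried by the refusal of the second one.
  static String secondRequestMessage() {
    CompactionQueueSketch queue = new CompactionQueueSketch();
    queue.request("compaction_test", "pt=test");
    try {
      queue.request("compaction_test", "pt=test");
      // AssertionError is not caught below, so a missing refusal fails loudly.
      throw new AssertionError("second compaction should have been refused");
    } catch (IllegalStateException e) {
      return e.getMessage();
    }
  }
}
```

   In the actual test the same try-catch shape would wrap the `driver.run(...)` 
call on `pt='test'`, with the setup reduced to whatever is needed to leave one 
compaction pending.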



##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/InitiatorBase.java:
##########
@@ -50,59 +49,53 @@
 
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 public class InitiatorBase extends MetaStoreCompactorThread {
 
   static final private String COMPACTOR_THRESHOLD_PREFIX = "compactorthreshold.";
 
-  private List<CompactionResponse> initiateCompactionForMultiplePartitions(Table table,
-      Map<String, Partition> partitions, CompactionRequest request) {
-    List<CompactionResponse> compactionResponses = new ArrayList<>();
-    partitions.entrySet().parallelStream().forEach(entry -> {
-      try {
-        StorageDescriptor sd = CompactorUtil.resolveStorageDescriptor(table, entry.getValue());
-        String runAs = TxnUtils.findUserToRunAs(sd.getLocation(), table, conf);
-        CompactionInfo ci =
-            new CompactionInfo(table.getDbName(), table.getTableName(), entry.getKey(), request.getType());
-        ci.initiatorId = request.getInitiatorId();
-        ci.orderByClause = request.getOrderByClause();
-        ci.initiatorVersion = request.getInitiatorVersion();
-        if (request.getNumberOfBuckets() > 0) {
-          ci.numberOfBuckets = request.getNumberOfBuckets();
-        }
-        ci.poolName = request.getPoolName();
-        LOG.info(
-            "Checking to see if we should compact partition " + entry.getKey() + " of table " + table.getDbName() + "."
-                + table.getTableName());
-        CollectionUtils.addIgnoreNull(compactionResponses,
-            scheduleCompactionIfRequired(ci, table, entry.getValue(), runAs, false));
-      } catch (IOException | InterruptedException | MetaException e) {
-        LOG.error(
-            "Error occurred while Checking if we should compact partition " + entry.getKey() + " of table " + table.getDbName() + "."
-                + table.getTableName() + " Exception: " + e.getMessage());
-        throw new RuntimeException(e);
-      }
-    });
-    return compactionResponses;
-  }
-
-  public List<CompactionResponse> initiateCompactionForTable(CompactionRequest request, Table table, Map<String, Partition> partitions) throws Exception {
+  public void initialize() throws Exception {
+    super.init(new AtomicBoolean());

Review Comment:
   It's quite strange to introduce a new initializer method whose name is 
roughly the same as a superclass method, and which calls that method but 
doesn't override it.
   
   For callers of `InitiatorBase` it would be hard to figure out when they 
should call `init` and when they should call `initialize`. Moreover, it is 
unclear how subclasses should deal with these two methods.
   
   Is it possible to make this an override, or do we risk breaking the 
subclasses?
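   
   To make the concern concrete, here is a rough sketch using hypothetical 
classes (`CompactorThreadSketch`, `TwoEntryPointsInitiator`, 
`OverridingInitiator`). None of them are Hive's real classes, and checked 
exceptions are left out for brevity. The first subclass mirrors the shape in 
the patch (a second, similarly named entry point); the second overrides `init` 
so callers and subclasses have a single method to reason about.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the superclass; NOT Hive's MetaStoreCompactorThread.
class CompactorThreadSketch {
  AtomicBoolean stop;
  boolean initialized;

  public void init(AtomicBoolean stop) {
    this.stop = stop;
    this.initialized = true;
  }
}

// Shape used in the patch: a new method that calls init() without overriding it.
// Callers now see both init(AtomicBoolean) and initialize() on the same object.
class TwoEntryPointsInitiator extends CompactorThreadSketch {
  public void initialize() {
    super.init(new AtomicBoolean());
  }
}

// Alternative: override init() so there is exactly one entry point.
class OverridingInitiator extends CompactorThreadSketch {
  boolean extraSetupDone;

  @Override
  public void init(AtomicBoolean stop) {
    super.init(stop);
    // Any initiator-specific setup would go here (assumption for illustration).
    extraSetupDone = true;
  }
}
```

   With the override, a subclass that customizes `init` cannot be bypassed by a 
caller picking the other method, which is the ambiguity the two-method shape 
introduces.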



##########
ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java:
##########
@@ -69,40 +79,48 @@ public AlterTableCompactOperation(DDLOperationContext context, AlterTableCompact
       compactionRequest.setNumberOfBuckets(desc.getNumberOfBuckets());
     }
 
-    InitiatorBase initiatorBase = new InitiatorBase();
-    initiatorBase.setConf(context.getConf());
-    initiatorBase.init(new AtomicBoolean());
-
-    Map<String, org.apache.hadoop.hive.metastore.api.Partition> partitionMap =
-        convertPartitionsFromThriftToDB(getPartitions(table, desc, context));
-
-    if(desc.getPartitionSpec() != null){
-      Optional<String> partitionName =  partitionMap.keySet().stream().findFirst();
-      partitionName.ifPresent(compactionRequest::setPartitionname);
-    }
-    List<CompactionResponse> compactionResponses =
-        initiatorBase.initiateCompactionForTable(compactionRequest, table.getTTable(), partitionMap);
-    for (CompactionResponse compactionResponse : compactionResponses) {
-      if (!compactionResponse.isAccepted()) {
-        String message;
-        if (compactionResponse.isSetErrormessage()) {
-          message = compactionResponse.getErrormessage();
-          throw new HiveException(ErrorMsg.COMPACTION_REFUSED, table.getDbName(), table.getTableName(),
-              "CompactionId: " + compactionResponse.getId(), message);
-        }
-        context.getConsole().printInfo(
-            "Compaction already enqueued with id " + compactionResponse.getId() + "; State is "
-                + compactionResponse.getState());
-        continue;
+    //Will directly initiate compaction if an un-partitioned table/a partition is specified in the request
+    if (desc.getPartitionSpec() != null || !table.isPartitioned()) {
+      if (desc.getPartitionSpec() != null) {
+        Optional<String> partitionName = partitionMap.keySet().stream().findFirst();
+        partitionName.ifPresent(compactionRequest::setPartitionname);
       }
-      context.getConsole().printInfo("Compaction enqueued with id " + compactionResponse.getId());
-      if (desc.isBlocking() && compactionResponse.isAccepted()) {
-        waitForCompactionToFinish(compactionResponse, context);
+      CompactionResponse compactionResponse = initiatorBase.initiateCompactionForTable(compactionRequest);

Review Comment:
   OK, thanks for the explanation! It makes sense to keep the current behavior. 
If for whatever reason we decide to change this, it can be done in another PR.



##########
ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java:
##########
@@ -56,6 +59,13 @@ public AlterTableCompactOperation(DDLOperationContext context, AlterTableCompact
       throw new HiveException(ErrorMsg.NONACID_COMPACTION_NOT_SUPPORTED, table.getDbName(), table.getTableName());
     }
 
+    Map<String, org.apache.hadoop.hive.metastore.api.Partition> partitionMap =
+        convertPartitionsFromThriftToDB(getPartitions(table, desc, context));
+
+    InitiatorBase initiatorBase = new InitiatorBase();
+    initiatorBase.setConf(context.getConf());
+    initiatorBase.initialize();
+

Review Comment:
   Is there any particular reason why this block was moved further up in the 
class?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

