This is an automated email from the ASF dual-hosted git repository.

asinkovits pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 93c186e675 HIVE-25492: Major query-based compaction is skipped if partition is empty  (Antal Sinkovits, reviewed by Denys Kuzmenko)
93c186e675 is described below

commit 93c186e675bdeecd89322f58f15f9c62d4935ed3
Author: Antal Sinkovits <[email protected]>
AuthorDate: Mon Apr 11 12:12:54 2022 +0200

    HIVE-25492: Major query-based compaction is skipped if partition is empty  (Antal Sinkovits, reviewed by Denys Kuzmenko)
    
    Closes #3157
---
 .../ql/txn/compactor/TestCrudCompactorOnTez.java   |  97 ++++++++-
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java    |  68 ++++--
 .../hadoop/hive/ql/txn/compactor/Cleaner.java      |   2 +-
 .../hive/ql/txn/compactor/MajorQueryCompactor.java |   2 +-
 .../apache/hadoop/hive/ql/TestTxnCommands2.java    | 230 ++++++++++++++++++++-
 5 files changed, 372 insertions(+), 27 deletions(-)
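
For context on the scenario this commit fixes: once every row of a transactional table (or partition) has been deleted, a query-based MAJOR compaction used to be skipped, so the old delta/delete_delta directories were never collapsed. The sketch below is an illustrative, standalone reproduction of that sequence over JDBC; the connection URL, driver class and table name are assumptions for illustration only and are not part of this commit (the committed tests further down use the in-process test driver instead).

    // Hypothetical reproduction sketch (not part of the commit): assumes a local
    // HiveServer2 at jdbc:hive2://localhost:10000 and hive-jdbc on the classpath.
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class EmptyPartitionCompactionRepro {
      public static void main(String[] args) throws Exception {
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        try (Connection conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default");
             Statement stmt = conn.createStatement()) {
          stmt.execute("drop table if exists empty_table");
          stmt.execute("create table empty_table (a string) stored as orc "
              + "tblproperties ('transactional'='true')");
          stmt.execute("insert into empty_table values ('a')");
          stmt.execute("delete from empty_table where a = 'a'");
          // Before this fix, the MAJOR compaction of the now-empty table could be
          // skipped; after it, an (empty) base directory is produced and the old
          // delta/delete_delta directories become cleanable.
          stmt.execute("alter table empty_table compact 'major'");
        }
      }
    }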

diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
index 8e75a793a0..f409ec2efe 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
@@ -28,9 +28,12 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.collect.Lists;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -48,7 +51,6 @@ import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.ql.DriverFactory;
-import org.apache.hadoop.hive.ql.TxnCommandsBaseForTests;
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.hooks.HiveProtoLoggingHook;
@@ -73,7 +75,6 @@ import org.apache.tez.dag.history.logging.proto.ProtoMessageReader;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
-import org.mockito.Mockito;
 import org.mockito.internal.util.reflection.FieldSetter;
 
 import static org.apache.hadoop.hive.ql.TxnCommandsBaseForTests.runWorker;
@@ -2209,4 +2210,96 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
     qc.runCompactionQueries(hiveConf, null, sdMock, null, ciMock, null, emptyQueries, emptyQueries, emptyQueries);
     Assert.assertEquals("none", hiveConf.getVar(HiveConf.ConfVars.LLAP_IO_ETL_SKIP_FORMAT));
   }
+
+  @Test
+  public void testIfEmptyBaseIsPresentAfterCompaction() throws Exception {
+    String dbName = "default";
+    String tblName = "empty_table";
+
+    // Setup of LOAD INPATH scenario.
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver("create table " + tblName + " (a string) stored 
as orc " +
+            "TBLPROPERTIES ('transactional'='true')", driver);
+    executeStatementOnDriver("insert into " + tblName + " values ('a')", 
driver);
+    executeStatementOnDriver("delete from " + tblName + " where a='a'", 
driver);
+
+    // Run a query-based MAJOR compaction
+    CompactorTestUtil.runCompaction(conf, dbName, tblName, 
CompactionType.MAJOR, true);
+    // Clean up resources
+    CompactorTestUtil.runCleaner(conf);
+
+    IMetaStoreClient hmsClient = new HiveMetaStoreClient(conf);
+    Table table = hmsClient.getTable(dbName, tblName);
+    FileSystem fs = FileSystem.get(conf);
+
+    FileStatus[] fileStatuses = fs.listStatus(new Path(table.getSd().getLocation()));
+    // There should be only one dir
+    Assert.assertEquals(1, fileStatuses.length);
+    Path basePath = fileStatuses[0].getPath();
+    // And it's a base
+    Assert.assertTrue(AcidUtils.baseFileFilter.accept(basePath));
+    RemoteIterator<LocatedFileStatus> filesInBase = fs.listFiles(basePath, true);
+    // It has no files in it
+    Assert.assertFalse(filesInBase.hasNext());
+  }
+
+  @Test
+  public void testNonAcidToAcidConversionWithNestedTableWithUnionSubdir() throws Exception {
+    String dbName = "default";
+
+    // Helper table for the union all insert
+    String helperTblName = "helper_table";
+    executeStatementOnDriver("drop table if exists " + helperTblName, driver);
+    executeStatementOnDriver("create table " + helperTblName + " (a int, b 
int) stored as orc " +
+            "TBLPROPERTIES ('transactional'='false')", driver);
+    executeStatementOnDriver("insert into " + helperTblName + " values (1, 1), 
(2, 2)", driver);
+
+    // Non acid nested table with union subdirs
+    String tblName = "non_acid_nested";
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver("create table " + tblName +
+            "(a int, b int) partitioned by (p string, q string) stored as orc 
TBLPROPERTIES ('transactional'='false')", driver);
+
+    // Insert some union data
+    executeStatementOnDriver("insert into " + tblName + " 
partition(p='p1',q='q1') " +
+            "select a,b from " + helperTblName + " union all select a,b from " 
+ helperTblName, driver);
+
+    // Some sanity checks
+    List<String> result = execSelectAndDumpData("select * from " + tblName, driver, tblName);
+    Assert.assertEquals(4, result.size());
+
+    // Convert the table to acid
+    executeStatementOnDriver("alter table " + tblName + " SET TBLPROPERTIES 
('transactional'='true')", driver);
+
+    // Run a query-based MAJOR compaction
+    CompactorTestUtil.runCompaction(conf, dbName, tblName, CompactionType.MAJOR, true, "p=p1/q=q1");
+    // Clean up resources
+    CompactorTestUtil.runCleaner(conf);
+
+    // Verify file level
+    IMetaStoreClient hmsClient = new HiveMetaStoreClient(conf);
+    Table table = hmsClient.getTable(dbName, tblName);
+    FileSystem fs = FileSystem.get(conf);
+
+    Path tablePath = new Path(table.getSd().getLocation());
+
+    // Partition lvl1
+    FileStatus[] fileStatuses = fs.listStatus(tablePath);
+    Assert.assertEquals(1, fileStatuses.length);
+    String partitionName1 = fileStatuses[0].getPath().getName();
+    Assert.assertEquals("p=p1", partitionName1);
+
+    // Partition lvl2
+    fileStatuses = fs.listStatus(new Path(table.getSd().getLocation() + "/" + partitionName1));
+    Assert.assertEquals(1, fileStatuses.length);
+    String partitionName2 = fileStatuses[0].getPath().getName();
+    Assert.assertEquals("q=q1", partitionName2);
+
+    // 1 base should be here
+    fileStatuses = fs.listStatus(new Path(table.getSd().getLocation() + "/" + partitionName1 + "/" + partitionName2));
+    Assert.assertEquals(1, fileStatuses.length);
+    String baseName = fileStatuses[0].getPath().getName();
+    Assert.assertEquals("base_10000000_v0000009", baseName);
+  }
+
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index dff8b87aa2..921c7ed875 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -28,10 +28,12 @@ import java.io.InputStream;
 import java.io.Serializable;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.Deque;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -1480,6 +1482,54 @@ public class AcidUtils {
     return validTxnList;
   }
 
+
+  /**
+   * In the case of the cleaner we don't need to go down to the file level; it is enough to collect the base/delta/delete_delta directories.
+   *
+   * @param fs the filesystem used for the directory lookup
+   * @param path the path of the table or partition that needs to be cleaned
+   * @return the listed directory snapshots that need to be checked for cleaning
+   * @throws IOException on filesystem errors
+   */
+  public static Map<Path, HdfsDirSnapshot> getHdfsDirSnapshotsForCleaner(final FileSystem fs, final Path path)
+          throws IOException {
+    Map<Path, HdfsDirSnapshot> dirToSnapshots = new HashMap<>();
+    Deque<RemoteIterator<FileStatus>> stack = new ArrayDeque<>();
+    stack.push(fs.listStatusIterator(path));
+    while (!stack.isEmpty()) {
+      RemoteIterator<FileStatus> itr = stack.pop();
+      while (itr.hasNext()) {
+        FileStatus fStatus = itr.next();
+        Path fPath = fStatus.getPath();
+        if (acidHiddenFileFilter.accept(fPath)) {
+          if (baseFileFilter.accept(fPath) ||
+                  deltaFileFilter.accept(fPath) ||
+                  deleteEventDeltaDirFilter.accept(fPath)) {
+            addToSnapshoot(dirToSnapshots, fPath);
+          } else {
+            if (fStatus.isDirectory()) {
+              stack.push(fs.listStatusIterator(fPath));
+            } else {
+              // Found an original file
+              HdfsDirSnapshot hdfsDirSnapshot = addToSnapshoot(dirToSnapshots, fPath.getParent());
+              hdfsDirSnapshot.addFile(fStatus);
+            }
+          }
+        }
+      }
+    }
+    return dirToSnapshots;
+  }
+
+  private static HdfsDirSnapshot addToSnapshoot(Map<Path, HdfsDirSnapshot> dirToSnapshots, Path fPath) {
+    HdfsDirSnapshot dirSnapshot = dirToSnapshots.get(fPath);
+    if (dirSnapshot == null) {
+      dirSnapshot = new HdfsDirSnapshotImpl(fPath);
+      dirToSnapshots.put(fPath, dirSnapshot);
+    }
+    return dirSnapshot;
+  }
+
   public static Map<Path, HdfsDirSnapshot> getHdfsDirSnapshots(final FileSystem fs, final Path path)
       throws IOException {
     Map<Path, HdfsDirSnapshot> dirToSnapshots = new HashMap<>();
@@ -1489,11 +1539,7 @@ public class AcidUtils {
       Path fPath = fStatus.getPath();
       if (acidHiddenFileFilter.accept(fPath)) {
         if (fStatus.isDirectory() && acidTempDirFilter.accept(fPath)) {
-          HdfsDirSnapshot dirSnapshot = dirToSnapshots.get(fPath);
-          if (dirSnapshot == null) {
-            dirSnapshot = new HdfsDirSnapshotImpl(fPath);
-            dirToSnapshots.put(fPath, dirSnapshot);
-          }
+          addToSnapshoot(dirToSnapshots, fPath);
         } else {
           Path parentDirPath = fPath.getParent();
           if (acidTempDirFilter.accept(parentDirPath)) {
@@ -1504,11 +1550,7 @@ public class AcidUtils {
              // So build the snapshot with the files inside the delta directory
               parentDirPath = parentDirPath.getParent();
             }
-            HdfsDirSnapshot dirSnapshot = dirToSnapshots.get(parentDirPath);
-            if (dirSnapshot == null) {
-              dirSnapshot = new HdfsDirSnapshotImpl(parentDirPath);
-              dirToSnapshots.put(parentDirPath, dirSnapshot);
-            }
+            HdfsDirSnapshot dirSnapshot = addToSnapshoot(dirToSnapshots, parentDirPath);
             // We're not filtering out the metadata file and acid format file,
             // as they represent parts of a valid snapshot
            // We're not using the cached values downstream, but we can potentially optimize more in a follow-up task
@@ -1827,11 +1869,11 @@ public class AcidUtils {
       return;
     }
     if (directory.getBase() == null || directory.getBase().getWriteId() < writeId
-      // If there are two competing versions of a particular write-id, one from the compactor and another from IOW, 
+      // If there are two competing versions of a particular write-id, one from the compactor and another from IOW,
       // always pick the compactor one once it is committed.
-      || directory.getBase().getWriteId() == writeId && 
+      || directory.getBase().getWriteId() == writeId &&
           isCompactedBase && validTxnList.isTxnValid(parsedBase.getVisibilityTxnId())) {
-      
+
       if (isValidBase(parsedBase, writeIdList, directory.getFs(), dirSnapshot)) {
         List<HdfsFileStatusWithId> files = null;
         if (dirSnapshot != null) {
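
The traversal added above walks the table/partition tree iteratively and stops as soon as it hits a base, delta or delete_delta directory instead of listing every file inside it, which is all the Cleaner needs. The standalone sketch below shows the same pattern in isolation; the class and method names, the List return type and the simple name-prefix checks are illustrative stand-ins for AcidUtils' PathFilters and HdfsDirSnapshot bookkeeping, not the committed implementation.

    import java.io.IOException;
    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.Deque;
    import java.util.List;

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.RemoteIterator;

    public class CleanerDirWalkSketch {
      // Collect base_/delta_/delete_delta_ directories without descending into them;
      // descend into anything else (nested partitions, legacy union subdirs, ...).
      public static List<Path> collectAcidDirs(FileSystem fs, Path root) throws IOException {
        List<Path> acidDirs = new ArrayList<>();
        Deque<RemoteIterator<FileStatus>> stack = new ArrayDeque<>();
        stack.push(fs.listStatusIterator(root));
        while (!stack.isEmpty()) {
          RemoteIterator<FileStatus> itr = stack.pop();
          while (itr.hasNext()) {
            FileStatus status = itr.next();
            Path p = status.getPath();
            String name = p.getName();
            if (name.startsWith("base_") || name.startsWith("delta_") || name.startsWith("delete_delta_")) {
              // The cleaner only needs to know that this directory exists.
              acidDirs.add(p);
            } else if (status.isDirectory()) {
              stack.push(fs.listStatusIterator(p));
            }
            // A plain file reached here would be an "original" file; the real method
            // records it against its parent directory's snapshot.
          }
        }
        return acidDirs;
      }
    }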
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index 478ec03278..8ea1f778de 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -420,7 +420,7 @@ public class Cleaner extends MetaStoreCompactorThread {
     FileSystem fs = path.getFileSystem(conf);
     
     // Collect all of the files/dirs
-    Map<Path, AcidUtils.HdfsDirSnapshot> dirSnapshots = AcidUtils.getHdfsDirSnapshots(fs, path);
+    Map<Path, AcidUtils.HdfsDirSnapshot> dirSnapshots = AcidUtils.getHdfsDirSnapshotsForCleaner(fs, path);
     AcidDirectory dir = AcidUtils.getAcidState(fs, path, conf, writeIdList, Ref.from(false), false,
         dirSnapshots);
     Table table = resolveTable(ci);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
index 629491e8e5..3ff8cd6256 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
@@ -67,7 +67,7 @@ final class MajorQueryCompactor extends QueryCompactor {
   @Override
   protected void commitCompaction(String dest, String tmpTableName, HiveConf conf,
       ValidWriteIdList actualWriteIds, long compactorTxnId) throws IOException, HiveException {
-    Util.cleanupEmptyDir(conf, tmpTableName);
+    // We don't need to delete the empty directory, as an empty base is a valid scenario.
   }
 
   /**
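
The one-line change above means the compactor no longer removes its output directory when the compacted data turns out to be empty: an empty base is now a legitimate result that masks the older deltas. A small hedged sketch of the check the new TestCrudCompactorOnTez test performs follows; the wrapper class and method are illustrative, while AcidUtils.baseFileFilter and FileSystem.listFiles are the same calls used in that test.

    import java.io.IOException;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.AcidUtils;

    public class EmptyBaseCheckSketch {
      // Returns true if the directory is named like a compactor base but holds no data files.
      public static boolean isEmptyBase(FileSystem fs, Path dir) throws IOException {
        return AcidUtils.baseFileFilter.accept(dir)   // name matches base_...
            && !fs.listFiles(dir, true).hasNext();    // ...yet it contains no files
      }
    }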
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 5bafaf4a49..a7eb735c13 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -34,7 +34,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -97,6 +99,7 @@ public class TestTxnCommands2 extends TxnCommandsBaseForTests {
     NONACIDORCTBL("nonAcidOrcTbl"),
     NONACIDPART("nonAcidPart", "p"),
     NONACIDPART2("nonAcidPart2", "p2"),
+    NONACIDNESTEDPART("nonAcidNestedPart", "p,q"),
     ACIDNESTEDPART("acidNestedPart", "p,q"),
     MMTBL("mmTbl");
 
@@ -137,6 +140,9 @@ public class TestTxnCommands2 extends TxnCommandsBaseForTests {
     runStatementOnDriver("create table " + Table.NONACIDPART + "(a int, b int) partitioned by (p string) stored as orc TBLPROPERTIES ('transactional'='false')");
     runStatementOnDriver("create table " + Table.NONACIDPART2 +
       "(a2 int, b2 int) partitioned by (p2 string) stored as orc TBLPROPERTIES ('transactional'='false')");
+    runStatementOnDriver("create table " + Table.NONACIDNESTEDPART +
+      "(a int, b int) partitioned by (p string, q string) clustered by (a) into " + BUCKET_COUNT +
+      " buckets stored as orc TBLPROPERTIES ('transactional'='false')");
     runStatementOnDriver("create table " + Table.ACIDNESTEDPART +
       "(a int, b int) partitioned by (p int, q int) clustered by (a) into " + BUCKET_COUNT +
       " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
@@ -689,9 +695,9 @@ public class TestTxnCommands2 extends TxnCommandsBaseForTests {
     boolean sawNewBase = false;
     for (int i = 0; i < status.length; i++) {
       if (status[i].getPath().getName().matches("base_.*")) {
-        //should be base_-9223372036854775808_v0000021 but 21 is a txn id not write id so it makes
+        //should be base_-9223372036854775808_v0000023 but 23 is a txn id not write id so it makes
         //the tests fragile
-        Assert.assertTrue(status[i].getPath().getName().startsWith("base_-9223372036854775808_v0000021"));
+        Assert.assertTrue(status[i].getPath().getName().startsWith("base_-9223372036854775808_v0000023"));
         sawNewBase = true;
         FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
         Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
@@ -743,7 +749,7 @@ public class TestTxnCommands2 extends TxnCommandsBaseForTests {
           Assert.assertEquals("bucket_00001_0", buckets[0].getPath().getName());
         }
       } else if (status[i].getPath().getName().matches("base_.*")) {
-        Assert.assertTrue("base_-9223372036854775808", status[i].getPath().getName().startsWith("base_-9223372036854775808_v0000021"));//_v0000021
+        Assert.assertTrue("base_-9223372036854775808", status[i].getPath().getName().startsWith("base_-9223372036854775808_v0000023"));//_v0000023
         sawNewBase = true;
         FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
         Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
@@ -780,12 +786,12 @@ public class TestTxnCommands2 extends TxnCommandsBaseForTests {
         FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
         Arrays.sort(buckets);
         if (numBase == 1) {
-          Assert.assertEquals("base_-9223372036854775808_v0000021", status[i].getPath().getName());
+          Assert.assertEquals("base_-9223372036854775808_v0000023", status[i].getPath().getName());
           Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
           Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
         } else if (numBase == 2) {
           // The new base dir now has two bucket files, since the delta dir has two bucket files
-          Assert.assertEquals("base_10000003_v0000029", status[i].getPath().getName());
+          Assert.assertEquals("base_10000003_v0000031", status[i].getPath().getName());
           Assert.assertEquals(2, buckets.length);
           Assert.assertEquals("bucket_00000", buckets[0].getPath().getName());
         }
@@ -812,7 +818,7 @@ public class TestTxnCommands2 extends TxnCommandsBaseForTests {
     status = fs.listStatus(new Path(getWarehouseDir() + "/" +
       (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER);
     Assert.assertEquals(1, status.length);
-    Assert.assertEquals("base_10000003_v0000029", status[0].getPath().getName());
+    Assert.assertEquals("base_10000003_v0000031", status[0].getPath().getName());
     FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
     Arrays.sort(buckets);
     Assert.assertEquals(2, buckets.length);
@@ -825,6 +831,210 @@ public class TestTxnCommands2 extends TxnCommandsBaseForTests {
     Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
   }
 
+  /**
+   * Test the query correctness and directory layout for ACID table conversion
+   * 1. Insert a row to Non-ACID table
+   * 2. Convert Non-ACID to ACID table
+   * 3. Perform Major compaction
+   * 4. Insert a new row to ACID table
+   * 5. Perform another Major compaction
+   * 6. Clean
+   * @throws Exception
+   */
+  @Test
+  public void testNonAcidToAcidConversion4() throws Exception {
+    FileSystem fs = FileSystem.get(hiveConf);
+    FileStatus[] status;
+
+    // 1. Insert a row to Non-ACID nested partitioned table
+    int[][] targetVals = {{1,2}};
+    runStatementOnDriver("insert into " + Table.NONACIDNESTEDPART + " 
partition(p='p1',q='q1') " + makeValuesClause(targetVals));
+    status = listFilesByTable(fs, Table.NONACIDNESTEDPART);
+
+    // There should be 2 original bucket files in the location (000000_0 and 000001_0)
+    Assert.assertEquals(BUCKET_COUNT, status.length);
+    for (int i = 0; i < status.length; i++) {
+      Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
+    }
+    List<String> rs = runStatementOnDriver("select a,b from " + Table.NONACIDNESTEDPART);
+    Assert.assertEquals(stringifyValues(targetVals), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDNESTEDPART);
+    int resultCount = 1;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 2. Convert NONACIDNESTEDPART to ACID table
+    runStatementOnDriver("alter table " + Table.NONACIDNESTEDPART + " SET 
TBLPROPERTIES ('transactional'='true')");
+    status = listFilesByTable(fs, Table.NONACIDNESTEDPART);
+    // Everything should be same as before
+    Assert.assertEquals(BUCKET_COUNT, status.length);
+    for (int i = 0; i < status.length; i++) {
+      Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
+    }
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDNESTEDPART);
+    Assert.assertEquals(stringifyValues(targetVals), rs);
+    rs = runStatementOnDriver("select count(*) from " + 
Table.NONACIDNESTEDPART);
+    resultCount = 1;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 3. Perform a major compaction
+    runStatementOnDriver("alter table "+ Table.NONACIDNESTEDPART + " 
partition(p='p1',q='q1') compact 'MAJOR'");
+    runWorker(hiveConf);
+    // There should be 1 new directory: base_-9223372036854775808
+    // Original bucket files should stay until Cleaner kicks in.
+    status = listFilesByTable(fs, Table.NONACIDNESTEDPART);
+    Assert.assertEquals(3, status.length);
+    boolean sawNewBase = false;
+    for (int i = 0; i < status.length; i++) {
+      Path parent = status[i].getPath().getParent();
+      if (parent.getName().matches("base_.*")) {
+        //should be base_-9223372036854775808_v0000023 but 23 is a txn id not write id so it makes
+        //the tests fragile
+        Assert.assertTrue(parent.getName().startsWith("base_-9223372036854775808_v0000023"));
+        sawNewBase = true;
+        FileStatus[] buckets = fs.listStatus(parent, FileUtils.HIDDEN_FILES_PATH_FILTER);
+        Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+        Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
+      }
+    }
+    Assert.assertTrue(sawNewBase);
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDNESTEDPART);
+    Assert.assertEquals(stringifyValues(targetVals), rs);
+    rs = runStatementOnDriver("select count(*) from " + 
Table.NONACIDNESTEDPART);
+    resultCount = 1;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 4. Update the existing row, and insert another row to newly-converted ACID table
+    runStatementOnDriver("update " + Table.NONACIDNESTEDPART + " set b=3 where a=1");
+    runStatementOnDriver("insert into " + Table.NONACIDNESTEDPART + "(a,b,p,q) values(3,4,'p1','q1')");
+    status = listFilesByTable(fs, Table.NONACIDNESTEDPART);
+    Arrays.sort(status);  // make sure delta_0000001_0000001_0000 appears before delta_0000002_0000002_0000
+    // There should be 2 original bucket files (000000_0 and 000001_0), a base directory,
+    // plus two new delta directories and one delete_delta directory that would be created due to
+    // the update statement (remember split-update U=D+I)!
+    Assert.assertEquals(6, status.length);
+    int numDelta = 0;
+    int numDeleteDelta = 0;
+    sawNewBase = false;
+    for (int i = 0; i < status.length; i++) {
+      Path parent = status[i].getPath().getParent();
+      if (parent.getName().matches("delta_.*")) {
+        numDelta++;
+        FileStatus[] buckets = fs.listStatus(parent, FileUtils.HIDDEN_FILES_PATH_FILTER);
+        Arrays.sort(buckets);
+        if (numDelta == 1) {
+          Assert.assertEquals("delta_10000002_10000002_0000", 
parent.getName());
+          Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+          Assert.assertEquals("bucket_00001_0", 
buckets[0].getPath().getName());
+        } else if (numDelta == 2) {
+          Assert.assertEquals("delta_10000003_10000003_0000", 
parent.getName());
+          Assert.assertEquals(1, buckets.length);
+          Assert.assertEquals("bucket_00000_0", 
buckets[0].getPath().getName());
+        }
+      } else if (parent.getName().matches("delete_delta_.*")) {
+        numDeleteDelta++;
+        FileStatus[] buckets = fs.listStatus(parent, FileUtils.HIDDEN_FILES_PATH_FILTER);
+        Arrays.sort(buckets);
+        if (numDeleteDelta == 1) {
+          Assert.assertEquals("delete_delta_10000002_10000002_0000", 
parent.getName());
+          Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+          Assert.assertEquals("bucket_00001_0", 
buckets[0].getPath().getName());
+        }
+      } else if (parent.getName().matches("base_.*")) {
+        Assert.assertTrue("base_-9223372036854775808", 
parent.getName().startsWith("base_-9223372036854775808_v0000023"));//_v0000023
+        sawNewBase = true;
+        FileStatus[] buckets = fs.listStatus(parent, FileUtils.HIDDEN_FILES_PATH_FILTER);
+        Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+        Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
+      }
+    }
+    Assert.assertEquals(2, numDelta);
+    Assert.assertEquals(1, numDeleteDelta);
+    Assert.assertTrue(sawNewBase);
+
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDNESTEDPART);
+    targetVals = new int[][] {{1, 3}, {3, 4}};
+    Assert.assertEquals(stringifyValues(targetVals), rs);
+    rs = runStatementOnDriver("select count(*) from " + 
Table.NONACIDNESTEDPART);
+    resultCount = 2;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 5. Perform another major compaction
+    runStatementOnDriver("alter table "+ Table.NONACIDNESTEDPART + " 
partition(p='p1',q='q1') compact 'MAJOR'");
+    runWorker(hiveConf);
+    // There should be 1 new base directory: base_0000001
+    // Original bucket files, delta directories, delete_delta directories and the
+    // previous base directory should stay until Cleaner kicks in.
+    status = listFilesByTable(fs, Table.NONACIDNESTEDPART);
+    Arrays.sort(status);
+    Assert.assertEquals(8, status.length);
+    int numBase = 0;
+    Set<Path> bases = new HashSet<>();
+    for (int i = 0; i < status.length; i++) {
+      Path parent = status[i].getPath().getParent();
+      if (parent.getName().matches("base_.*")) {
+        numBase++;
+        bases.add(parent);
+        FileStatus[] buckets = fs.listStatus(parent, FileUtils.HIDDEN_FILES_PATH_FILTER);
+        Arrays.sort(buckets);
+        if (numBase == 1) {
+          Assert.assertEquals("base_-9223372036854775808_v0000023", 
parent.getName());
+          Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+          Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
+        } else if (numBase == 2) {
+          // The new base dir now has two bucket files, since the delta dir has two bucket files
+          Assert.assertEquals("base_10000003_v0000031", parent.getName());
+          Assert.assertEquals(2, buckets.length);
+          Assert.assertEquals("bucket_00000", buckets[0].getPath().getName());
+        }
+      }
+    }
+    Assert.assertEquals(2,  bases.size());
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDNESTEDPART);
+    targetVals = new int[][] {{3, 4}, {1, 3}};
+    Assert.assertEquals(stringifyValuesNoSort(targetVals), rs);
+    rs = runStatementOnDriver("select count(*) from " + 
Table.NONACIDNESTEDPART);
+    resultCount = 2;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 6. Let Cleaner delete obsolete files/dirs
+    status = listFilesByTable(fs, Table.NONACIDNESTEDPART);
+    // Before Cleaner, there should be 8 items:
+    // 2 original files, 2 delta directories (1 file each), 1 delete_delta directory (1 file) and 2 base directories (with one and two files respectively)
+
+    Assert.assertEquals(8, status.length);
+    runCleaner(hiveConf);
+    runCleaner(hiveConf);
+    // There should be only 1 directory left: base_0000001.
+    // Original bucket files, delta directories and previous base directory should have been cleaned up. Only one base with 2 files.
+    status = listFilesByTable(fs, Table.NONACIDNESTEDPART);
+    Assert.assertEquals(2, status.length);
+    Assert.assertEquals("base_10000003_v0000031", 
status[0].getPath().getParent().getName());
+    FileStatus[] buckets = fs.listStatus(status[0].getPath().getParent(), 
FileUtils.HIDDEN_FILES_PATH_FILTER);
+    Arrays.sort(buckets);
+    Assert.assertEquals(2, buckets.length);
+    Assert.assertEquals("bucket_00000", buckets[0].getPath().getName());
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDNESTEDPART);
+    targetVals = new int[][] {{3, 4}, {1, 3}};
+    Assert.assertEquals(stringifyValuesNoSort(targetVals), rs);
+    rs = runStatementOnDriver("select count(*) from " + 
Table.NONACIDNESTEDPART);
+    resultCount = 2;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+  }
+
+  private FileStatus[] listFilesByTable(FileSystem fs, Table t) throws IOException {
+    List<FileStatus> tmp = new ArrayList<>();
+    RemoteIterator<LocatedFileStatus> f = fs.listFiles(new Path(getWarehouseDir() + "/" +
+            t.toString().toLowerCase()), true);
+
+    while (f.hasNext()) {
+      LocatedFileStatus file = f.next();
+      if (FileUtils.HIDDEN_FILES_PATH_FILTER.accept(file.getPath())) {
+        tmp.add(file);
+      }
+    }
+    return tmp.toArray(new FileStatus[0]);
+  }
+
   @Test
   public void testValidTxnsBookkeeping() throws Exception {
     // 1. Run a query against a non-ACID table, and we shouldn't have txn 
logged in conf
@@ -1295,8 +1505,8 @@ public class TestTxnCommands2 extends TxnCommandsBaseForTests {
     FileStatus[] status = fs.listStatus(new Path(getWarehouseDir() + "/" + tblName.toLowerCase()),
         FileUtils.HIDDEN_FILES_PATH_FILTER);
     Set<String> expectedDeltas = new HashSet<>();
-    expectedDeltas.add("delete_delta_0000001_0000002_v0000019");
-    expectedDeltas.add("delta_0000001_0000002_v0000019");
+    expectedDeltas.add("delete_delta_0000001_0000002_v0000021");
+    expectedDeltas.add("delta_0000001_0000002_v0000021");
     expectedDeltas.add("delete_delta_0000003_0000003_0000");
     Set<String> actualDeltas = new HashSet<>();
     for(FileStatus file : status) {
@@ -1314,8 +1524,8 @@ public class TestTxnCommands2 extends TxnCommandsBaseForTests {
     status = fs.listStatus(new Path(getWarehouseDir() + "/" + tblName.toLowerCase()),
         FileUtils.HIDDEN_FILES_PATH_FILTER);
     expectedDeltas = new HashSet<>();
-    expectedDeltas.add("delete_delta_0000001_0000004_v0000023");
-    expectedDeltas.add("delta_0000001_0000004_v0000023");
+    expectedDeltas.add("delete_delta_0000001_0000004_v0000025");
+    expectedDeltas.add("delta_0000001_0000004_v0000025");
     actualDeltas = new HashSet<>();
     for(FileStatus file : status) {
       actualDeltas.add(file.getPath().getName());
