This is an automated email from the ASF dual-hosted git repository.
asinkovits pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 93c186e675 HIVE-25492: Major query-based compaction is skipped if partition is empty (Antal Sinkovits, reviewed by Denys Kuzmenko)
93c186e675 is described below
commit 93c186e675bdeecd89322f58f15f9c62d4935ed3
Author: Antal Sinkovits <[email protected]>
AuthorDate: Mon Apr 11 12:12:54 2022 +0200
HIVE-25492: Major query-based compaction is skipped if partition is empty (Antal Sinkovits, reviewed by Denys Kuzmenko)
Closes #3157
---
.../ql/txn/compactor/TestCrudCompactorOnTez.java | 97 ++++++++-
.../org/apache/hadoop/hive/ql/io/AcidUtils.java | 68 ++++--
.../hadoop/hive/ql/txn/compactor/Cleaner.java | 2 +-
.../hive/ql/txn/compactor/MajorQueryCompactor.java | 2 +-
.../apache/hadoop/hive/ql/TestTxnCommands2.java | 230 ++++++++++++++++++++-
5 files changed, 372 insertions(+), 27 deletions(-)
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
index 8e75a793a0..f409ec2efe 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
@@ -28,9 +28,12 @@ import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -48,7 +51,6 @@ import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.DriverFactory;
-import org.apache.hadoop.hive.ql.TxnCommandsBaseForTests;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.hooks.HiveProtoLoggingHook;
@@ -73,7 +75,6 @@ import org.apache.tez.dag.history.logging.proto.ProtoMessageReader;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
-import org.mockito.Mockito;
import org.mockito.internal.util.reflection.FieldSetter;
import static org.apache.hadoop.hive.ql.TxnCommandsBaseForTests.runWorker;
@@ -2209,4 +2210,96 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
qc.runCompactionQueries(hiveConf, null, sdMock, null, ciMock, null, emptyQueries, emptyQueries, emptyQueries);
Assert.assertEquals("none", hiveConf.getVar(HiveConf.ConfVars.LLAP_IO_ETL_SKIP_FORMAT));
}
+
+ @Test
+ public void testIfEmptyBaseIsPresentAfterCompaction() throws Exception {
+ String dbName = "default";
+ String tblName = "empty_table";
+
+ // Setup of LOAD INPATH scenario.
+ executeStatementOnDriver("drop table if exists " + tblName, driver);
+ executeStatementOnDriver("create table " + tblName + " (a string) stored as orc " +
+ "TBLPROPERTIES ('transactional'='true')", driver);
+ executeStatementOnDriver("insert into " + tblName + " values ('a')", driver);
+ executeStatementOnDriver("delete from " + tblName + " where a='a'", driver);
+
+ // Run a query-based MAJOR compaction
+ CompactorTestUtil.runCompaction(conf, dbName, tblName, CompactionType.MAJOR, true);
+ // Clean up resources
+ CompactorTestUtil.runCleaner(conf);
+
+ IMetaStoreClient hmsClient = new HiveMetaStoreClient(conf);
+ Table table = hmsClient.getTable(dbName, tblName);
+ FileSystem fs = FileSystem.get(conf);
+
+ FileStatus[] fileStatuses = fs.listStatus(new Path(table.getSd().getLocation()));
+ // There should be only one dir
+ Assert.assertEquals(1, fileStatuses.length);
+ Path basePath = fileStatuses[0].getPath();
+ // And it's a base
+ Assert.assertTrue(AcidUtils.baseFileFilter.accept(basePath));
+ RemoteIterator<LocatedFileStatus> filesInBase = fs.listFiles(basePath, true);
+ // It has no files in it
+ Assert.assertFalse(filesInBase.hasNext());
+ }
+
+ @Test
+ public void testNonAcidToAcidConversionWithNestedTableWithUnionSubdir() throws Exception {
+ String dbName = "default";
+
+ // Helper table for the union all insert
+ String helperTblName = "helper_table";
+ executeStatementOnDriver("drop table if exists " + helperTblName, driver);
+ executeStatementOnDriver("create table " + helperTblName + " (a int, b int) stored as orc " +
+ "TBLPROPERTIES ('transactional'='false')", driver);
+ executeStatementOnDriver("insert into " + helperTblName + " values (1, 1), (2, 2)", driver);
+
+ // Non acid nested table with union subdirs
+ String tblName = "non_acid_nested";
+ executeStatementOnDriver("drop table if exists " + tblName, driver);
+ executeStatementOnDriver("create table " + tblName +
+ "(a int, b int) partitioned by (p string, q string) stored as orc TBLPROPERTIES ('transactional'='false')", driver);
+
+ // Insert some union data
+ executeStatementOnDriver("insert into " + tblName + " partition(p='p1',q='q1') " +
+ "select a,b from " + helperTblName + " union all select a,b from " + helperTblName, driver);
+
+ // Some sanity checks
+ List<String> result = execSelectAndDumpData("select * from " + tblName, driver, tblName);
+ Assert.assertEquals(4, result.size());
+
+ // Convert the table to acid
+ executeStatementOnDriver("alter table " + tblName + " SET TBLPROPERTIES ('transactional'='true')", driver);
+
+ // Run a query-based MAJOR compaction
+ CompactorTestUtil.runCompaction(conf, dbName, tblName, CompactionType.MAJOR, true, "p=p1/q=q1");
+ // Clean up resources
+ CompactorTestUtil.runCleaner(conf);
+
+ // Verify file level
+ IMetaStoreClient hmsClient = new HiveMetaStoreClient(conf);
+ Table table = hmsClient.getTable(dbName, tblName);
+ FileSystem fs = FileSystem.get(conf);
+
+ Path tablePath = new Path(table.getSd().getLocation());
+
+ // Partition lvl1
+ FileStatus[] fileStatuses = fs.listStatus(tablePath);
+ Assert.assertEquals(1, fileStatuses.length);
+ String partitionName1 = fileStatuses[0].getPath().getName();
+ Assert.assertEquals("p=p1", partitionName1);
+
+ // Partition lvl2
+ fileStatuses = fs.listStatus(new Path(table.getSd().getLocation() + "/" + partitionName1));
+ Assert.assertEquals(1, fileStatuses.length);
+ String partitionName2 = fileStatuses[0].getPath().getName();
+ Assert.assertEquals("q=q1", partitionName2);
+
+ // 1 base should be here
+ fileStatuses = fs.listStatus(new Path(table.getSd().getLocation() + "/" + partitionName1 + "/" + partitionName2));
+ Assert.assertEquals(1, fileStatuses.length);
+ String baseName = fileStatuses[0].getPath().getName();
+ Assert.assertEquals("base_10000000_v0000009", baseName);
+ }
+
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index dff8b87aa2..921c7ed875 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -28,10 +28,12 @@ import java.io.InputStream;
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
+import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -1480,6 +1482,54 @@ public class AcidUtils {
return validTxnList;
}
+
+ /**
+ * In case of the cleaner, we don't need to go down to the file level; it is enough to collect base/delta/deletedelta directories.
+ *
+ * @param fs the filesystem used for the directory lookup
+ * @param path the path of the table or partition that needs to be cleaned
+ * @return the listed directory snapshots that need to be checked for cleaning
+ * @throws IOException on filesystem errors
+ */
+ public static Map<Path, HdfsDirSnapshot> getHdfsDirSnapshotsForCleaner(final FileSystem fs, final Path path)
+ throws IOException {
+ Map<Path, HdfsDirSnapshot> dirToSnapshots = new HashMap<>();
+ Deque<RemoteIterator<FileStatus>> stack = new ArrayDeque<>();
+ stack.push(fs.listStatusIterator(path));
+ while (!stack.isEmpty()) {
+ RemoteIterator<FileStatus> itr = stack.pop();
+ while (itr.hasNext()) {
+ FileStatus fStatus = itr.next();
+ Path fPath = fStatus.getPath();
+ if (acidHiddenFileFilter.accept(fPath)) {
+ if (baseFileFilter.accept(fPath) ||
+ deltaFileFilter.accept(fPath) ||
+ deleteEventDeltaDirFilter.accept(fPath)) {
+ addToSnapshoot(dirToSnapshots, fPath);
+ } else {
+ if (fStatus.isDirectory()) {
+ stack.push(fs.listStatusIterator(fPath));
+ } else {
+ // Found an original file
+ HdfsDirSnapshot hdfsDirSnapshot = addToSnapshoot(dirToSnapshots, fPath.getParent());
+ hdfsDirSnapshot.addFile(fStatus);
+ }
+ }
+ }
+ }
+ }
+ return dirToSnapshots;
+ }
+
+ private static HdfsDirSnapshot addToSnapshoot(Map<Path, HdfsDirSnapshot> dirToSnapshots, Path fPath) {
+ HdfsDirSnapshot dirSnapshot = dirToSnapshots.get(fPath);
+ if (dirSnapshot == null) {
+ dirSnapshot = new HdfsDirSnapshotImpl(fPath);
+ dirToSnapshots.put(fPath, dirSnapshot);
+ }
+ return dirSnapshot;
+ }
+
public static Map<Path, HdfsDirSnapshot> getHdfsDirSnapshots(final FileSystem fs, final Path path)
throws IOException {
Map<Path, HdfsDirSnapshot> dirToSnapshots = new HashMap<>();
@@ -1489,11 +1539,7 @@ public class AcidUtils {
Path fPath = fStatus.getPath();
if (acidHiddenFileFilter.accept(fPath)) {
if (fStatus.isDirectory() && acidTempDirFilter.accept(fPath)) {
- HdfsDirSnapshot dirSnapshot = dirToSnapshots.get(fPath);
- if (dirSnapshot == null) {
- dirSnapshot = new HdfsDirSnapshotImpl(fPath);
- dirToSnapshots.put(fPath, dirSnapshot);
- }
+ addToSnapshoot(dirToSnapshots, fPath);
} else {
Path parentDirPath = fPath.getParent();
if (acidTempDirFilter.accept(parentDirPath)) {
@@ -1504,11 +1550,7 @@ public class AcidUtils {
// So build the snapshot with the files inside the delta directory
parentDirPath = parentDirPath.getParent();
}
- HdfsDirSnapshot dirSnapshot = dirToSnapshots.get(parentDirPath);
- if (dirSnapshot == null) {
- dirSnapshot = new HdfsDirSnapshotImpl(parentDirPath);
- dirToSnapshots.put(parentDirPath, dirSnapshot);
- }
+ HdfsDirSnapshot dirSnapshot = addToSnapshoot(dirToSnapshots, parentDirPath);
// We're not filtering out the metadata file and acid format file,
// as they represent parts of a valid snapshot
// We're not using the cached values downstream, but we can potentially optimize more in a follow-up task
@@ -1827,11 +1869,11 @@ public class AcidUtils {
return;
}
if (directory.getBase() == null || directory.getBase().getWriteId() < writeId
- // If there are two competing versions of a particular write-id, one from the compactor and another from IOW,
+ // If there are two competing versions of a particular write-id, one from the compactor and another from IOW,
// always pick the compactor one once it is committed.
- || directory.getBase().getWriteId() == writeId &&
+ || directory.getBase().getWriteId() == writeId &&
isCompactedBase && validTxnList.isTxnValid(parsedBase.getVisibilityTxnId())) {
-
+
if (isValidBase(parsedBase, writeIdList, directory.getFs(), dirSnapshot)) {
List<HdfsFileStatusWithId> files = null;
if (dirSnapshot != null) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index 478ec03278..8ea1f778de 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -420,7 +420,7 @@ public class Cleaner extends MetaStoreCompactorThread {
FileSystem fs = path.getFileSystem(conf);
// Collect all of the files/dirs
- Map<Path, AcidUtils.HdfsDirSnapshot> dirSnapshots = AcidUtils.getHdfsDirSnapshots(fs, path);
+ Map<Path, AcidUtils.HdfsDirSnapshot> dirSnapshots = AcidUtils.getHdfsDirSnapshotsForCleaner(fs, path);
AcidDirectory dir = AcidUtils.getAcidState(fs, path, conf, writeIdList, Ref.from(false), false,
dirSnapshots);
Table table = resolveTable(ci);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
index 629491e8e5..3ff8cd6256 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
@@ -67,7 +67,7 @@ final class MajorQueryCompactor extends QueryCompactor {
@Override
protected void commitCompaction(String dest, String tmpTableName, HiveConf conf,
ValidWriteIdList actualWriteIds, long compactorTxnId) throws IOException, HiveException {
- Util.cleanupEmptyDir(conf, tmpTableName);
+ // We don't need to delete the empty directory, as an empty base is a valid scenario.
}
/**
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 5bafaf4a49..a7eb735c13 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -34,7 +34,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -97,6 +99,7 @@ public class TestTxnCommands2 extends TxnCommandsBaseForTests {
NONACIDORCTBL("nonAcidOrcTbl"),
NONACIDPART("nonAcidPart", "p"),
NONACIDPART2("nonAcidPart2", "p2"),
+ NONACIDNESTEDPART("nonAcidNestedPart", "p,q"),
ACIDNESTEDPART("acidNestedPart", "p,q"),
MMTBL("mmTbl");
@@ -137,6 +140,9 @@ public class TestTxnCommands2 extends TxnCommandsBaseForTests {
runStatementOnDriver("create table " + Table.NONACIDPART + "(a int, b int) partitioned by (p string) stored as orc TBLPROPERTIES ('transactional'='false')");
runStatementOnDriver("create table " + Table.NONACIDPART2 +
"(a2 int, b2 int) partitioned by (p2 string) stored as orc TBLPROPERTIES ('transactional'='false')");
+ runStatementOnDriver("create table " + Table.NONACIDNESTEDPART +
+ "(a int, b int) partitioned by (p string, q string) clustered by (a) into " + BUCKET_COUNT +
+ " buckets stored as orc TBLPROPERTIES ('transactional'='false')");
runStatementOnDriver("create table " + Table.ACIDNESTEDPART +
"(a int, b int) partitioned by (p int, q int) clustered by (a) into " + BUCKET_COUNT +
" buckets stored as orc TBLPROPERTIES ('transactional'='true')");
@@ -689,9 +695,9 @@ public class TestTxnCommands2 extends TxnCommandsBaseForTests {
boolean sawNewBase = false;
for (int i = 0; i < status.length; i++) {
if (status[i].getPath().getName().matches("base_.*")) {
- //should be base_-9223372036854775808_v0000021 but 21 is a txn id not write id so it makes
+ //should be base_-9223372036854775808_v0000023 but 23 is a txn id not write id so it makes
//the tests fragile
- Assert.assertTrue(status[i].getPath().getName().startsWith("base_-9223372036854775808_v0000021"));
+ Assert.assertTrue(status[i].getPath().getName().startsWith("base_-9223372036854775808_v0000023"));
sawNewBase = true;
FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
@@ -743,7 +749,7 @@ public class TestTxnCommands2 extends TxnCommandsBaseForTests {
Assert.assertEquals("bucket_00001_0", buckets[0].getPath().getName());
}
} else if (status[i].getPath().getName().matches("base_.*")) {
- Assert.assertTrue("base_-9223372036854775808", status[i].getPath().getName().startsWith("base_-9223372036854775808_v0000021"));//_v0000021
+ Assert.assertTrue("base_-9223372036854775808", status[i].getPath().getName().startsWith("base_-9223372036854775808_v0000023"));//_v0000023
sawNewBase = true;
FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
@@ -780,12 +786,12 @@ public class TestTxnCommands2 extends TxnCommandsBaseForTests {
FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
Arrays.sort(buckets);
if (numBase == 1) {
- Assert.assertEquals("base_-9223372036854775808_v0000021", status[i].getPath().getName());
+ Assert.assertEquals("base_-9223372036854775808_v0000023", status[i].getPath().getName());
Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
} else if (numBase == 2) {
// The new base dir now has two bucket files, since the delta dir has two bucket files
- Assert.assertEquals("base_10000003_v0000029", status[i].getPath().getName());
+ Assert.assertEquals("base_10000003_v0000031", status[i].getPath().getName());
Assert.assertEquals(2, buckets.length);
Assert.assertEquals("bucket_00000", buckets[0].getPath().getName());
}
@@ -812,7 +818,7 @@ public class TestTxnCommands2 extends TxnCommandsBaseForTests {
status = fs.listStatus(new Path(getWarehouseDir() + "/" +
(Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER);
Assert.assertEquals(1, status.length);
- Assert.assertEquals("base_10000003_v0000029", status[0].getPath().getName());
+ Assert.assertEquals("base_10000003_v0000031", status[0].getPath().getName());
FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
Arrays.sort(buckets);
Assert.assertEquals(2, buckets.length);
@@ -825,6 +831,210 @@ public class TestTxnCommands2 extends TxnCommandsBaseForTests {
Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
}
+ /**
+ * Test the query correctness and directory layout for ACID table conversion
+ * 1. Insert a row to Non-ACID table
+ * 2. Convert Non-ACID to ACID table
+ * 3. Perform Major compaction
+ * 4. Insert a new row to ACID table
+ * 5. Perform another Major compaction
+ * 6. Clean
+ * @throws Exception
+ */
+ @Test
+ public void testNonAcidToAcidConversion4() throws Exception {
+ FileSystem fs = FileSystem.get(hiveConf);
+ FileStatus[] status;
+
+ // 1. Insert a row to Non-ACID nested partitioned table
+ int[][] targetVals = {{1,2}};
+ runStatementOnDriver("insert into " + Table.NONACIDNESTEDPART + " partition(p='p1',q='q1') " + makeValuesClause(targetVals));
+ status = listFilesByTable(fs, Table.NONACIDNESTEDPART);
+
+ // There should be 2 original bucket files in the location (000000_0 and 000001_0)
+ Assert.assertEquals(BUCKET_COUNT, status.length);
+ for (int i = 0; i < status.length; i++) {
+ Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
+ }
+ List<String> rs = runStatementOnDriver("select a,b from " + Table.NONACIDNESTEDPART);
+ Assert.assertEquals(stringifyValues(targetVals), rs);
+ rs = runStatementOnDriver("select count(*) from " + Table.NONACIDNESTEDPART);
+ int resultCount = 1;
+ Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+ // 2. Convert NONACIDNESTEDPART to ACID table
+ runStatementOnDriver("alter table " + Table.NONACIDNESTEDPART + " SET TBLPROPERTIES ('transactional'='true')");
+ status = listFilesByTable(fs, Table.NONACIDNESTEDPART);
+ // Everything should be same as before
+ Assert.assertEquals(BUCKET_COUNT, status.length);
+ for (int i = 0; i < status.length; i++) {
+ Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
+ }
+ rs = runStatementOnDriver("select a,b from " + Table.NONACIDNESTEDPART);
+ Assert.assertEquals(stringifyValues(targetVals), rs);
+ rs = runStatementOnDriver("select count(*) from " + Table.NONACIDNESTEDPART);
+ resultCount = 1;
+ Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+ // 3. Perform a major compaction
+ runStatementOnDriver("alter table "+ Table.NONACIDNESTEDPART + " partition(p='p1',q='q1') compact 'MAJOR'");
+ runWorker(hiveConf);
+ // There should be 1 new directory: base_-9223372036854775808
+ // Original bucket files should stay until Cleaner kicks in.
+ status = listFilesByTable(fs, Table.NONACIDNESTEDPART);
+ Assert.assertEquals(3, status.length);
+ boolean sawNewBase = false;
+ for (int i = 0; i < status.length; i++) {
+ Path parent = status[i].getPath().getParent();
+ if (parent.getName().matches("base_.*")) {
+ //should be base_-9223372036854775808_v0000023 but 23 is a txn id not write id so it makes
+ //the tests fragile
+ Assert.assertTrue(parent.getName().startsWith("base_-9223372036854775808_v0000023"));
+ sawNewBase = true;
+ FileStatus[] buckets = fs.listStatus(parent, FileUtils.HIDDEN_FILES_PATH_FILTER);
+ Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+ Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
+ }
+ }
+ Assert.assertTrue(sawNewBase);
+ rs = runStatementOnDriver("select a,b from " + Table.NONACIDNESTEDPART);
+ Assert.assertEquals(stringifyValues(targetVals), rs);
+ rs = runStatementOnDriver("select count(*) from " + Table.NONACIDNESTEDPART);
+ resultCount = 1;
+ Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+ // 4. Update the existing row, and insert another row to newly-converted ACID table
+ runStatementOnDriver("update " + Table.NONACIDNESTEDPART + " set b=3 where a=1");
+ runStatementOnDriver("insert into " + Table.NONACIDNESTEDPART + "(a,b,p,q) values(3,4,'p1','q1')");
+ status = listFilesByTable(fs, Table.NONACIDNESTEDPART);
+ Arrays.sort(status); // make sure delta_0000001_0000001_0000 appears before delta_0000002_0000002_0000
+ // There should be 2 original bucket files (000000_0 and 000001_0), a base directory,
+ // plus two new delta directories and one delete_delta directory that would be created due to
+ // the update statement (remember split-update U=D+I)!
+ Assert.assertEquals(6, status.length);
+ int numDelta = 0;
+ int numDeleteDelta = 0;
+ sawNewBase = false;
+ for (int i = 0; i < status.length; i++) {
+ Path parent = status[i].getPath().getParent();
+ if (parent.getName().matches("delta_.*")) {
+ numDelta++;
+ FileStatus[] buckets = fs.listStatus(parent, FileUtils.HIDDEN_FILES_PATH_FILTER);
+ Arrays.sort(buckets);
+ if (numDelta == 1) {
+ Assert.assertEquals("delta_10000002_10000002_0000", parent.getName());
+ Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+ Assert.assertEquals("bucket_00001_0", buckets[0].getPath().getName());
+ } else if (numDelta == 2) {
+ Assert.assertEquals("delta_10000003_10000003_0000", parent.getName());
+ Assert.assertEquals(1, buckets.length);
+ Assert.assertEquals("bucket_00000_0", buckets[0].getPath().getName());
+ }
+ } else if (parent.getName().matches("delete_delta_.*")) {
+ numDeleteDelta++;
+ FileStatus[] buckets = fs.listStatus(parent, FileUtils.HIDDEN_FILES_PATH_FILTER);
+ Arrays.sort(buckets);
+ if (numDeleteDelta == 1) {
+ Assert.assertEquals("delete_delta_10000002_10000002_0000", parent.getName());
+ Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+ Assert.assertEquals("bucket_00001_0", buckets[0].getPath().getName());
+ }
+ } else if (parent.getName().matches("base_.*")) {
+ Assert.assertTrue("base_-9223372036854775808", parent.getName().startsWith("base_-9223372036854775808_v0000023"));//_v0000023
+ sawNewBase = true;
+ FileStatus[] buckets = fs.listStatus(parent, FileUtils.HIDDEN_FILES_PATH_FILTER);
+ Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+ Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
+ }
+ }
+ Assert.assertEquals(2, numDelta);
+ Assert.assertEquals(1, numDeleteDelta);
+ Assert.assertTrue(sawNewBase);
+
+ rs = runStatementOnDriver("select a,b from " + Table.NONACIDNESTEDPART);
+ targetVals = new int[][] {{1, 3}, {3, 4}};
+ Assert.assertEquals(stringifyValues(targetVals), rs);
+ rs = runStatementOnDriver("select count(*) from " + Table.NONACIDNESTEDPART);
+ resultCount = 2;
+ Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+ // 5. Perform another major compaction
+ runStatementOnDriver("alter table "+ Table.NONACIDNESTEDPART + " partition(p='p1',q='q1') compact 'MAJOR'");
+ runWorker(hiveConf);
+ // There should be 1 new base directory: base_0000001
+ // Original bucket files, delta directories, delete_delta directories and the
+ // previous base directory should stay until Cleaner kicks in.
+ status = listFilesByTable(fs, Table.NONACIDNESTEDPART);
+ Arrays.sort(status);
+ Assert.assertEquals(8, status.length);
+ int numBase = 0;
+ Set<Path> bases = new HashSet<>();
+ for (int i = 0; i < status.length; i++) {
+ Path parent = status[i].getPath().getParent();
+ if (parent.getName().matches("base_.*")) {
+ numBase++;
+ bases.add(parent);
+ FileStatus[] buckets = fs.listStatus(parent, FileUtils.HIDDEN_FILES_PATH_FILTER);
+ Arrays.sort(buckets);
+ if (numBase == 1) {
+ Assert.assertEquals("base_-9223372036854775808_v0000023", parent.getName());
+ Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+ Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
+ } else if (numBase == 2) {
+ // The new base dir now has two bucket files, since the delta dir has two bucket files
+ Assert.assertEquals("base_10000003_v0000031", parent.getName());
+ Assert.assertEquals(2, buckets.length);
+ Assert.assertEquals("bucket_00000", buckets[0].getPath().getName());
+ }
+ }
+ }
+ Assert.assertEquals(2, bases.size());
+ rs = runStatementOnDriver("select a,b from " + Table.NONACIDNESTEDPART);
+ targetVals = new int[][] {{3, 4}, {1, 3}};
+ Assert.assertEquals(stringifyValuesNoSort(targetVals), rs);
+ rs = runStatementOnDriver("select count(*) from " + Table.NONACIDNESTEDPART);
+ resultCount = 2;
+ Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+ // 6. Let Cleaner delete obsolete files/dirs
+ status = listFilesByTable(fs, Table.NONACIDNESTEDPART);
+ // Before Cleaner, there should be 8 items:
+ // 2 original files, 2 delta directories (1 file each), 1 delete_delta directory (1 file) and 2 base directories (with one and two files respectively)
+
+ Assert.assertEquals(8, status.length);
+ runCleaner(hiveConf);
+ runCleaner(hiveConf);
+ // There should be only 1 directory left: base_0000001.
+ // Original bucket files, delta directories and previous base directory should have been cleaned up. Only one base with 2 files.
+ status = listFilesByTable(fs, Table.NONACIDNESTEDPART);
+ Assert.assertEquals(2, status.length);
+ Assert.assertEquals("base_10000003_v0000031", status[0].getPath().getParent().getName());
+ FileStatus[] buckets = fs.listStatus(status[0].getPath().getParent(), FileUtils.HIDDEN_FILES_PATH_FILTER);
+ Arrays.sort(buckets);
+ Assert.assertEquals(2, buckets.length);
+ Assert.assertEquals("bucket_00000", buckets[0].getPath().getName());
+ rs = runStatementOnDriver("select a,b from " + Table.NONACIDNESTEDPART);
+ targetVals = new int[][] {{3, 4}, {1, 3}};
+ Assert.assertEquals(stringifyValuesNoSort(targetVals), rs);
+ rs = runStatementOnDriver("select count(*) from " + Table.NONACIDNESTEDPART);
+ resultCount = 2;
+ Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+ }
+
+ private FileStatus[] listFilesByTable(FileSystem fs, Table t) throws IOException {
+ List<FileStatus> tmp = new ArrayList<>();
+ RemoteIterator<LocatedFileStatus> f = fs.listFiles(new Path(getWarehouseDir() + "/" +
+ t.toString().toLowerCase()), true);
+
+ while (f.hasNext()) {
+ LocatedFileStatus file = f.next();
+ if (FileUtils.HIDDEN_FILES_PATH_FILTER.accept(file.getPath())) {
+ tmp.add(file);
+ }
+ }
+ return tmp.toArray(new FileStatus[0]);
+ }
+
@Test
public void testValidTxnsBookkeeping() throws Exception {
// 1. Run a query against a non-ACID table, and we shouldn't have txn logged in conf
@@ -1295,8 +1505,8 @@ public class TestTxnCommands2 extends TxnCommandsBaseForTests {
FileStatus[] status = fs.listStatus(new Path(getWarehouseDir() + "/" + tblName.toLowerCase()),
FileUtils.HIDDEN_FILES_PATH_FILTER);
Set<String> expectedDeltas = new HashSet<>();
- expectedDeltas.add("delete_delta_0000001_0000002_v0000019");
- expectedDeltas.add("delta_0000001_0000002_v0000019");
+ expectedDeltas.add("delete_delta_0000001_0000002_v0000021");
+ expectedDeltas.add("delta_0000001_0000002_v0000021");
expectedDeltas.add("delete_delta_0000003_0000003_0000");
Set<String> actualDeltas = new HashSet<>();
for(FileStatus file : status) {
@@ -1314,8 +1524,8 @@ public class TestTxnCommands2 extends TxnCommandsBaseForTests {
status = fs.listStatus(new Path(getWarehouseDir() + "/" + tblName.toLowerCase()),
FileUtils.HIDDEN_FILES_PATH_FILTER);
expectedDeltas = new HashSet<>();
- expectedDeltas.add("delete_delta_0000001_0000004_v0000023");
- expectedDeltas.add("delta_0000001_0000004_v0000023");
+ expectedDeltas.add("delete_delta_0000001_0000004_v0000025");
+ expectedDeltas.add("delta_0000001_0000004_v0000025");
actualDeltas = new HashSet<>();
for(FileStatus file : status) {
actualDeltas.add(file.getPath().getName());