Repository: hive Updated Branches: refs/heads/master 1223031b9 -> f9efd84f8
HIVE-18589 java.io.IOException: Not enough history available (Eugene Koifman reviewed by Gopal V) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f9efd84f Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f9efd84f Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f9efd84f Branch: refs/heads/master Commit: f9efd84f8b3333f542fa0e435bc1466151c07de2 Parents: 1223031 Author: Eugene Koifman <[email protected]> Authored: Fri Feb 2 15:40:51 2018 -0800 Committer: Eugene Koifman <[email protected]> Committed: Fri Feb 2 15:40:51 2018 -0800 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/ql/io/AcidUtils.java | 34 +++--- .../hive/ql/txn/compactor/CompactorMR.java | 9 ++ .../apache/hadoop/hive/ql/TestTxnCommands2.java | 90 +++++++-------- .../apache/hadoop/hive/ql/io/TestAcidUtils.java | 109 ------------------- .../hive/ql/txn/compactor/TestWorker.java | 11 +- 5 files changed, 79 insertions(+), 174 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/f9efd84f/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index eb75308..430e0fc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -1067,13 +1067,21 @@ public class AcidUtils { * snapshot for this reader. * Note that such base is NOT obsolete. Obsolete files are those that are "covered" by other * files within the snapshot. + * A base produced by Insert Overwrite is different. Logically it's a delta file but one that + * causes anything written previously is ignored (hence the overwrite). 
In this case, base_x + * is visible if txnid:x is committed for current reader. */ - private static boolean isValidBase(long baseTxnId, ValidTxnList txnList) { + private static boolean isValidBase(long baseTxnId, ValidTxnList txnList, Path baseDir, + FileSystem fs) throws IOException { if(baseTxnId == Long.MIN_VALUE) { //such base is created by 1st compaction in case of non-acid to acid table conversion //By definition there are no open txns with id < 1. return true; } + if(!MetaDataFile.isCompacted(baseDir, fs)) { + //this is the IOW case + return txnList.isTxnValid(baseTxnId); + } return txnList.isValidBase(baseTxnId); } private static void getChildState(FileStatus child, HdfsFileStatusWithId childWithId, @@ -1091,12 +1099,12 @@ public class AcidUtils { bestBase.oldestBaseTxnId = txn; } if (bestBase.status == null) { - if(isValidBase(txn, txnList)) { + if(isValidBase(txn, txnList, p, fs)) { bestBase.status = child; bestBase.txn = txn; } } else if (bestBase.txn < txn) { - if(isValidBase(txn, txnList)) { + if(isValidBase(txn, txnList, p, fs)) { obsolete.add(bestBase.status); bestBase.status = child; bestBase.txn = txn; @@ -1484,6 +1492,8 @@ public class AcidUtils { } /** + * General facility to place a metadata file into a dir created by acid/compactor write. + * * Load Data commands against Acid tables write {@link AcidBaseFileType#ORIGINAL_BASE} type files * into delta_x_x/ (or base_x in case there is Overwrite clause). {@link MetaDataFile} is a * small JSON file in this directory that indicates that these files don't have Acid metadata @@ -1499,17 +1509,14 @@ public class AcidUtils { String DATA_FORMAT = "dataFormat"; } private interface Value { - //plain ORC file - String RAW = "raw"; - //result of acid write, i.e.
decorated with ROW__ID info - String NATIVE = "native"; + //written by Major compaction + String COMPACTED = "compacted"; } /** * @param baseOrDeltaDir detla or base dir, must exist */ - public static void createMetaFile(Path baseOrDeltaDir, FileSystem fs, boolean isRawFormat) - throws IOException { + public static void createCompactorMarker(Path baseOrDeltaDir, FileSystem fs) throws IOException { /** * create _meta_data json file in baseOrDeltaDir * write thisFileVersion, dataFormat @@ -1519,7 +1526,7 @@ public class AcidUtils { Path formatFile = new Path(baseOrDeltaDir, METADATA_FILE); Map<String, String> metaData = new HashMap<>(); metaData.put(Field.VERSION, CURRENT_VERSION); - metaData.put(Field.DATA_FORMAT, isRawFormat ? Value.RAW : Value.NATIVE); + metaData.put(Field.DATA_FORMAT, Value.COMPACTED); try (FSDataOutputStream strm = fs.create(formatFile, false)) { new ObjectMapper().writeValue(strm, metaData); } catch (IOException ioe) { @@ -1529,8 +1536,7 @@ public class AcidUtils { throw ioe; } } - //should be useful for import/export - public static boolean isImport(Path baseOrDeltaDir, FileSystem fs) throws IOException { + static boolean isCompacted(Path baseOrDeltaDir, FileSystem fs) throws IOException { Path formatFile = new Path(baseOrDeltaDir, METADATA_FILE); if(!fs.exists(formatFile)) { return false; @@ -1543,9 +1549,7 @@ public class AcidUtils { } String dataFormat = metaData.getOrDefault(Field.DATA_FORMAT, "null"); switch (dataFormat) { - case Value.NATIVE: - return false; - case Value.RAW: + case Value.COMPACTED: return true; default: throw new IllegalArgumentException("Unexpected value for " + Field.DATA_FORMAT http://git-wip-us.apache.org/repos/asf/hive/blob/f9efd84f/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java 
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index 9152b4e..0e456df 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -933,17 +933,26 @@ public class CompactorMR { LOG.info(context.getJobID() + ": " + tmpLocation + " not found. Assuming 0 splits. Creating " + newDeltaDir); fs.mkdirs(newDeltaDir); + createCompactorMarker(conf, newDeltaDir, fs); return; } FileStatus[] contents = fs.listStatus(tmpLocation);//expect 1 base or delta dir in this list //we have MIN_TXN, MAX_TXN and IS_MAJOR in JobConf so we could figure out exactly what the dir //name is that we want to rename; leave it for another day for (FileStatus fileStatus : contents) { + //newPath is the base/delta dir Path newPath = new Path(finalLocation, fileStatus.getPath().getName()); fs.rename(fileStatus.getPath(), newPath); + createCompactorMarker(conf, newPath, fs); } fs.delete(tmpLocation, true); } + private void createCompactorMarker(JobConf conf, Path finalLocation, FileSystem fs) + throws IOException { + if(conf.getBoolean(IS_MAJOR, false)) { + AcidUtils.MetaDataFile.createCompactorMarker(finalLocation, fs); + } + } @Override public void abortJob(JobContext context, int status) throws IOException { http://git-wip-us.apache.org/repos/asf/hive/blob/f9efd84f/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index b5dd4ec..048215a 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -424,7 +424,7 @@ public class TestTxnCommands2 { // 1. 
Insert a row to Non-ACID table runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)"); status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER); // There should be 2 original bucket files in the location (000000_0 and 000001_0) Assert.assertEquals(BUCKET_COUNT, status.length); for (int i = 0; i < status.length; i++) { @@ -440,7 +440,7 @@ public class TestTxnCommands2 { // 2. Convert NONACIDORCTBL to ACID table runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true')"); status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER); // Everything should be same as before Assert.assertEquals(BUCKET_COUNT, status.length); for (int i = 0; i < status.length; i++) { @@ -456,7 +456,7 @@ public class TestTxnCommands2 { // 3. Insert another row to newly-converted ACID table runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(3,4)"); status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER); // There should be 2 original bucket files (000000_0 and 000001_0), plus a new delta directory. 
// The delta directory should also have only 1 bucket file (bucket_00001) Assert.assertEquals(3, status.length); @@ -464,7 +464,7 @@ public class TestTxnCommands2 { for (int i = 0; i < status.length; i++) { if (status[i].getPath().getName().matches("delta_.*")) { sawNewDelta = true; - FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); + FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); Assert.assertEquals(1, buckets.length); // only one bucket file Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001")); } else { @@ -485,13 +485,13 @@ public class TestTxnCommands2 { // There should be 1 new directory: base_xxxxxxx. // Original bucket files and delta directory should stay until Cleaner kicks in. status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER); Assert.assertEquals(4, status.length); boolean sawNewBase = false; for (int i = 0; i < status.length; i++) { if (status[i].getPath().getName().matches("base_.*")) { sawNewBase = true; - FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); + FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); Assert.assertEquals(1, buckets.length); Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001")); } @@ -513,7 +513,7 @@ public class TestTxnCommands2 { fs.create(new Path(fakeFile0)); fs.create(new Path(fakeFile1)); status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER); // Before Cleaner, there should be 5 items: // 2 original files, 1 original directory, 1 base directory and 1 
delta directory Assert.assertEquals(5, status.length); @@ -521,10 +521,10 @@ public class TestTxnCommands2 { // There should be only 1 directory left: base_xxxxxxx. // Original bucket files and delta directory should have been cleaned up. status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER); Assert.assertEquals(1, status.length); Assert.assertTrue(status[0].getPath().getName().matches("base_.*")); - FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); + FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); Assert.assertEquals(1, buckets.length); Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001")); rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); @@ -552,7 +552,7 @@ public class TestTxnCommands2 { // 1. Insert a row to Non-ACID table runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)"); status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER); // There should be 2 original bucket files in the location (000000_0 and 000001_0) Assert.assertEquals(BUCKET_COUNT, status.length); for (int i = 0; i < status.length; i++) { @@ -568,7 +568,7 @@ public class TestTxnCommands2 { // 2. 
Convert NONACIDORCTBL to ACID table runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true')"); status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER); // Everything should be same as before Assert.assertEquals(BUCKET_COUNT, status.length); for (int i = 0; i < status.length; i++) { @@ -584,7 +584,7 @@ public class TestTxnCommands2 { // 3. Update the existing row in newly-converted ACID table runStatementOnDriver("update " + Table.NONACIDORCTBL + " set b=3 where a=1"); status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER); // There should be 2 original bucket files (000000_0 and 000001_0), plus one delta directory // and one delete_delta directory. When split-update is enabled, an update event is split into // a combination of delete and insert, that generates the delete_delta directory. 
@@ -596,12 +596,12 @@ public class TestTxnCommands2 { for (int i = 0; i < status.length; i++) { if (status[i].getPath().getName().matches("delta_.*")) { sawNewDelta = true; - FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); + FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); Assert.assertEquals(BUCKET_COUNT - 1, buckets.length); Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); } else if (status[i].getPath().getName().matches("delete_delta_.*")) { sawNewDeleteDelta = true; - FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); + FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); Assert.assertEquals(BUCKET_COUNT - 1, buckets.length); Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); } else { @@ -623,13 +623,13 @@ public class TestTxnCommands2 { // There should be 1 new directory: base_0000001. // Original bucket files and delta directory should stay until Cleaner kicks in. status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER); Assert.assertEquals(5, status.length); boolean sawNewBase = false; for (int i = 0; i < status.length; i++) { if (status[i].getPath().getName().matches("base_.*")) { sawNewBase = true; - FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); + FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); Assert.assertEquals(BUCKET_COUNT - 1, buckets.length); Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001")); } @@ -644,7 +644,7 @@ public class TestTxnCommands2 { // 5. 
Let Cleaner delete obsolete files/dirs status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER); // Before Cleaner, there should be 5 items: // 2 original files, 1 delta directory, 1 delete_delta directory and 1 base directory Assert.assertEquals(5, status.length); @@ -652,10 +652,10 @@ public class TestTxnCommands2 { // There should be only 1 directory left: base_0000001. // Original bucket files, delta directory and delete_delta should have been cleaned up. status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER); Assert.assertEquals(1, status.length); Assert.assertTrue(status[0].getPath().getName().matches("base_.*")); - FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); + FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); Assert.assertEquals(BUCKET_COUNT - 1, buckets.length); Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001")); rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); @@ -684,7 +684,7 @@ public class TestTxnCommands2 { // 1. 
Insert a row to Non-ACID table runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)"); status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER); // There should be 2 original bucket files in the location (000000_0 and 000001_0) Assert.assertEquals(BUCKET_COUNT, status.length); for (int i = 0; i < status.length; i++) { @@ -700,7 +700,7 @@ public class TestTxnCommands2 { // 2. Convert NONACIDORCTBL to ACID table with split_update enabled. (txn_props=default) runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true')"); status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER); // Everything should be same as before Assert.assertEquals(BUCKET_COUNT, status.length); for (int i = 0; i < status.length; i++) { @@ -719,14 +719,14 @@ public class TestTxnCommands2 { // There should be 1 new directory: base_-9223372036854775808 // Original bucket files should stay until Cleaner kicks in. 
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER); Assert.assertEquals(3, status.length); boolean sawNewBase = false; for (int i = 0; i < status.length; i++) { if (status[i].getPath().getName().matches("base_.*")) { Assert.assertEquals("base_-9223372036854775808", status[i].getPath().getName()); sawNewBase = true; - FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); + FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); Assert.assertEquals(BUCKET_COUNT - 1, buckets.length); Assert.assertEquals("bucket_00001", buckets[0].getPath().getName()); } @@ -743,7 +743,7 @@ public class TestTxnCommands2 { runStatementOnDriver("update " + Table.NONACIDORCTBL + " set b=3 where a=1"); runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(3,4)"); status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER); Arrays.sort(status); // make sure delta_0000001_0000001_0000 appears before delta_0000002_0000002_0000 // There should be 2 original bucket files (000000_0 and 000001_0), a base directory, // plus two new delta directories and one delete_delta directory that would be created due to @@ -755,7 +755,7 @@ public class TestTxnCommands2 { for (int i = 0; i < status.length; i++) { if (status[i].getPath().getName().matches("delta_.*")) { numDelta++; - FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); + FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); Arrays.sort(buckets); if (numDelta == 1) { Assert.assertEquals("delta_0000024_0000024_0000", 
status[i].getPath().getName()); @@ -768,7 +768,7 @@ public class TestTxnCommands2 { } } else if (status[i].getPath().getName().matches("delete_delta_.*")) { numDeleteDelta++; - FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); + FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); Arrays.sort(buckets); if (numDeleteDelta == 1) { Assert.assertEquals("delete_delta_0000024_0000024_0000", status[i].getPath().getName()); @@ -778,7 +778,7 @@ public class TestTxnCommands2 { } else if (status[i].getPath().getName().matches("base_.*")) { Assert.assertEquals("base_-9223372036854775808", status[i].getPath().getName()); sawNewBase = true; - FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); + FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); Assert.assertEquals(BUCKET_COUNT - 1, buckets.length); Assert.assertEquals("bucket_00001", buckets[0].getPath().getName()); } else { @@ -803,14 +803,14 @@ public class TestTxnCommands2 { // Original bucket files, delta directories, delete_delta directories and the // previous base directory should stay until Cleaner kicks in. 
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER); Arrays.sort(status); Assert.assertEquals(7, status.length); int numBase = 0; for (int i = 0; i < status.length; i++) { if (status[i].getPath().getName().matches("base_.*")) { numBase++; - FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); + FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); Arrays.sort(buckets); if (numBase == 1) { Assert.assertEquals("base_-9223372036854775808", status[i].getPath().getName()); @@ -834,7 +834,7 @@ public class TestTxnCommands2 { // 6. Let Cleaner delete obsolete files/dirs status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER); // Before Cleaner, there should be 6 items: // 2 original files, 2 delta directories, 1 delete_delta directory and 2 base directories Assert.assertEquals(7, status.length); @@ -842,10 +842,10 @@ public class TestTxnCommands2 { // There should be only 1 directory left: base_0000001. // Original bucket files, delta directories and previous base directory should have been cleaned up. 
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER); Assert.assertEquals(1, status.length); Assert.assertEquals("base_0000025", status[0].getPath().getName()); - FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); + FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); Arrays.sort(buckets); Assert.assertEquals(1, buckets.length); Assert.assertEquals("bucket_00001", buckets[0].getPath().getName()); @@ -1394,13 +1394,13 @@ public class TestTxnCommands2 { FileSystem fs = FileSystem.get(hiveConf); FileStatus[] status; status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + tblName.toString().toLowerCase()), - FileUtils.STAGING_DIR_PATH_FILTER); + FileUtils.HIDDEN_FILES_PATH_FILTER); Assert.assertEquals(1, status.length); boolean sawNewBase = false; for (int i = 0; i < status.length; i++) { if (status[i].getPath().getName().matches("base_.*")) { sawNewBase = true; - FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); + FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); Assert.assertEquals(numBuckets, buckets.length); Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00000")); } @@ -1803,7 +1803,7 @@ public class TestTxnCommands2 { runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(1,2)"); runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(3,4)"); status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.ACIDTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + (Table.ACIDTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER); // There should be 2 delta dirs in the location Assert.assertEquals(2, status.length); for (int i = 0; i < 
status.length; i++) { @@ -1816,7 +1816,7 @@ public class TestTxnCommands2 { // Insert overwrite ACID table from source table runStatementOnDriver("insert overwrite table " + Table.ACIDTBL + " select a,b from " + Table.NONACIDORCTBL); status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.ACIDTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + (Table.ACIDTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER); // There should be 2 delta dirs, plus a base dir in the location Assert.assertEquals(3, status.length); boolean sawBase = false; @@ -1844,7 +1844,7 @@ public class TestTxnCommands2 { runStatementOnDriver("alter table "+ Table.ACIDTBL + " compact 'MAJOR'"); runWorker(hiveConf); status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.ACIDTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + (Table.ACIDTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER); // There should be 2 delta dirs, plus a base dir in the location Assert.assertEquals(3, status.length); sawBase = false; @@ -1870,7 +1870,7 @@ public class TestTxnCommands2 { // There should be only 1 directory left: base_xxxxxxx. // The delta dirs should have been cleaned up. 
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.ACIDTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + (Table.ACIDTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER); Assert.assertEquals(1, status.length); Assert.assertTrue(status[0].getPath().getName().matches("base_.*")); Assert.assertEquals(baseDir, status[0].getPath().getName()); @@ -1892,7 +1892,7 @@ public class TestTxnCommands2 { runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(1,2)"); runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(3,4)"); status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.ACIDTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + (Table.ACIDTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER); // There should be 2 delta dirs in the location Assert.assertEquals(2, status.length); for (int i = 0; i < status.length; i++) { @@ -1903,7 +1903,7 @@ public class TestTxnCommands2 { runStatementOnDriver("alter table "+ Table.ACIDTBL + " compact 'MAJOR'"); runWorker(hiveConf); status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.ACIDTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + (Table.ACIDTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER); // There should be 2 delta dirs, plus a base dir in the location Assert.assertEquals(3, status.length); boolean sawBase = false; @@ -1930,7 +1930,7 @@ public class TestTxnCommands2 { // Insert overwrite ACID table from source table runStatementOnDriver("insert overwrite table " + Table.ACIDTBL + " select a,b from " + Table.NONACIDORCTBL); status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.ACIDTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + (Table.ACIDTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER); // There should be 2 delta dirs, plus 2 base dirs in the location Assert.assertEquals(4, status.length); int 
baseCount = 0; @@ -1956,7 +1956,7 @@ public class TestTxnCommands2 { runStatementOnDriver("alter table "+ Table.ACIDTBL + " compact 'MAJOR'"); runWorker(hiveConf); status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.ACIDTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + (Table.ACIDTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER); // There should be 2 delta dirs, plus 2 base dirs in the location Assert.assertEquals(4, status.length); baseCount = 0; @@ -1981,7 +1981,7 @@ public class TestTxnCommands2 { // There should be only 1 directory left: base_xxxxxxx. // The delta dirs should have been cleaned up. status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.ACIDTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + (Table.ACIDTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER); Assert.assertEquals(1, status.length); Assert.assertTrue(status[0].getPath().getName().matches("base_.*")); // Verify query result @@ -2030,12 +2030,12 @@ public class TestTxnCommands2 { FileSystem fs = FileSystem.get(hiveConf); // Verify the content of subdirs FileStatus[] status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.MMTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + (Table.MMTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER); int sawDeltaTimes = 0; for (int i = 0; i < status.length; i++) { Assert.assertTrue(status[i].getPath().getName().matches("delta_.*")); sawDeltaTimes++; - FileStatus[] files = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); + FileStatus[] files = fs.listStatus(status[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); Assert.assertEquals(1, files.length); Assert.assertTrue(files[0].getPath().getName().equals("000000_0")); } http://git-wip-us.apache.org/repos/asf/hive/blob/f9efd84f/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java 
---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java index 26a96a4..8945fdf 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java @@ -256,115 +256,6 @@ public class TestAcidUtils { } @Test - public void testBestBase() throws Exception { - Configuration conf = new Configuration(); - MockFileSystem fs = new MockFileSystem(conf, - new MockFile("mock:/tbl/part1/base_5/bucket_0", 500, new byte[0]), - new MockFile("mock:/tbl/part1/base_10/bucket_0", 500, new byte[0]), - new MockFile("mock:/tbl/part1/base_25/bucket_0", 500, new byte[0]), - new MockFile("mock:/tbl/part1/delta_98_100/bucket_0", 500, new byte[0]), - new MockFile("mock:/tbl/part1/base_100/bucket_0", 500, new byte[0]), - new MockFile("mock:/tbl/part1/delta_120_130/bucket_0", 500, new byte[0]), - new MockFile("mock:/tbl/part1/base_200/bucket_0", 500, new byte[0])); - Path part = new MockPath(fs, "/tbl/part1"); - AcidUtils.Directory dir = - AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:" + Long.MAX_VALUE + ":")); - assertEquals("mock:/tbl/part1/base_100", dir.getBaseDirectory().toString()); - assertEquals(1, dir.getCurrentDirectories().size()); - assertEquals("mock:/tbl/part1/delta_120_130", - dir.getCurrentDirectories().get(0).getPath().toString()); - List<FileStatus> obsoletes = dir.getObsolete(); - assertEquals(4, obsoletes.size()); - assertEquals("mock:/tbl/part1/base_10", obsoletes.get(0).getPath().toString()); - assertEquals("mock:/tbl/part1/base_25", obsoletes.get(1).getPath().toString()); - assertEquals("mock:/tbl/part1/base_5", obsoletes.get(2).getPath().toString()); - assertEquals("mock:/tbl/part1/delta_98_100", obsoletes.get(3).getPath().toString()); - assertEquals(0, dir.getOriginalFiles().size()); - - dir = AcidUtils.getAcidState(part, conf, new 
ValidReadTxnList("10:" + Long.MAX_VALUE + ":")); - assertEquals("mock:/tbl/part1/base_10", dir.getBaseDirectory().toString()); - assertEquals(0, dir.getCurrentDirectories().size()); - obsoletes = dir.getObsolete(); - assertEquals(1, obsoletes.size()); - assertEquals("mock:/tbl/part1/base_5", obsoletes.get(0).getPath().toString()); - assertEquals(0, dir.getOriginalFiles().size()); - - /*Single statemnt txns only: since we don't compact a txn range that includes an open txn, - the existence of delta_120_130 implies that 121 in the exception list is aborted unless - delta_120_130 is from streaming ingest in which case 121 can be open - (and thus 122-130 are open too) - 99 here would be Aborted since 121 is minOpenTxn, base_100 is still good - For multi-statment txns, see HIVE-13369*/ - dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:121:99:121")); - assertEquals("mock:/tbl/part1/base_100", dir.getBaseDirectory().toString()); - assertEquals(1, dir.getCurrentDirectories().size()); - assertEquals("mock:/tbl/part1/delta_120_130", - dir.getCurrentDirectories().get(0).getPath().toString()); - obsoletes = dir.getObsolete(); - assertEquals(4, obsoletes.size()); - assertEquals("mock:/tbl/part1/base_10", obsoletes.get(0).getPath().toString()); - assertEquals("mock:/tbl/part1/base_25", obsoletes.get(1).getPath().toString()); - assertEquals("mock:/tbl/part1/base_5", obsoletes.get(2).getPath().toString()); - assertEquals("mock:/tbl/part1/delta_98_100", obsoletes.get(3).getPath().toString()); - - boolean gotException = false; - try { - dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("125:5:5")); - } - catch(IOException e) { - gotException = true; - Assert.assertEquals("Not enough history available for (125,5). 
Oldest available base: " + - "mock:/tbl/part1/base_5", e.getMessage()); - } - Assert.assertTrue("Expected exception", gotException); - - fs = new MockFileSystem(conf, - new MockFile("mock:/tbl/part1/delta_1_10/bucket_0", 500, new byte[0]), - new MockFile("mock:/tbl/part1/delta_12_25/bucket_0", 500, new byte[0]), - new MockFile("mock:/tbl/part1/base_25/bucket_0", 500, new byte[0]), - new MockFile("mock:/tbl/part1/base_100/bucket_0", 500, new byte[0])); - part = new MockPath(fs, "/tbl/part1"); - try { - gotException = false; - dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:7:7")); - } - catch(IOException e) { - gotException = true; - Assert.assertEquals("Not enough history available for (150,7). Oldest available base: " + - "mock:/tbl/part1/base_25", e.getMessage()); - } - Assert.assertTrue("Expected exception", gotException); - - fs = new MockFileSystem(conf, - new MockFile("mock:/tbl/part1/delta_2_10/bucket_0", 500, new byte[0]), - new MockFile("mock:/tbl/part1/base_25/bucket_0", 500, new byte[0]), - new MockFile("mock:/tbl/part1/base_100/bucket_0", 500, new byte[0])); - part = new MockPath(fs, "/tbl/part1"); - try { - gotException = false; - dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:7:7")); - } - catch(IOException e) { - gotException = true; - Assert.assertEquals("Not enough history available for (150,7). 
Oldest available base: " + - "mock:/tbl/part1/base_25", e.getMessage()); - } - Assert.assertTrue("Expected exception", gotException); - - fs = new MockFileSystem(conf, - //non-acid to acid table conversion - new MockFile("mock:/tbl/part1/base_" + Long.MIN_VALUE + "/bucket_0", 500, new byte[0]), - new MockFile("mock:/tbl/part1/delta_1_1/bucket_0", 500, new byte[0]), - new MockFile("mock:/tbl/part1/base_100/bucket_0", 500, new byte[0])); - part = new MockPath(fs, "/tbl/part1"); - //note that we don't include current txn of the client in exception list to read-you-writes - dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("1:" + Long.MAX_VALUE + ":")); - assertEquals("mock:/tbl/part1/base_" + Long.MIN_VALUE, dir.getBaseDirectory().toString()); - assertEquals(1, dir.getCurrentDirectories().size()); - assertEquals("mock:/tbl/part1/delta_1_1", dir.getCurrentDirectories().get(0).getPath().toString()); - assertEquals(0, dir.getObsolete().size()); - } - @Test public void testObsoleteOriginals() throws Exception { Configuration conf = new Configuration(); MockFileSystem fs = new MockFileSystem(conf, http://git-wip-us.apache.org/repos/asf/hive/blob/f9efd84f/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java index 43b28d3..0638126 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java @@ -21,6 +21,7 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.StringableMap; import org.apache.hadoop.hive.conf.HiveConf; import 
org.apache.hadoop.hive.metastore.api.CompactionRequest; @@ -543,7 +544,7 @@ public class TestWorker extends CompactorTest { for (int i = 0; i < stat.length; i++) { if (stat[i].getPath().getName().equals("base_0000024")) { sawNewBase = true; - FileStatus[] buckets = fs.listStatus(stat[i].getPath()); + FileStatus[] buckets = fs.listStatus(stat[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); Assert.assertEquals(2, buckets.length); Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); @@ -712,7 +713,7 @@ public class TestWorker extends CompactorTest { for (int i = 0; i < stat.length; i++) { if (stat[i].getPath().getName().equals("base_0000024")) { sawNewBase = true; - FileStatus[] buckets = fs.listStatus(stat[i].getPath()); + FileStatus[] buckets = fs.listStatus(stat[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); Assert.assertEquals(2, buckets.length); Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); @@ -755,7 +756,7 @@ public class TestWorker extends CompactorTest { for (int i = 0; i < stat.length; i++) { if (stat[i].getPath().getName().equals("base_0000004")) { sawNewBase = true; - FileStatus[] buckets = fs.listStatus(stat[i].getPath()); + FileStatus[] buckets = fs.listStatus(stat[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); Assert.assertEquals(2, buckets.length); Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); @@ -799,7 +800,7 @@ public class TestWorker extends CompactorTest { for (int i = 0; i < stat.length; i++) { if (stat[i].getPath().getName().equals("base_0000024")) { sawNewBase = true; - FileStatus[] buckets = fs.listStatus(stat[i].getPath()); + FileStatus[] buckets = fs.listStatus(stat[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); 
Assert.assertEquals(2, buckets.length); Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); @@ -887,7 +888,7 @@ public class TestWorker extends CompactorTest { for (int i = 0; i < stat.length; i++) { if (stat[i].getPath().getName().equals("base_0000026")) { sawNewBase = true; - FileStatus[] buckets = fs.listStatus(stat[i].getPath()); + FileStatus[] buckets = fs.listStatus(stat[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); Assert.assertEquals(2, buckets.length); Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
