This is an automated email from the ASF dual-hosted git repository.

kuczoram pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
     new 98c925c  HIVE-23763: Query based minor compaction produces wrong files when ro… (#1327)
98c925c is described below

commit 98c925c8cb52e410c9aa02a499d2ac15a6d6777b
Author: kuczoram <kuczo...@cloudera.com>
AuthorDate: Tue Aug 4 12:35:52 2020 +0200

    HIVE-23763: Query based minor compaction produces wrong files when ro… (#1327)

    * HIVE-23763: Query based minor compaction produces wrong files when rows with different buckets Ids are processed by the same FileSinkOperator

    * HIVE-23763: Fix the MM compaction tests

    * HIVE-23763: Address the review comments
---
 .../hive/ql/txn/compactor/CompactorOnTezTest.java  | 150 ++++++++++-
 .../hive/ql/txn/compactor/CompactorTestUtil.java   |  13 +-
 .../ql/txn/compactor/TestCrudCompactorOnTez.java   | 290 ++++++++++++++++++++-
 .../ql/txn/compactor/TestMmCompactorOnTez.java     |  18 +-
 .../hadoop/hive/ql/exec/FileSinkOperator.java      |  64 +++--
 .../hadoop/hive/ql/exec/ReduceSinkOperator.java    |  14 +
 .../org/apache/hadoop/hive/ql/metadata/Hive.java   |  21 +-
 .../opconventer/HiveOpConverterUtils.java          |   2 +-
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java     |  25 +-
 .../org/apache/hadoop/hive/ql/plan/PlanUtils.java  |   3 +-
 .../apache/hadoop/hive/ql/plan/ReduceSinkDesc.java |   9 +
 .../hive/ql/txn/compactor/QueryCompactor.java      |  12 +
 12 files changed, 551 insertions(+), 70 deletions(-)

diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java index 71232de..08b9039 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java @@ -35,6 +35,8 @@ import java.util.Collections; import java.util.List; import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import static org.apache.hadoop.hive.ql.txn.compactor.CompactorTestUtil.executeStatementOnDriverAndReturnResults; import static org.apache.hadoop.hive.ql.txn.compactor.TestCompactor.executeStatementOnDriver; @@ -53,6 +55,7 @@ public class CompactorOnTezTest { protected IMetaStoreClient msClient; protected IDriver driver; protected boolean runsOnTez = true; + protected boolean mmCompaction = false; @Before // Note: we create a new conf and driver object before every test @@ -95,6 +98,17 @@ conf.set("hive.tez.container.size", "128"); conf.setBoolean("hive.merge.tezfiles", false); conf.setBoolean("hive.in.tez.test", true); + if (!mmCompaction) { + // We need these settings to create a table which is not bucketed, but contains multiple files. + // If these parameters are set when inserting 100 rows into the table, the rows will + // be distributed into multiple files. This setup is used in the testMinorCompactionWithoutBuckets, + // testMinorCompactionWithoutBucketsInsertOverwrite and testMajorCompactionWithoutBucketsInsertAndDeleteInsertOverwrite + // tests in the TestCrudCompactorOnTez class. + // This use case has to be tested because of HIVE-23763. The MM compaction is not affected by this issue, + // therefore no need to set these configs for MM compaction.
+ conf.set("tez.grouping.max-size", "1024"); + conf.set("tez.grouping.min-size", "1"); + } } @After public void tearDown() { @@ -218,6 +232,77 @@ public class CompactorOnTezTest { } /** + * This method is for creating a non-bucketed table in which the data is distributed + * into multiple splits. The initial data is 100 rows and it should be split into + * multiple files, like bucket_000001, bucket_000002, ... + * This is needed because the MINOR compactions had issues with tables like this. (HIVE-23763) + * @param dbName + * @param tblName + * @param tempTblName + * @param createDeletes + * @param createInserts + * @param insertOverwrite + * @throws Exception + */ + void createTableWithoutBucketWithMultipleSplits(String dbName, String tblName, String tempTblName, + boolean createDeletes, boolean createInserts, boolean insertOverwrite) throws Exception { + if (dbName != null) { + tblName = dbName + "." + tblName; + tempTblName = dbName + "." + tempTblName; + } + + executeStatementOnDriver("drop table if exists " + tblName, driver); + StringBuilder query = new StringBuilder(); + query.append("create table ").append(tblName).append(" (a string, b string, c string)"); + query.append(" stored as orc"); + query.append(" TBLPROPERTIES('transactional'='true',"); + query.append(" 'transactional_properties'='default')"); + executeStatementOnDriver(query.toString(), driver); + + generateInsertsWithMultipleSplits(0, 100, tblName, tempTblName + "_1", insertOverwrite); + + if (createDeletes) { + executeStatementOnDriver("delete from " + tblName + " where a in ('41','87','53','11')", driver); + executeStatementOnDriver("delete from " + tblName + " where a in ('42','88','81','12','86')", driver); + executeStatementOnDriver("delete from " + tblName + " where a in ('98')", driver); + executeStatementOnDriver("delete from " + tblName + " where a in ('40')", driver); + } + + if (createInserts) { + generateInsertsWithMultipleSplits(100, 250, tblName, tempTblName + "_2", false); + generateInsertsWithMultipleSplits(300, 318, tblName, tempTblName + "_3", false); + generateInsertsWithMultipleSplits(400, 410, tblName, tempTblName + "_4", false); + } + } + + private void generateInsertsWithMultipleSplits(int begin, int end, String tableName, String tempTableName, + boolean insertOverwrite) throws Exception { + StringBuffer sb = new StringBuffer(); + for (int i = begin; i < end; i++) { + sb.append("('"); + sb.append(i); + sb.append("','value"); + sb.append(i); + sb.append("','this is some comment to increase the file size "); + sb.append(i); + sb.append("')"); + if (i < end - 1) { + sb.append(","); + } + } + executeStatementOnDriver("DROP TABLE IF EXISTS " + tempTableName, driver); + executeStatementOnDriver( + "CREATE EXTERNAL TABLE " + tempTableName + " (id string, value string, comment string) STORED AS TEXTFILE ", + driver); + executeStatementOnDriver("insert into " + tempTableName + " values " + sb.toString(), driver); + if (insertOverwrite) { + executeStatementOnDriver("insert overwrite table " + tableName + " select * from " + tempTableName, driver); + } else { + executeStatementOnDriver("insert into " + tableName + " select * from " + tempTableName, driver); + } + } + + /** * 5 txns. 
*/ void insertMmTestData(String tblName) throws Exception { @@ -261,22 +346,77 @@ public class CompactorOnTezTest { } List<String> getAllData(String tblName) throws Exception { - return getAllData(null, tblName); + return getAllData(null, tblName, false); + } + + List<String> getAllData(String tblName, boolean withRowId) throws Exception { + return getAllData(null, tblName, withRowId); } - List<String> getAllData(String dbName, String tblName) throws Exception { + List<String> getAllData(String dbName, String tblName, boolean withRowId) throws Exception { if (dbName != null) { tblName = dbName + "." + tblName; } - List<String> result = executeStatementOnDriverAndReturnResults("select * from " + tblName, driver); + StringBuffer query = new StringBuffer(); + query.append("select "); + if (withRowId) { + query.append("ROW__ID, "); + } + query.append("* from "); + query.append(tblName); + List<String> result = executeStatementOnDriverAndReturnResults(query.toString(), driver); Collections.sort(result); return result; } + List<String> getDataWithInputFileNames(String dbName, String tblName) throws Exception { + if (dbName != null) { + tblName = dbName + "." + tblName; + } + StringBuffer query = new StringBuffer(); + query.append("select "); + query.append("INPUT__FILE__NAME, a from "); + query.append(tblName); + query.append(" order by a"); + List<String> result = executeStatementOnDriverAndReturnResults(query.toString(), driver); + return result; + } + + boolean compareFileNames(List<String> expectedFileNames, List<String> actualFileNames) { + if (expectedFileNames.size() != actualFileNames.size()) { + return false; + } + + Pattern p = Pattern.compile("(.*)(bucket_[0-9]+)(_[0-9]+)?"); + for (int i = 0; i < expectedFileNames.size(); i++) { + String[] expectedParts = expectedFileNames.get(i).split("\t"); + String[] actualParts = actualFileNames.get(i).split("\t"); + + if (!expectedParts[1].equals(actualParts[1])) { + return false; + } + + String expectedFileName = null; + String actualFileName = null; + Matcher m = p.matcher(expectedParts[0]); + if (m.matches()) { + expectedFileName = m.group(2); + } + m = p.matcher(actualParts[0]); + if (m.matches()) { + actualFileName = m.group(2); + } + + if (expectedFileName == null || actualFileName == null || !expectedFileName.equals(actualFileName)) { + return false; + } + } + return true; + } + protected List<String> getBucketData(String tblName, String bucketId) throws Exception { return executeStatementOnDriverAndReturnResults( - "select ROW__ID, * from " + tblName + " where ROW__ID.bucketid = " + bucketId + " order" - + " by a, b", driver); + "select ROW__ID, * from " + tblName + " where ROW__ID.bucketid = " + bucketId + " order by ROW__ID, a, b", driver); } protected void dropTable(String tblName) throws Exception { diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java index 3ca5d4c..9db2229 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java @@ -95,8 +95,16 @@ class CompactorTestUtil { throws IOException { Path path = partitionName == null ? 
new Path(table.getSd().getLocation(), deltaName) : new Path( new Path(table.getSd().getLocation()), new Path(partitionName, deltaName)); - return Arrays.stream(fs.listStatus(path, AcidUtils.hiddenFileFilter)).map(FileStatus::getPath).map(Path::getName) - .sorted().collect(Collectors.toList()); + return Arrays.stream(fs.listStatus(path, AcidUtils.bucketFileFilter)).map(FileStatus::getPath).map(Path::getName).sorted() + .collect(Collectors.toList()); + } + + static List<String> getBucketFileNamesForMMTables(FileSystem fs, Table table, String partitionName, String deltaName) + throws IOException { + Path path = partitionName == null ? new Path(table.getSd().getLocation(), deltaName) : new Path( + new Path(table.getSd().getLocation()), new Path(partitionName, deltaName)); + return Arrays.stream(fs.listStatus(path, AcidUtils.hiddenFileFilter)).map(FileStatus::getPath).map(Path::getName).sorted() + .collect(Collectors.toList()); } /** @@ -160,6 +168,7 @@ class CompactorTestUtil { throw new IOException("Failed to execute \"" + cmd + "\". Driver returned: " + e); } List<String> rs = new ArrayList<>(); + driver.setMaxRows(400); driver.getResults(rs); return rs; } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java index 242e1cb..bba5278 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java @@ -21,7 +21,11 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import com.google.common.collect.Lists; import org.apache.hadoop.fs.FileSystem; @@ -260,8 +264,8 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest { "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t4\t3\ttomorrow", "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":2}\t4\t4\ttoday", "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t5\t2\tyesterday", - "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":1}\t5\t3\tyesterday", "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t5\t4\ttoday", + "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":1}\t5\t3\tyesterday", "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":1}\t6\t2\ttoday", "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":2}\t6\t3\ttoday", "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":3}\t6\t4\ttoday")); @@ -351,8 +355,8 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest { "{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t4\t3\ttomorrow", "{\"writeid\":2,\"bucketid\":536936448,\"rowid\":1}\t4\t4\ttoday", "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":0}\t5\t2\tyesterday", - "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":1}\t5\t3\tyesterday", "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":0}\t5\t4\ttoday", + "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":1}\t5\t3\tyesterday", "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":1}\t6\t2\ttoday", "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":2}\t6\t3\ttoday", "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":3}\t6\t4\ttoday"); @@ -498,6 +502,261 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest { } @Test + public void testMinorCompactionWithoutBuckets() throws Exception 
{ + String dbName = "default"; + String tableName = "testMinorCompaction_wobuckets_1"; + String tempTableName = "tmp_txt_table_1"; + + List<String> expectedDeltas = new ArrayList<>(); + expectedDeltas.add("delta_0000001_0000001_0000"); + expectedDeltas.add("delta_0000006_0000006_0000"); + expectedDeltas.add("delta_0000007_0000007_0000"); + expectedDeltas.add("delta_0000008_0000008_0000"); + + List<String> expectedDeleteDeltas = new ArrayList<>(); + expectedDeleteDeltas.add("delete_delta_0000002_0000002_0000"); + expectedDeleteDeltas.add("delete_delta_0000003_0000003_0000"); + expectedDeleteDeltas.add("delete_delta_0000004_0000004_0000"); + expectedDeleteDeltas.add("delete_delta_0000005_0000005_0000"); + + testMinorCompactionWithoutBucketsCommon(dbName, tableName, tempTableName, false, expectedDeltas, + expectedDeleteDeltas, "delta_0000001_0000008_v0000025", CompactionType.MINOR); + } + + @Test + public void testMinorCompactionWithoutBucketsInsertOverwrite() throws Exception { + String dbName = "default"; + String tableName = "testMinorCompaction_wobuckets_2"; + String tempTableName = "tmp_txt_table_2"; + + List<String> expectedDeltas = new ArrayList<>(); + expectedDeltas.add("delta_0000006_0000006_0000"); + expectedDeltas.add("delta_0000007_0000007_0000"); + expectedDeltas.add("delta_0000008_0000008_0000"); + + List<String> expectedDeleteDeltas = new ArrayList<>(); + expectedDeleteDeltas.add("delete_delta_0000002_0000002_0000"); + expectedDeleteDeltas.add("delete_delta_0000003_0000003_0000"); + expectedDeleteDeltas.add("delete_delta_0000004_0000004_0000"); + expectedDeleteDeltas.add("delete_delta_0000005_0000005_0000"); + + testMinorCompactionWithoutBucketsCommon(dbName, tableName, tempTableName, true, expectedDeltas, + expectedDeleteDeltas, "delta_0000001_0000008_v0000025", CompactionType.MINOR); + } + + @Test + public void testMajorCompactionWithoutBucketsInsertAndDeleteInsertOverwrite() throws Exception { + String dbName = "default"; + String tableName = "testMinorCompaction_wobuckets_3"; + String tempTableName = "tmp_txt_table_3"; + + List<String> expectedDeltas = new ArrayList<>(); + expectedDeltas.add("delta_0000006_0000006_0000"); + expectedDeltas.add("delta_0000007_0000007_0000"); + expectedDeltas.add("delta_0000008_0000008_0000"); + + List<String> expectedDeleteDeltas = new ArrayList<>(); + expectedDeleteDeltas.add("delete_delta_0000002_0000002_0000"); + expectedDeleteDeltas.add("delete_delta_0000003_0000003_0000"); + expectedDeleteDeltas.add("delete_delta_0000004_0000004_0000"); + expectedDeleteDeltas.add("delete_delta_0000005_0000005_0000"); + + testMinorCompactionWithoutBucketsCommon(dbName, tableName, tempTableName, true, expectedDeltas, + expectedDeleteDeltas, "base_0000008_v0000025", CompactionType.MAJOR); + } + + private void testMinorCompactionWithoutBucketsCommon(String dbName, String tableName, String tempTableName, + boolean insertOverWrite, List<String> expectedDeltas, List<String> expectedDeleteDeltas, + String expectedCompactedDeltaDirName, CompactionType compactionType) throws Exception { + + TestDataProvider dataProvider = new TestDataProvider(); + dataProvider.createTableWithoutBucketWithMultipleSplits(dbName, tableName, tempTableName, true, true, + insertOverWrite); + + FileSystem fs = FileSystem.get(conf); + Table table = msClient.getTable(dbName, tableName); + + List<String> expectedData = dataProvider.getAllData(tableName); + List<String> expectedFileNames = dataProvider.getDataWithInputFileNames(null, tableName); + + // Verify deltas + 
Assert.assertEquals("Delta directories does not match", expectedDeltas, + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null)); + // Verify delete delta + Assert.assertEquals("Delete directories does not match", expectedDeleteDeltas, + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deleteEventDeltaDirFilter, table, null)); + + Set<String> expectedDeleteBucketFilesSet = new HashSet<>(); + for (String expectedDeleteDelta : expectedDeleteDeltas) { + expectedDeleteBucketFilesSet.addAll(CompactorTestUtil.getBucketFileNames(fs, table, null, expectedDeleteDelta)); + } + List<String> expectedDeleteBucketFiles = new ArrayList<>(expectedDeleteBucketFilesSet); + Collections.sort(expectedDeleteBucketFiles); + + Set<String> expectedBucketFilesSet = new HashSet<>(); + for (String expectedDelta : expectedDeltas) { + expectedBucketFilesSet.addAll(CompactorTestUtil.getBucketFileNames(fs, table, null, expectedDelta)); + } + List<String> expectedBucketFiles = new ArrayList<>(); + for (String expectedBucketFile : expectedBucketFilesSet) { + Pattern p = Pattern.compile("(bucket_[0-9]+)(_[0-9]+)?"); + Matcher m = p.matcher(expectedBucketFile); + if (m.matches()) { + expectedBucketFiles.add(m.group(1)); + } + } + Collections.sort(expectedBucketFiles); + + CompactorTestUtil.runCompaction(conf, dbName, tableName, compactionType, true); + // Clean up resources + CompactorTestUtil.runCleaner(conf); + + // Only 1 compaction should be in the response queue with succeeded state + List<ShowCompactResponseElement> compacts = + TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts(); + Assert.assertEquals("Completed compaction queue must contain one element", 1, compacts.size()); + Assert.assertEquals("Compaction state is not succeeded", "succeeded", compacts.get(0).getState()); + + // Verify delta and delete delta directories after compaction + if (CompactionType.MAJOR == compactionType) { + List<String> actualBasesAfterComp = + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.baseFileFilter, table, null); + Assert.assertEquals("Base directory does not match after compaction", + Collections.singletonList(expectedCompactedDeltaDirName), actualBasesAfterComp); + // Verify bucket files in delta and delete delta dirs + Assert.assertEquals("Bucket names are not matching after compaction in the base folder", + expectedBucketFiles, CompactorTestUtil.getBucketFileNames(fs, table, null, actualBasesAfterComp.get(0))); + } else { + List<String> actualDeltasAfterComp = + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null); + Assert.assertEquals("Delta directories does not match after compaction", + Collections.singletonList(expectedCompactedDeltaDirName), actualDeltasAfterComp); + List<String> actualDeleteDeltasAfterComp = + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deleteEventDeltaDirFilter, table, null); + Assert.assertEquals("Delete delta directories does not match after compaction", + Collections.singletonList("delete_" + expectedCompactedDeltaDirName), actualDeleteDeltasAfterComp); + // Verify bucket files in delta and delete delta dirs + Assert.assertEquals("Bucket names are not matching after compaction in the delete deltas", + expectedDeleteBucketFiles, + CompactorTestUtil.getBucketFileNames(fs, table, null, actualDeleteDeltasAfterComp.get(0))); + Assert.assertEquals("Bucket names are not matching after compaction", expectedBucketFiles, + CompactorTestUtil.getBucketFileNames(fs, table, null, actualDeltasAfterComp.get(0))); + 
} + + // Verify all contents + List<String> actualData = dataProvider.getAllData(tableName); + Assert.assertEquals(expectedData, actualData); + List<String> actualFileNames = dataProvider.getDataWithInputFileNames(null, tableName); + Assert.assertTrue(dataProvider.compareFileNames(expectedFileNames, actualFileNames)); + dataProvider.dropTable(tableName); + } + + @Test + public void testMinorAndMajorCompactionWithoutBuckets() throws Exception { + String dbName = "default"; + String tableName = "testMinorCompaction_wobuckets_5"; + String tempTableName = "tmp_txt_table_5"; + + TestDataProvider dataProvider = new TestDataProvider(); + dataProvider.createTableWithoutBucketWithMultipleSplits(dbName, tableName, tempTableName, true, true, false); + + FileSystem fs = FileSystem.get(conf); + Table table = msClient.getTable(dbName, tableName); + + List<String> expectedData = dataProvider.getAllData(tableName); + // Verify deltas + List<String> expectedDeltas = new ArrayList<>(); + expectedDeltas.add("delta_0000001_0000001_0000"); + expectedDeltas.add("delta_0000006_0000006_0000"); + expectedDeltas.add("delta_0000007_0000007_0000"); + expectedDeltas.add("delta_0000008_0000008_0000"); + Assert.assertEquals("Delta directories does not match", + expectedDeltas, + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null)); + // Verify delete delta + List<String> expectedDeleteDeltas = new ArrayList<>(); + expectedDeleteDeltas.add("delete_delta_0000002_0000002_0000"); + expectedDeleteDeltas.add("delete_delta_0000003_0000003_0000"); + expectedDeleteDeltas.add("delete_delta_0000004_0000004_0000"); + expectedDeleteDeltas.add("delete_delta_0000005_0000005_0000"); + Assert.assertEquals("Delete directories does not match", + expectedDeleteDeltas, + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deleteEventDeltaDirFilter, table, null)); + + Set<String> expectedDeleteBucketFilesSet = new HashSet<>(); + for (String expectedDeleteDelta : expectedDeleteDeltas) { + expectedDeleteBucketFilesSet.addAll(CompactorTestUtil.getBucketFileNames(fs, table, null, expectedDeleteDelta)); + } + List<String> expectedDeleteBucketFiles = new ArrayList<>(expectedDeleteBucketFilesSet); + Collections.sort(expectedDeleteBucketFiles); + + Set<String> expectedBucketFilesSet = new HashSet<>(); + for (String expectedDelta : expectedDeltas) { + expectedBucketFilesSet.addAll(CompactorTestUtil.getBucketFileNames(fs, table, null, expectedDelta)); + } + List<String> expectedBucketFiles = new ArrayList<>(); + for (String expectedBucketFile : expectedBucketFilesSet) { + Pattern p = Pattern.compile("(bucket_[0-9]+)(_[0-9]+)?"); + Matcher m = p.matcher(expectedBucketFile); + if (m.matches()) { + expectedBucketFiles.add(m.group(1)); + } + } + Collections.sort(expectedBucketFiles); + + CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true); + CompactorTestUtil.runCleaner(conf); + + // Only 1 compaction should be in the response queue with succeeded state + List<ShowCompactResponseElement> compacts = + TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts(); + Assert.assertEquals("Completed compaction queue must contain one element", 1, compacts.size()); + Assert.assertEquals("Compaction state is not succeeded", "succeeded", compacts.get(0).getState()); + // Verify delta directories after compaction + List<String> actualDeltasAfterComp = + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null); + Assert.assertEquals("Delta directories does not 
match after compaction", + Collections.singletonList("delta_0000001_0000008_v0000024"), actualDeltasAfterComp); + List<String> actualDeleteDeltasAfterComp = + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deleteEventDeltaDirFilter, table, null); + Assert.assertEquals("Delete delta directories does not match after compaction", + Collections.singletonList("delete_delta_0000001_0000008_v0000024"), actualDeleteDeltasAfterComp); + // Verify bucket files in delta dirs + List<String> actualData = dataProvider.getAllData(tableName); + + Assert.assertEquals("Bucket names are not matching after compaction", expectedBucketFiles, + CompactorTestUtil.getBucketFileNames(fs, table, null, actualDeltasAfterComp.get(0))); + + Assert.assertEquals("Bucket names are not matching after compaction", expectedDeleteBucketFiles, + CompactorTestUtil.getBucketFileNames(fs, table, null, actualDeleteDeltasAfterComp.get(0))); + // Verify all contents + // List<String> actualData = dataProvider.getAllData(tableName); + Assert.assertEquals(expectedData, actualData); + + CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MAJOR, true); + // Clean up resources + CompactorTestUtil.runCleaner(conf); + + // Only 1 compaction should be in the response queue with succeeded state + compacts = + TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts(); + Assert.assertEquals("Completed compaction queue must contain one element", 2, compacts.size()); + Assert.assertEquals("Compaction state is not succeeded", "succeeded", compacts.get(0).getState()); + // Verify delta directories after compaction + List<String> actualBasesAfterComp = + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.baseFileFilter, table, null); + Assert.assertEquals("Base directory does not match after compaction", + Collections.singletonList("base_0000008_v0000038"), actualBasesAfterComp); + // Verify bucket files in delta dirs + Assert.assertEquals("Bucket names are not matching after compaction", expectedBucketFiles, + CompactorTestUtil.getBucketFileNames(fs, table, null, actualBasesAfterComp.get(0))); + // Verify all contents + actualData = dataProvider.getAllData(tableName); + Assert.assertEquals(expectedData, actualData); + dataProvider.dropTable(tableName); + } + + @Test public void testMinorCompactionNotPartitionedWithBuckets() throws Exception { Assume.assumeTrue(runsOnTez); String dbName = "default"; @@ -630,6 +889,24 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest { CompactorTestUtil .getBucketFileNames(fs, table, partitionToday, actualDeleteDeltasAfterCompPartToday.get(0))); + // Verify contents of bucket files. 
+ // Bucket 0 + List<String> expectedRsBucket0 = Arrays + .asList("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t2\t3\tyesterday", + "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t2\t4\ttoday", + "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t3\t3\ttoday", + "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t3\t4\tyesterday", + "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t4\t3\ttomorrow", + "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":2}\t4\t4\ttoday", + "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t5\t2\tyesterday", + "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t5\t4\ttoday", + "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":1}\t5\t3\tyesterday", + "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":1}\t6\t2\ttoday", + "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":2}\t6\t3\ttoday", + "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":3}\t6\t4\ttoday"); + List<String> rsBucket0 = dataProvider.getBucketData(tableName, "536870912"); + Assert.assertEquals(expectedRsBucket0, rsBucket0); + // Verify all contents List<String> actualData = dataProvider.getAllData(tableName); Assert.assertEquals(expectedData, actualData); @@ -704,14 +981,13 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest { List<String> rsBucket0 = dataProvider.getBucketData(tableName, "536870912"); Assert.assertEquals(expectedRsBucket0, rsBucket0); // Bucket 1 - List<String> expectedRsBucket1 = Arrays.asList( - "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t2\t3\tyesterday", + List<String> expectedRsBucket1 = Arrays.asList("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t2\t3\tyesterday", "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t2\t4\ttoday", "{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t4\t3\ttomorrow", "{\"writeid\":2,\"bucketid\":536936448,\"rowid\":1}\t4\t4\ttoday", "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":0}\t5\t2\tyesterday", - "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":1}\t5\t3\tyesterday", "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":0}\t5\t4\ttoday", + "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":1}\t5\t3\tyesterday", "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":1}\t6\t2\ttoday", "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":2}\t6\t3\ttoday", "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":3}\t6\t4\ttoday"); @@ -1250,7 +1526,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest { // Insert test data into test table dataProvider.insertTestData(dbName, tableName); // Get all data before compaction is run - List<String> expectedData = dataProvider.getAllData(dbName, tableName); + List<String> expectedData = dataProvider.getAllData(dbName, tableName, false); Collections.sort(expectedData); // Run a compaction CompactorTestUtil.runCompaction(conf, dbName, tableName, compactionType, true); @@ -1263,7 +1539,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest { + " compaction", Collections.singletonList(resultDirName), CompactorTestUtil.getBaseOrDeltaNames(fs, pathFilter, table, null)); // Verify all contents - List<String> actualData = dataProvider.getAllData(dbName, tableName); + List<String> actualData = dataProvider.getAllData(dbName, tableName, false); Assert.assertEquals(expectedData, actualData); } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMmCompactorOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMmCompactorOnTez.java index 9772bbe..dd9e06c 100644 --- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMmCompactorOnTez.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMmCompactorOnTez.java @@ -49,6 +49,10 @@ import static org.apache.hadoop.hive.ql.txn.compactor.TestCompactor.executeState */ public class TestMmCompactorOnTez extends CompactorOnTezTest { + public TestMmCompactorOnTez() { + mmCompaction = true; + } + @Test public void testMmMinorCompactionNotPartitionedWithoutBuckets() throws Exception { String dbName = "default"; String tableName = "testMmMinorCompaction"; @@ -80,7 +84,7 @@ public class TestMmCompactorOnTez extends CompactorOnTezTest { // Verify bucket files in delta dirs List<String> expectedBucketFiles = Collections.singletonList("000000_0"); Assert.assertEquals("Bucket names are not matching after compaction", expectedBucketFiles, - CompactorTestUtil.getBucketFileNames(fs, table, null, actualDeltasAfterComp.get(0))); + CompactorTestUtil.getBucketFileNamesForMMTables(fs, table, null, actualDeltasAfterComp.get(0))); verifyAllContents(tableName, testDataProvider, expectedData); // Clean up testDataProvider.dropTable(tableName); @@ -123,7 +127,7 @@ public class TestMmCompactorOnTez extends CompactorOnTezTest { // Verify bucket files in delta dirs List<String> expectedBucketFiles = Arrays.asList("000000_0", "000001_0"); Assert.assertEquals("Bucket names are not matching after compaction", expectedBucketFiles, - CompactorTestUtil.getBucketFileNames(fs, table, null, actualDeltasAfterComp.get(0))); + CompactorTestUtil.getBucketFileNamesForMMTables(fs, table, null, actualDeltasAfterComp.get(0))); verifyAllContents(tableName, testDataProvider, expectedData); // Clean up testDataProvider.dropTable(tableName); @@ -167,7 +171,7 @@ public class TestMmCompactorOnTez extends CompactorOnTezTest { List<String> expectedBucketFiles = Collections.singletonList("000000_0"); Assert.assertEquals("Bucket names are not matching after compaction", expectedBucketFiles, CompactorTestUtil - .getBucketFileNames(fs, table, partitionToday, actualDeltasAfterCompPartToday.get(0))); + .getBucketFileNamesForMMTables(fs, table, partitionToday, actualDeltasAfterCompPartToday.get(0))); verifyAllContents(tableName, dataProvider, expectedData); // Clean up dataProvider.dropTable(tableName); @@ -239,7 +243,7 @@ public class TestMmCompactorOnTez extends CompactorOnTezTest { List<String> expectedBucketFiles = Arrays.asList("000000_0", "000001_0"); Assert.assertEquals("Bucket names are not matching after compaction", expectedBucketFiles, CompactorTestUtil - .getBucketFileNames(fs, table, partitionToday, actualDeltasAfterCompPartToday.get(0))); + .getBucketFileNamesForMMTables(fs, table, partitionToday, actualDeltasAfterCompPartToday.get(0))); verifyAllContents(tableName, dataProvider, expectedData); // Clean up dataProvider.dropTable(tableName); @@ -280,7 +284,7 @@ public class TestMmCompactorOnTez extends CompactorOnTezTest { // Verify bucket file in delta dir List<String> expectedBucketFile = Collections.singletonList("000000_0"); Assert.assertEquals("Bucket names are not matching after compaction", expectedBucketFile, - CompactorTestUtil.getBucketFileNames(fs, table, null, actualDeltasAfterComp.get(0))); + CompactorTestUtil.getBucketFileNamesForMMTables(fs, table, null, actualDeltasAfterComp.get(0))); verifyAllContents(tableName, dataProvider, expectedData); // Clean up dataProvider.dropTable(tableName); @@ -553,7 +557,7 @@ public class TestMmCompactorOnTez extends CompactorOnTezTest { // Insert test 
data into test table dataProvider.insertMmTestData(dbName, tableName); // Get all data before compaction is run - List<String> expectedData = dataProvider.getAllData(dbName, tableName); + List<String> expectedData = dataProvider.getAllData(dbName, tableName, false); Collections.sort(expectedData); // Run a compaction CompactorTestUtil.runCompaction(conf, dbName, tableName, compactionType, true); @@ -565,7 +569,7 @@ public class TestMmCompactorOnTez extends CompactorOnTezTest { Assert.assertEquals("Result directories does not match after " + compactionType.name() + " compaction", Collections.singletonList(resultDirName), CompactorTestUtil.getBaseOrDeltaNames(fs, pathFilter, table, null)); - List<String> actualData = dataProvider.getAllData(dbName, tableName); + List<String> actualData = dataProvider.getAllData(dbName, tableName, false); Collections.sort(actualData); Assert.assertEquals(expectedData, actualData); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index 848c31f..9888315 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; +import java.util.BitSet; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -334,18 +335,6 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements if (!isMmTable && !isDirectInsert) { if (!bDynParts && !isSkewedStoredAsSubDirectories) { finalPaths[filesIdx] = new Path(parent, taskWithExt); - if (conf.isCompactionTable()) { - // Helper tables used for compaction are external and non-acid. We need to keep - // track of the taskId to avoid overwrites in the case of multiple - // FileSinkOperators, and the file names need to reflect the correct bucketId - // because the files will eventually be placed in an acid table, and the - // OrcFileMergeOperator should not merge data belonging to different buckets. - // Therefore during compaction, data will be stored in the final directory like: - // ${hive_staging_dir}/final_dir/taskid/bucketId - // For example, ${hive_staging dir}/-ext-10002/000000_0/bucket_00000 - finalPaths[filesIdx] = new Path(finalPaths[filesIdx], - AcidUtils.BUCKET_PREFIX + String.format(AcidUtils.BUCKET_DIGITS, bucketId)); - } } else { finalPaths[filesIdx] = new Path(buildTmpPath(), taskWithExt); } @@ -505,6 +494,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements String taskId, originalTaskId; protected boolean filesCreated = false; + protected BitSet filesCreatedPerBucket = new BitSet(); private void initializeSpecPath() { // For a query of the type: @@ -753,6 +743,31 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements } } + /** + * There was an issue with the query-based MINOR compaction (HIVE-23763), that the row distribution between the FileSinkOperators + * was not correlated correctly with the bucket numbers. So it could happen that rows from different buckets ended up in the same + * FileSinkOperator and got written out into one file. This is not correct, one bucket file must contain rows from the same bucket. + * Therefore the FileSinkOperator got extended with this method to be able to handle rows from different buckets. + * In this case it will create separate files from each bucket. 
This logic is similar to the one in the createDynamicBucket method. + * @param fsp + * @throws HiveException + */ + protected void createBucketFilesForCompaction(FSPaths fsp) throws HiveException { + try { + if (fsp.outPaths.length < bucketId + 1) { + fsp.updaters = Arrays.copyOf(fsp.updaters, bucketId + 1); + fsp.outPaths = Arrays.copyOf(fsp.outPaths, bucketId + 1); + fsp.finalPaths = Arrays.copyOf(fsp.finalPaths, bucketId + 1); + fsp.outWriters = Arrays.copyOf(fsp.outWriters, bucketId + 1); + statsFromRecordWriter = Arrays.copyOf(statsFromRecordWriter, bucketId + 1); + } + createBucketForFileIdx(fsp, bucketId); + } catch (Exception e) { + throw new HiveException(e); + } + filesCreatedPerBucket.set(bucketId); + } + protected void createBucketFiles(FSPaths fsp) throws HiveException { try { int filesIdx = 0; @@ -813,7 +828,12 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements protected void createBucketForFileIdx(FSPaths fsp, int filesIdx) throws HiveException { try { - fsp.initializeBucketPaths(filesIdx, taskId, isNativeTable(), isSkewedStoredAsSubDirectories); + if (conf.isCompactionTable()) { + fsp.initializeBucketPaths(filesIdx, AcidUtils.BUCKET_PREFIX + String.format(AcidUtils.BUCKET_DIGITS, bucketId), + isNativeTable(), isSkewedStoredAsSubDirectories); + } else { + fsp.initializeBucketPaths(filesIdx, taskId, isNativeTable(), isSkewedStoredAsSubDirectories); + } if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { Utilities.FILE_OP_LOGGER.trace("createBucketForFileIdx " + filesIdx + ": final path " + fsp.finalPaths[filesIdx] + "; out path " + fsp.outPaths[filesIdx] +" (spec path " + specPath + ", tmp path " @@ -962,16 +982,18 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements String lbDirName = null; lbDirName = (lbCtx == null) ? null : generateListBucketingDirName(row); - if (!bDynParts && !filesCreated) { + if (!bDynParts && (!filesCreated || conf.isCompactionTable())) { if (lbDirName != null) { if (valToPaths.get(lbDirName) == null) { createNewPaths(null, lbDirName); } - } else { - if (conf.isCompactionTable()) { - int bucketProperty = getBucketProperty(row); - bucketId = BucketCodec.determineVersion(bucketProperty).decodeWriterId(bucketProperty); + } else if (conf.isCompactionTable()) { + int bucketProperty = getBucketProperty(row); + bucketId = BucketCodec.determineVersion(bucketProperty).decodeWriterId(bucketProperty); + if (!filesCreatedPerBucket.get(bucketId)) { + createBucketFilesForCompaction(fsp); } + } else { createBucketFiles(fsp); } } @@ -1063,7 +1085,11 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements // RecordUpdater expects to get the actual row, not a serialized version of it. Thus we // pass the row rather than recordValue. 
if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID || conf.isMmTable() || conf.isCompactionTable()) { - rowOutWriters[findWriterOffset(row)].write(recordValue); + writerOffset = bucketId; + if (!conf.isCompactionTable()) { + writerOffset = findWriterOffset(row); + } + rowOutWriters[writerOffset].write(recordValue); } else if (conf.getWriteType() == AcidUtils.Operation.INSERT) { fpaths.updaters[findWriterOffset(row)].insert(conf.getTableWriteId(), row); } else { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java index 5321188..efc129f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java @@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.io.BucketCodec; import org.apache.hadoop.hive.ql.io.HiveKey; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; @@ -453,6 +454,19 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc> if (LOG.isTraceEnabled()) { LOG.trace("Going to return hash code " + hashCode); } + if (conf.isCompaction()) { + int bucket; + Object bucketProperty = ((Object[]) row)[2]; + if (bucketProperty == null) { + return hashCode; + } + if (bucketProperty instanceof Writable) { + bucket = ((IntWritable) bucketProperty).get(); + } else { + bucket = (int) bucketProperty; + } + return BucketCodec.determineVersion(bucket).decodeWriterId(bucket); + } return hashCode; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 0d01d5f..4c12927 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -4238,26 +4238,9 @@ private void constructOneLBLocationMap(FileStatus fSta, files = new FileStatus[] {src}; } - if (isCompactionTable) { - // Helper tables used for query-based compaction have a special file structure after - // filesink: tmpdir/attemptid/bucketid. - // We don't care about the attemptId anymore and don't want it in the table's final - // structure so just move the bucket files. 
+ if (isCompactionTable && HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE)) { try { - List<FileStatus> fileStatuses = new ArrayList<>(); - for (FileStatus file : files) { - if (file.isDirectory() && AcidUtils.originalBucketFilter.accept(file.getPath())) { - FileStatus[] taskDir = srcFs.listStatus(file.getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); - fileStatuses.addAll(Arrays.asList(taskDir)); - } else { - fileStatuses.add(file); - } - } - files = fileStatuses.toArray(new FileStatus[files.length]); - - if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE)) { - AcidUtils.OrcAcidVersion.writeVersionFile(destf, destFs); - } + AcidUtils.OrcAcidVersion.writeVersionFile(destf, destFs); } catch (IOException e) { if (null != pool) { pool.shutdownNow(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveOpConverterUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveOpConverterUtils.java index 65f86d1..55130b9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveOpConverterUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveOpConverterUtils.java @@ -189,7 +189,7 @@ final class HiveOpConverterUtils { reduceKeys.size(), numReducers, acidOperation, NullOrdering.defaultNullOrder(hiveConf)); } else { rsDesc = PlanUtils.getReduceSinkDesc(reduceKeys, reduceValues, outputColumnNames, false, tag, - partitionCols, order, nullOrder, NullOrdering.defaultNullOrder(hiveConf), numReducers, acidOperation); + partitionCols, order, nullOrder, NullOrdering.defaultNullOrder(hiveConf), numReducers, acidOperation, false); } ReduceSinkOperator rsOp = (ReduceSinkOperator) OperatorFactory.getAndMakeChild( diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index a66d23b..8b1df4b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -6887,6 +6887,10 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { boolean multiFileSpray = false; int numFiles = 1; int totalFiles = 1; + boolean isCompaction = false; + if (dest_tab != null && dest_tab.getParameters() != null) { + isCompaction = AcidUtils.isCompactionTable(dest_tab.getParameters()); + } if (dest_tab.getNumBuckets() > 0 && !dest_tab.getBucketCols().isEmpty()) { enforceBucketing = true; @@ -6956,8 +6960,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { nullOrder.append(sortOrder == DirectionUtils.ASCENDING_CODE ? 
'a' : 'z'); } input = genReduceSinkPlan(input, partnCols, sortCols, order.toString(), nullOrder.toString(), - maxReducers, - acidOp); + maxReducers, acidOp, isCompaction); reduceSinkOperatorsAddedByEnforceBucketingSorting.add((ReduceSinkOperator)input.getParentOperators().get(0)); ctx.setMultiFileSpray(multiFileSpray); ctx.setNumFiles(numFiles); @@ -8833,9 +8836,13 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { acidOp = getAcidType(Utilities.getTableDesc(dest_tab).getOutputFileFormatClass(), dest, AcidUtils.isInsertOnlyTable(dest_tab)); } + boolean isCompaction = false; + if (dest_tab != null && dest_tab.getParameters() != null) { + isCompaction = AcidUtils.isCompactionTable(dest_tab.getParameters()); + } Operator result = genReduceSinkPlan( input, partCols, sortCols, order.toString(), nullOrder.toString(), - numReducers, acidOp, true); + numReducers, acidOp, true, isCompaction); if (result.getParentOperators().size() == 1 && result.getParentOperators().get(0) instanceof ReduceSinkOperator) { ((ReduceSinkOperator) result.getParentOperators().get(0)) @@ -8846,16 +8853,16 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { private Operator genReduceSinkPlan(Operator<?> input, List<ExprNodeDesc> partitionCols, List<ExprNodeDesc> sortCols, - String sortOrder, String nullOrder, int numReducers, AcidUtils.Operation acidOp) + String sortOrder, String nullOrder, int numReducers, AcidUtils.Operation acidOp, boolean isCompaction) throws SemanticException { return genReduceSinkPlan(input, partitionCols, sortCols, sortOrder, nullOrder, numReducers, - acidOp, false); + acidOp, false, isCompaction); } @SuppressWarnings("nls") private Operator genReduceSinkPlan(Operator<?> input, List<ExprNodeDesc> partitionCols, List<ExprNodeDesc> sortCols, String sortOrder, String nullOrder, int numReducers, AcidUtils.Operation acidOp, - boolean pullConstants) throws SemanticException { + boolean pullConstants, boolean isCompaction) throws SemanticException { RowResolver inputRR = opParseCtx.get(input).getRowResolver(); @@ -8942,7 +8949,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { ReduceSinkDesc rsdesc = PlanUtils.getReduceSinkDesc(newSortCols, valueCols, outputColumns, false, -1, partitionCols, newSortOrder.toString(), newNullOrder.toString(), defaultNullOrder, - numReducers, acidOp); + numReducers, acidOp, isCompaction); Operator interim = putOpInsertMap(OperatorFactory.getAndMakeChild(rsdesc, new RowSchema(rsRR.getColumnInfos()), input), rsRR); @@ -14519,7 +14526,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { */ buildPTFReduceSinkDetails(tabDef, partCols, orderCols, orderString, nullOrderString); input = genReduceSinkPlan(input, partCols, orderCols, orderString.toString(), - nullOrderString.toString(), -1, Operation.NOT_ACID); + nullOrderString.toString(), -1, Operation.NOT_ACID, false); } /* @@ -14622,7 +14629,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { } return genReduceSinkPlan(input, partCols, orderCols, order.toString(), nullOrder.toString(), - -1, Operation.NOT_ACID); + -1, Operation.NOT_ACID, false); } public static List<WindowExpressionSpec> parseSelect(String selectExprStr) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java index 996b2db..d42c3bc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java @@ -711,7 +711,7 @@ public final class PlanUtils { 
List<ExprNodeDesc> keyCols, List<ExprNodeDesc> valueCols, List<String> outputColumnNames, boolean includeKeyCols, int tag, List<ExprNodeDesc> partitionCols, String order, String nullOrder, NullOrdering defaultNullOrder, - int numReducers, AcidUtils.Operation writeType) { + int numReducers, AcidUtils.Operation writeType, boolean isCompaction) { ReduceSinkDesc reduceSinkDesc = getReduceSinkDesc(keyCols, keyCols.size(), valueCols, new ArrayList<List<Integer>>(), includeKeyCols ? outputColumnNames.subList(0, keyCols.size()) : @@ -723,6 +723,7 @@ public final class PlanUtils { reduceSinkDesc.setReducerTraits(EnumSet.of(ReduceSinkDesc.ReducerTraits.FIXED)); reduceSinkDesc.setNumReducers(1); } + reduceSinkDesc.setIsCompaction(isCompaction); return reduceSinkDesc; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java index 34be0b6..9db01eb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java @@ -96,6 +96,7 @@ public class ReduceSinkDesc extends AbstractOperatorDesc { */ private int numBuckets; private List<ExprNodeDesc> bucketCols; + private boolean isCompaction; private int topN = -1; private float topNMemoryUsage = -1; @@ -447,6 +448,14 @@ public class ReduceSinkDesc extends AbstractOperatorDesc { this.numBuckets = numBuckets; } + public boolean isCompaction() { + return isCompaction; + } + + public void setIsCompaction(boolean isCompaction) { + this.isCompaction = isCompaction; + } + @Explain(displayName = "bucketingVersion", explainLevels = { Level.EXTENDED }) public int getBucketingVersionForExplain() { return getBucketingVersion(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java index 1f732f9..8f6a977 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java @@ -115,6 +115,18 @@ abstract class QueryCompactor { } for (String query : compactionQueries) { LOG.info("Running {} compaction via query: {}", compactionInfo.isMajorCompaction() ? "major" : "minor", query); + if (!compactionInfo.isMajorCompaction()) { + // There was an issue with the query-based MINOR compaction (HIVE-23763), that the row distribution between the FileSinkOperators + // was not correlated correctly with the bucket numbers. So we could end up with files containing rows from + // multiple buckets or rows from the same bucket could end up in different FileSinkOperator. This behaviour resulted + // corrupted files. To fix this, the FileSinkOperator has been extended to be able to handle rows from different buckets. + // But we also had to be sure that all rows from the same bucket would end up in the same FileSinkOperator. Therefore + // the ReduceSinkOperator has also been extended to distribute the rows by bucket numbers. To use this logic, + // these two optimisations have to be turned off for the MINOR compaction. The MAJOR compaction works differently + // and its query doesn't use reducers, so these optimisations should not be turned off for MAJOR compaction. 
+ conf.set("hive.optimize.bucketingsorting", "false"); + conf.set("hive.vectorized.execution.enabled", "false"); + } DriverUtils.runOnDriver(conf, user, sessionState, query, writeIds, compactorTxnId); } commitCompaction(storageDescriptor.getLocation(), tmpTableName, conf, writeIds, compactorTxnId);
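
Editor's note (not part of the patch): the core of this fix is that, during query-based MINOR compaction, rows are routed by the bucket id encoded in the ROW__ID bucket property rather than by the ReduceSink hash code, and the FileSinkOperator writes one file per decoded bucket. The following is a minimal standalone sketch of that decoding step, reusing only the calls the commit itself adds (BucketCodec.determineVersion(...).decodeWriterId(...), AcidUtils.BUCKET_PREFIX, AcidUtils.BUCKET_DIGITS). It assumes hive-exec is on the classpath; the sample bucket-property values (536870912 and 536936448) are taken from the ROW__ID expectations in the tests above, where they correspond to buckets 0 and 1.

```java
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.BucketCodec;

/**
 * A minimal sketch (not part of the patch) of the bucket routing the fix relies on:
 * decode the writer (bucket) id from the ROW__ID bucket property and derive the
 * per-bucket file name the FileSinkOperator would use during compaction.
 */
public class BucketRoutingSketch {
  public static void main(String[] args) {
    // Bucket-property values as they appear in the test expectations above.
    int[] bucketProperties = {536870912, 536936448};
    for (int bucketProperty : bucketProperties) {
      // Same call chain as the commit adds in ReduceSinkOperator/FileSinkOperator:
      // the ReduceSink returns this decoded id instead of the hash code for compaction
      // queries, so all rows of one bucket reach the same FileSinkOperator.
      int bucketId = BucketCodec.determineVersion(bucketProperty).decodeWriterId(bucketProperty);
      // The FileSinkOperator then names the output file after the decoded bucket id,
      // e.g. bucket_00000, so one file never mixes rows from different buckets.
      String fileName = AcidUtils.BUCKET_PREFIX + String.format(AcidUtils.BUCKET_DIGITS, bucketId);
      System.out.println(bucketProperty + " -> bucketId=" + bucketId + ", file=" + fileName);
    }
  }
}
```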