This is an automated email from the ASF dual-hosted git repository.
kuczoram pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 94af0097852 HIVE-29536: ACID Compaction: Improve the rebalance
compaction tests in TestCrudCompactorOnTez (#6487)
94af0097852 is described below
commit 94af00978522f0e1792ed4c63aeff7815b94d7eb
Author: InvisibleProgrammer <[email protected]>
AuthorDate: Mon Jun 22 15:48:32 2026 +0200
HIVE-29536: ACID Compaction: Improve the rebalance compaction tests in
TestCrudCompactorOnTez (#6487)
Co-authored-by: Zsolt Miskolczi <[email protected]>
---
.../hive/ql/txn/compactor/CompactorOnTezTest.java | 41 +-
.../ql/txn/compactor/TestCrudCompactorOnTez.java | 493 +---------------
.../ql/txn/compactor/TestRebalanceCompactor.java | 644 +++++++++++++++++++++
3 files changed, 691 insertions(+), 487 deletions(-)
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java
index c8166d8ee7d..ace6fa5d695 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java
@@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hive.ql.txn.compactor;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.conf.Constants;
@@ -48,6 +51,8 @@
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -58,8 +63,8 @@
import static
org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETENTION_TIME;
import static
org.apache.hadoop.hive.ql.txn.compactor.CompactorTestUtil.executeStatementOnDriverAndReturnResults;
-import static
org.apache.hadoop.hive.ql.txn.compactor.TestCompactor.executeStatementOnDriver;
import static org.apache.hadoop.hive.ql.txn.compactor.TestCompactor.dropTables;
+import static
org.apache.hadoop.hive.ql.txn.compactor.TestCompactor.executeStatementOnDriver;
/**
* Superclass for Test[Crud|Mm]CompactorOnTez, for setup and helper classes.
@@ -184,7 +189,8 @@ protected void verifySuccessfulCompaction(int
expectedSuccessfulCompactions) thr
protected HiveHookEvents.HiveHookEventProto getRelatedTezEvent(String
dbTableName) throws Exception {
int retryCount = 3;
while (retryCount-- > 0) {
- List<ProtoMessageReader<HiveHookEvents.HiveHookEventProto>> readers =
TestHiveProtoLoggingHook.getTestReader(conf, tmpFolder);
+ List<ProtoMessageReader<HiveHookEvents.HiveHookEventProto>>
+ readers = TestHiveProtoLoggingHook.getTestReader(conf, tmpFolder);
for (ProtoMessageReader<HiveHookEvents.HiveHookEventProto> reader :
readers) {
do {
HiveHookEvents.HiveHookEventProto event;
@@ -541,9 +547,40 @@ protected List<String> getBucketData(String tblName,
String bucketId) throws Exc
"select ROW__ID, * from " + tblName + " where ROW__ID.bucketid = " +
bucketId + " order by ROW__ID, a, b", driver);
}
+ protected List<RowInfo> getStructuredBucketData(String tblName, String
bucketId) throws Exception {
+ List<String> getBucketData = getBucketData(tblName, bucketId);
+
+ List<RowInfo> result = new ArrayList<>(getBucketData.size());
+ for (String row : getBucketData) {
+ result.add(RowInfo.fromRawString(row));
+ }
+
+ return result;
+ }
+
protected void dropTable(String tblName) throws Exception {
executeStatementOnDriver("drop table " + tblName, driver);
}
+
+ protected record RowInfo(long writeId, long bucketId, long rowId,
TestRebalanceCompactor.RowData rowData) {
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+ static RowInfo fromRawString(String row) throws JsonProcessingException {
+ // Example row data to parse:
"{\"writeid\":7,\"bucketid\":537001984,\"rowid\":10}\t5\t4",
+
+ String[] parts = row.split("\t");
+
+ JsonNode json = MAPPER.readTree(parts[0]);
+
+ return new RowInfo(
+ json.get("writeid").asLong(),
+ json.get("bucketid").asLong(),
+ json.get("rowid").asLong(),
+
+ new TestRebalanceCompactor.RowData(Arrays.copyOfRange(parts, 1,
parts.length))
+ );
+ }
+ }
}
protected Initiator createInitiator() throws Exception {
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
index 98121f7df01..ec860e90b54 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
@@ -25,8 +25,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -54,17 +52,13 @@
import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
-import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.DriverFactory;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.hooks.HiveProtoLoggingHook;
import org.apache.hadoop.hive.ql.hooks.proto.HiveHookEvents;
-import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.io.BucketCodec;
-import org.apache.hadoop.hive.ql.lockmgr.LockException;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
@@ -81,7 +75,6 @@
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
-import org.mockito.Mockito;
import org.apache.hive.common.util.ReflectionUtil;
import static java.util.Collections.emptyMap;
@@ -90,7 +83,14 @@
import static
org.apache.hadoop.hive.ql.txn.compactor.TestCompactor.executeStatementOnDriver;
import static
org.apache.hadoop.hive.ql.txn.compactor.CompactorTestUtil.executeStatementOnDriverAndReturnResults;
import static
org.apache.hadoop.hive.ql.txn.compactor.TestCompactorBase.dropTables;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.nullable;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
@SuppressWarnings("deprecation")
public class TestCrudCompactorOnTez extends CompactorOnTezTest {
@@ -98,483 +98,6 @@ public class TestCrudCompactorOnTez extends
CompactorOnTezTest {
private static final String DB = "default";
private static final String TABLE1 = "t1";
- @Test
- public void
testRebalanceCompactionWithParallelDeleteAsSecondOptimisticLock() throws
Exception {
- testRebalanceCompactionWithParallelDeleteAsSecond(true);
- }
-
- @Test
- public void
testRebalanceCompactionWithParallelDeleteAsSecondPessimisticLock() throws
Exception {
- testRebalanceCompactionWithParallelDeleteAsSecond(false);
- }
-
- private void testRebalanceCompactionWithParallelDeleteAsSecond(boolean
optimisticLock) throws Exception {
- conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
- conf.setBoolVar(HiveConf.ConfVars.HIVE_COMPACTOR_GATHER_STATS, false);
- conf.setBoolVar(HiveConf.ConfVars.HIVE_STATS_AUTOGATHER, false);
- conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, optimisticLock);
-
- //set grouping size to have 3 buckets, and re-create driver with the new
config
- conf.set("tez.grouping.min-size", "400");
- conf.set("tez.grouping.max-size", "5000");
- driver = new Driver(conf);
-
- final String tableName = "rebalance_test";
- TestDataProvider testDataProvider = prepareRebalanceTestData(tableName);
-
- //Try to do a rebalancing compaction
- executeStatementOnDriver("ALTER TABLE " + tableName + " COMPACT
'rebalance' ORDER BY b DESC", driver);
-
- CountDownLatch startDelete = new CountDownLatch(1);
- CountDownLatch endDelete = new CountDownLatch(1);
- CompactorFactory factory = Mockito.spy(CompactorFactory.getInstance());
- doAnswer(invocation -> {
- Object result = invocation.callRealMethod();
- startDelete.countDown();
- Thread.sleep(1000);
- return result;
- }).when(factory).getCompactorPipeline(any(), any(), any(), any());
-
- Worker worker = new Worker(factory);
- worker.setConf(conf);
- worker.init(new AtomicBoolean(true));
- worker.start();
-
- if (!startDelete.await(10, TimeUnit.SECONDS)) {
- throw new RuntimeException("Waiting for the compaction to start timed
out!");
- }
-
- boolean aborted = false;
- try {
- executeStatementOnDriver("DELETE FROM " + tableName + " WHERE b = 12",
driver);
- } catch (CommandProcessorException e) {
- if (optimisticLock) {
- Assert.fail("In case of TXN_WRITE_X_LOCK = true, the transaction must
be retried instead of being aborted.");
- }
- aborted = true;
- Assert.assertEquals(LockException.class, e.getCause().getClass());
- Assert.assertEquals( "Transaction manager has aborted the transaction
txnid:19. Reason: Aborting [txnid:19,19] due to a write conflict on
default/rebalance_test committed by [txnid:18,19] d/u", e.getCauseMessage());
- // Delete the record, so the rest of the test can be the same in both
cases
- executeStatementOnDriver("DELETE FROM " + tableName + " WHERE b = 12",
driver);
- } finally {
- if(!optimisticLock && !aborted) {
- Assert.fail("In case of TXN_WRITE_X_LOCK = false, the transaction must
be aborted instead of being retried.");
- }
- }
- endDelete.countDown();
-
- worker.join();
-
- driver.close();
- driver = new Driver(conf);
-
- List<String> result = execSelectAndDumpData("select * from " + tableName +
" WHERE b = 12", driver,
- "Dumping data for " + tableName + " after load:");
- Assert.assertEquals(0, result.size());
-
- //Check if the compaction succeed
- verifyCompaction(1, TxnStore.CLEANING_RESPONSE);
-
- String[][] expectedBuckets = new String[][] {
- {
- "{\"writeid\":7,\"bucketid\":536870912,\"rowid\":0}\t17\t17",
- "{\"writeid\":7,\"bucketid\":536870912,\"rowid\":1}\t16\t16",
- "{\"writeid\":7,\"bucketid\":536870912,\"rowid\":2}\t15\t15",
- "{\"writeid\":7,\"bucketid\":536870912,\"rowid\":3}\t14\t14",
- "{\"writeid\":7,\"bucketid\":536870912,\"rowid\":4}\t13\t13",
- },
- {
- "{\"writeid\":7,\"bucketid\":536936448,\"rowid\":6}\t6\t4",
- "{\"writeid\":7,\"bucketid\":536936448,\"rowid\":7}\t3\t4",
- "{\"writeid\":7,\"bucketid\":536936448,\"rowid\":8}\t4\t4",
- "{\"writeid\":7,\"bucketid\":536936448,\"rowid\":9}\t2\t4",
- },
- {
- "{\"writeid\":7,\"bucketid\":537001984,\"rowid\":10}\t5\t4",
- "{\"writeid\":7,\"bucketid\":537001984,\"rowid\":11}\t2\t3",
- "{\"writeid\":7,\"bucketid\":537001984,\"rowid\":12}\t3\t3",
- "{\"writeid\":7,\"bucketid\":537001984,\"rowid\":13}\t6\t3",
- "{\"writeid\":7,\"bucketid\":537001984,\"rowid\":14}\t4\t3",
- },
- {
- "{\"writeid\":7,\"bucketid\":537067520,\"rowid\":15}\t5\t3",
- "{\"writeid\":7,\"bucketid\":537067520,\"rowid\":16}\t6\t2",
- "{\"writeid\":7,\"bucketid\":537067520,\"rowid\":17}\t5\t2",
- },
- };
- verifyRebalance(testDataProvider, tableName, null, expectedBuckets,
- new String[] {"bucket_00000", "bucket_00001", "bucket_00002",
"bucket_00003"}, "base_0000007_v0000018");
- }
-
- @Test
- public void
testRebalanceCompactionOfNotPartitionedImplicitlyBucketedTableWithOrder()
throws Exception {
- conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
- conf.setBoolVar(HiveConf.ConfVars.HIVE_COMPACTOR_GATHER_STATS, false);
- conf.setBoolVar(HiveConf.ConfVars.HIVE_STATS_AUTOGATHER, false);
-
- //set grouping size to have 3 buckets, and re-create driver with the new
config
- conf.set("tez.grouping.min-size", "400");
- conf.set("tez.grouping.max-size", "5000");
- driver = new Driver(conf);
-
- final String tableName = "rebalance_test";
- TestDataProvider testDataProvider = prepareRebalanceTestData(tableName);
-
- //Try to do a rebalancing compaction
- executeStatementOnDriver("ALTER TABLE " + tableName + " COMPACT
'rebalance' ORDER BY b DESC", driver);
- runWorker(conf);
-
- driver.close();
- driver = new Driver(conf);
-
- //Check if the compaction succeed
- verifyCompaction(1, TxnStore.CLEANING_RESPONSE);
-
- String[][] expectedBuckets = new String[][] {
- {
- "{\"writeid\":7,\"bucketid\":536870912,\"rowid\":0}\t17\t17",
- "{\"writeid\":7,\"bucketid\":536870912,\"rowid\":1}\t16\t16",
- "{\"writeid\":7,\"bucketid\":536870912,\"rowid\":2}\t15\t15",
- "{\"writeid\":7,\"bucketid\":536870912,\"rowid\":3}\t14\t14",
- "{\"writeid\":7,\"bucketid\":536870912,\"rowid\":4}\t13\t13",
- },
- {
- "{\"writeid\":7,\"bucketid\":536936448,\"rowid\":5}\t12\t12",
- "{\"writeid\":7,\"bucketid\":536936448,\"rowid\":6}\t6\t4",
- "{\"writeid\":7,\"bucketid\":536936448,\"rowid\":7}\t3\t4",
- "{\"writeid\":7,\"bucketid\":536936448,\"rowid\":8}\t4\t4",
- "{\"writeid\":7,\"bucketid\":536936448,\"rowid\":9}\t2\t4",
- },
- {
- "{\"writeid\":7,\"bucketid\":537001984,\"rowid\":10}\t5\t4",
- "{\"writeid\":7,\"bucketid\":537001984,\"rowid\":11}\t2\t3",
- "{\"writeid\":7,\"bucketid\":537001984,\"rowid\":12}\t3\t3",
- "{\"writeid\":7,\"bucketid\":537001984,\"rowid\":13}\t6\t3",
- "{\"writeid\":7,\"bucketid\":537001984,\"rowid\":14}\t4\t3",
- },
- {
- "{\"writeid\":7,\"bucketid\":537067520,\"rowid\":15}\t5\t3",
- "{\"writeid\":7,\"bucketid\":537067520,\"rowid\":16}\t6\t2",
- "{\"writeid\":7,\"bucketid\":537067520,\"rowid\":17}\t5\t2",
- },
- };
- verifyRebalance(testDataProvider, tableName, null, expectedBuckets,
- new String[] {"bucket_00000", "bucket_00001",
"bucket_00002","bucket_00003"}, "base_0000007_v0000018");
- }
-
- @Test
- public void testRebalanceCompactionOfNotPartitionedImplicitlyBucketedTable()
throws Exception {
- conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
- conf.setBoolVar(HiveConf.ConfVars.HIVE_COMPACTOR_GATHER_STATS, false);
- conf.setBoolVar(HiveConf.ConfVars.HIVE_STATS_AUTOGATHER, false);
-
- //set grouping size to have 3 buckets, and re-create driver with the new
config
- conf.set("tez.grouping.min-size", "400");
- conf.set("tez.grouping.max-size", "5000");
- driver = new Driver(conf);
-
- final String tableName = "rebalance_test";
- TestDataProvider testDataProvider = prepareRebalanceTestData(tableName);
-
- //Try to do a rebalancing compaction
- executeStatementOnDriver("ALTER TABLE " + tableName + " COMPACT
'rebalance'", driver);
- runWorker(conf);
-
- //Check if the compaction succeed
- verifyCompaction(1, TxnStore.CLEANING_RESPONSE);
-
- String[][] expectedBuckets = new String[][] {
- {
- "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t5\t4",
- "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t6\t2",
- "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t6\t3",
- "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":3}\t6\t4",
- "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":4}\t5\t2",
- },
- {
-
- "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":5}\t5\t3",
- "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":6}\t2\t4",
- "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":7}\t3\t3",
- "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":8}\t4\t4",
- "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":9}\t4\t3",
- },
- {
- "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":10}\t2\t3",
- "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":11}\t3\t4",
- "{\"writeid\":2,\"bucketid\":537001984,\"rowid\":12}\t12\t12",
- "{\"writeid\":3,\"bucketid\":537001984,\"rowid\":13}\t13\t13",
- "{\"writeid\":4,\"bucketid\":537001984,\"rowid\":14}\t14\t14",
- },
- {
- "{\"writeid\":5,\"bucketid\":537067520,\"rowid\":15}\t15\t15",
- "{\"writeid\":6,\"bucketid\":537067520,\"rowid\":16}\t16\t16",
- "{\"writeid\":7,\"bucketid\":537067520,\"rowid\":17}\t17\t17",
- },
- };
- verifyRebalance(testDataProvider, tableName, null, expectedBuckets,
- new String[] {"bucket_00000", "bucket_00001",
"bucket_00002","bucket_00003"}, "base_0000007_v0000018");
- }
-
- @Test
- public void testRebalanceCompactionOfPartitionedImplicitlyBucketedTable()
throws Exception {
- conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
- conf.setBoolVar(HiveConf.ConfVars.HIVE_COMPACTOR_GATHER_STATS, false);
- conf.setBoolVar(HiveConf.ConfVars.HIVE_STATS_AUTOGATHER, false);
-
- //set grouping size to have 3 buckets, and re-create driver with the new
config
- conf.set("tez.grouping.min-size", "1");
- driver = new Driver(conf);
-
- final String stageTableName = "stage_rebalance_test";
- final String tableName = "rebalance_test";
- AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf);
-
- TestDataProvider testDataProvider = new TestDataProvider();
- testDataProvider.createFullAcidTable(stageTableName, true, false);
- executeStatementOnDriver("insert into " + stageTableName +" values " +
- "('1',1,'yesterday'), ('1',2,'yesterday'), ('1',3, 'yesterday'),
('1',4, 'yesterday'), " +
- "('2',1,'today'), ('2',2,'today'), ('2',3,'today'), ('2',4,
'today'), " +
- "('3',1,'tomorrow'), ('3',2,'tomorrow'), ('3',3,'tomorrow'),
('3',4,'tomorrow')",
- driver);
-
- dropTables(driver, tableName);
- executeStatementOnDriver("CREATE TABLE " + tableName + "(a string, b int)
" +
- "PARTITIONED BY (ds string) STORED AS ORC
TBLPROPERTIES('transactional'='true')", driver);
- executeStatementOnDriver("INSERT OVERWRITE TABLE " + tableName + "
partition (ds='tomorrow') select a, b from " + stageTableName, driver);
-
- //do some single inserts to have more data in the first bucket.
- executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values
('12',12,'tomorrow')", driver);
- executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values
('13',13,'tomorrow')", driver);
- executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values
('14',14,'tomorrow')", driver);
- executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values
('15',15,'tomorrow')", driver);
- executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values
('16',16,'tomorrow')", driver);
- executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values
('17',17,'tomorrow')", driver);
-
- // Verify buckets and their content before rebalance in partition
ds=tomorrow
- Table table = msClient.getTable("default", tableName);
- FileSystem fs = FileSystem.get(conf);
- Assert.assertEquals("Test setup does not match the expected: different
buckets",
- Arrays.asList("bucket_00000_0", "bucket_00001_0", "bucket_00002_0"),
- CompactorTestUtil.getBucketFileNames(fs, table, "ds=tomorrow",
"base_0000001"));
- String[][] expectedBuckets = new String[][] {
- {
-
"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t2\t1\ttomorrow",
-
"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t2\t2\ttomorrow",
-
"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t2\t3\ttomorrow",
-
"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":3}\t2\t4\ttomorrow",
-
"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t12\t12\ttomorrow",
-
"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t13\t13\ttomorrow",
-
"{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t14\t14\ttomorrow",
-
"{\"writeid\":5,\"bucketid\":536870912,\"rowid\":0}\t15\t15\ttomorrow",
-
"{\"writeid\":6,\"bucketid\":536870912,\"rowid\":0}\t16\t16\ttomorrow",
-
"{\"writeid\":7,\"bucketid\":536870912,\"rowid\":0}\t17\t17\ttomorrow",
- },
- {
-
"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t3\t1\ttomorrow",
-
"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t3\t2\ttomorrow",
-
"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":2}\t3\t3\ttomorrow",
-
"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":3}\t3\t4\ttomorrow",
- },
- {
-
"{\"writeid\":1,\"bucketid\":537001984,\"rowid\":0}\t1\t1\ttomorrow",
-
"{\"writeid\":1,\"bucketid\":537001984,\"rowid\":1}\t1\t2\ttomorrow",
-
"{\"writeid\":1,\"bucketid\":537001984,\"rowid\":2}\t1\t3\ttomorrow",
-
"{\"writeid\":1,\"bucketid\":537001984,\"rowid\":3}\t1\t4\ttomorrow",
- },
- };
- for(int i = 0; i < 3; i++) {
- Assert.assertEquals("rebalanced bucket " + i,
Arrays.asList(expectedBuckets[i]),
- testDataProvider.getBucketData(tableName,
BucketCodec.V1.encode(options.bucket(i)) + ""));
- }
-
- //Try to do a rebalancing compaction
- executeStatementOnDriver("ALTER TABLE " + tableName + " PARTITION
(ds='tomorrow') COMPACT 'rebalance'", driver);
- runWorker(conf);
-
- //Check if the compaction succeed
- verifyCompaction(1, TxnStore.CLEANING_RESPONSE);
-
- expectedBuckets = new String[][] {
- {
-
"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t2\t1\ttomorrow",
-
"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t2\t2\ttomorrow",
-
"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t2\t3\ttomorrow",
-
"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":3}\t2\t4\ttomorrow",
-
"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":4}\t3\t1\ttomorrow",
-
"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":5}\t3\t2\ttomorrow",
- },
- {
-
"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":6}\t3\t3\ttomorrow",
-
"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":7}\t3\t4\ttomorrow",
-
"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":8}\t1\t1\ttomorrow",
-
"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":9}\t1\t2\ttomorrow",
-
"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":10}\t1\t3\ttomorrow",
-
"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":11}\t1\t4\ttomorrow",
- },
- {
-
"{\"writeid\":2,\"bucketid\":537001984,\"rowid\":12}\t12\t12\ttomorrow",
-
"{\"writeid\":3,\"bucketid\":537001984,\"rowid\":13}\t13\t13\ttomorrow",
-
"{\"writeid\":4,\"bucketid\":537001984,\"rowid\":14}\t14\t14\ttomorrow",
-
"{\"writeid\":5,\"bucketid\":537001984,\"rowid\":15}\t15\t15\ttomorrow",
-
"{\"writeid\":6,\"bucketid\":537001984,\"rowid\":16}\t16\t16\ttomorrow",
-
"{\"writeid\":7,\"bucketid\":537001984,\"rowid\":17}\t17\t17\ttomorrow",
- },
- };
- verifyRebalance(testDataProvider, tableName, "ds=tomorrow",
expectedBuckets,
- new String[] {"bucket_00000", "bucket_00001", "bucket_00002"},
"base_0000007_v0000014");
- }
-
- @Test
- public void testRebalanceCompactionOfNotPartitionedExplicitlyBucketedTable()
throws Exception {
- conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
- conf.setBoolVar(HiveConf.ConfVars.HIVE_COMPACTOR_GATHER_STATS, false);
- conf.setBoolVar(HiveConf.ConfVars.HIVE_STATS_AUTOGATHER, false);
-
- final String tableName = "rebalance_test";
- dropTables(driver, tableName);
- executeStatementOnDriver("CREATE TABLE " + tableName + "(a string, b int)
" +
- "CLUSTERED BY(a) INTO 4 BUCKETS STORED AS ORC
TBLPROPERTIES('transactional'='true')", driver);
- executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values
('11',11),('22',22),('33',33),('44',44)", driver);
-
- //Try to do a rebalancing compaction
- executeStatementOnDriver("ALTER TABLE " + tableName + " COMPACT
'rebalance'", driver);
- runWorker(conf);
-
- //Check if the compaction is refused
- List<ShowCompactResponseElement> compacts = verifyCompaction(1,
TxnStore.REFUSED_RESPONSE);
- Assert.assertEquals("Expecting error message 'Cannot execute rebalancing
compaction on bucketed tables.' and found:" + compacts.get(0).getState(),
- "Cannot execute rebalancing compaction on bucketed tables.",
compacts.get(0).getErrorMessage());
- }
-
- @Test
- public void testRebalanceCompactionNotPartitionedExplicitBucketNumbers()
throws Exception {
- conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
- conf.setBoolVar(HiveConf.ConfVars.HIVE_COMPACTOR_GATHER_STATS, false);
- conf.setBoolVar(HiveConf.ConfVars.HIVE_STATS_AUTOGATHER, false);
-
- //set grouping size to have 3 buckets, and re-create driver with the new
config
- conf.set("tez.grouping.min-size", "400");
- conf.set("tez.grouping.max-size", "5000");
- driver = new Driver(conf);
-
- final String tableName = "rebalance_test";
- TestDataProvider testDataProvider = prepareRebalanceTestData(tableName);
-
- //Try to do a rebalancing compaction
- executeStatementOnDriver("ALTER TABLE " + tableName + " COMPACT
'rebalance' CLUSTERED INTO 4 BUCKETS", driver);
- runWorker(conf);
-
- verifyCompaction(1, TxnStore.CLEANING_RESPONSE);
-
- String[][] expectedBuckets = new String[][] {
- {
- "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t5\t4",
- "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t6\t2",
- "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t6\t3",
- "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":3}\t6\t4",
- "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":4}\t5\t2",
- },
- {
- "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":5}\t5\t3",
- "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":6}\t2\t4",
- "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":7}\t3\t3",
- "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":8}\t4\t4",
- "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":9}\t4\t3",
- },
- {
- "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":10}\t2\t3",
- "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":11}\t3\t4",
- "{\"writeid\":2,\"bucketid\":537001984,\"rowid\":12}\t12\t12",
- "{\"writeid\":3,\"bucketid\":537001984,\"rowid\":13}\t13\t13",
- "{\"writeid\":4,\"bucketid\":537001984,\"rowid\":14}\t14\t14",
- },
- {
- "{\"writeid\":5,\"bucketid\":537067520,\"rowid\":15}\t15\t15",
- "{\"writeid\":6,\"bucketid\":537067520,\"rowid\":16}\t16\t16",
- "{\"writeid\":7,\"bucketid\":537067520,\"rowid\":17}\t17\t17",
- },
- };
- verifyRebalance(testDataProvider, tableName, null, expectedBuckets,
- new String[] {"bucket_00000", "bucket_00001", "bucket_00002",
"bucket_00003"}, "base_0000007_v0000018");
- }
-
- private TestDataProvider prepareRebalanceTestData(String tableName) throws
Exception {
- final String stageTableName = "stage_" + tableName;
-
- TestDataProvider testDataProvider = new TestDataProvider();
- testDataProvider.createFullAcidTable(stageTableName, true, false);
- testDataProvider.insertTestData(stageTableName, true);
-
- dropTables(driver, tableName);
- executeStatementOnDriver("CREATE TABLE " + tableName + "(a string, b int)
" +
- "STORED AS ORC TBLPROPERTIES('transactional'='true')", driver);
- executeStatementOnDriver("INSERT OVERWRITE TABLE " + tableName + " select
a, b from " + stageTableName, driver);
-
- //do some single inserts to have more data in the first bucket.
- executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values
('12',12)", driver);
- executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values
('13',13)", driver);
- executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values
('14',14)", driver);
- executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values
('15',15)", driver);
- executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values
('16',16)", driver);
- executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values
('17',17)", driver);
-
- // Verify buckets and their content before rebalance
- Table table = msClient.getTable("default", tableName);
- FileSystem fs = FileSystem.get(conf);
- Assert.assertEquals("Test setup does not match the expected: different
buckets",
- Arrays.asList("bucket_00000_0", "bucket_00001_0",
"bucket_00002_0","bucket_00003_0"),
- CompactorTestUtil.getBucketFileNames(fs, table, null, "base_0000001"));
- String[][] expectedBuckets = new String[][] {
- {
- "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t5\t4",
- "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t6\t2",
- "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t6\t3",
- "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":3}\t6\t4",
- "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t12\t12",
- "{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t13\t13",
- "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t14\t14",
- "{\"writeid\":5,\"bucketid\":536870912,\"rowid\":0}\t15\t15",
- "{\"writeid\":6,\"bucketid\":536870912,\"rowid\":0}\t16\t16",
- "{\"writeid\":7,\"bucketid\":536870912,\"rowid\":0}\t17\t17",
- },
- {
- "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t5\t2",
- "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t5\t3",
- "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":2}\t2\t4",
- },
- {
- "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":0}\t3\t3",
- "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":1}\t4\t4",
- "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":2}\t4\t3",
- },
- {
- "{\"writeid\":1,\"bucketid\":537067520,\"rowid\":0}\t2\t3",
- "{\"writeid\":1,\"bucketid\":537067520,\"rowid\":1}\t3\t4",
- },
- };
- AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf);
- for(int i = 0; i < 3; i++) {
- Assert.assertEquals("unbalanced bucket " + i,
Arrays.asList(expectedBuckets[i]),
- testDataProvider.getBucketData(tableName,
BucketCodec.V1.encode(options.bucket(i)) + ""));
- }
- return testDataProvider;
- }
-
- private void verifyRebalance(TestDataProvider testDataProvider, String
tableName, String partitionName,
- String[][] expectedBucketContent, String[]
bucketNames, String folderName) throws Exception {
- // Verify buckets and their content after rebalance
- Table table = msClient.getTable("default", tableName);
- FileSystem fs = FileSystem.get(conf);
- Assert.assertEquals("Buckets does not match after compaction",
Arrays.asList(bucketNames),
- CompactorTestUtil.getBucketFileNames(fs, table, partitionName,
folderName));
- AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf);
- for (int i = 0; i < expectedBucketContent.length; i++) {
- Assert.assertEquals("rebalanced bucket " + i,
Arrays.asList(expectedBucketContent[i]),
- testDataProvider.getBucketData(tableName,
BucketCodec.V1.encode(options.bucket(i)) + ""));
- }
- }
-
@Test
public void testCompactionShouldNotFailOnPartitionsWithBooleanField() throws
Exception {
conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestRebalanceCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestRebalanceCompactor.java
new file mode 100644
index 00000000000..f85edf030fc
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestRebalanceCompactor.java
@@ -0,0 +1,644 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.GetTableRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.BucketCodec;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.hadoop.hive.ql.ErrorMsg.TXN_ABORTED;
+import static org.apache.hadoop.hive.ql.TxnCommandsBaseForTests.runWorker;
+import static
org.apache.hadoop.hive.ql.txn.compactor.TestCompactorBase.dropTables;
+import static
org.apache.hadoop.hive.ql.txn.compactor.TestCompactorBase.execSelectAndDumpData;
+import static
org.apache.hadoop.hive.ql.txn.compactor.TestCompactorBase.executeStatementOnDriver;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+
+public class TestRebalanceCompactor extends CompactorOnTezTest {
+
+  /** Runs the parallel-delete rebalance scenario with {@code TXN_WRITE_X_LOCK} set to {@code true}. */
+  @Test
+  public void testRebalanceCompactionWithParallelDeleteAsSecondOptimisticLock() throws Exception {
+    final boolean optimisticLock = true;
+    testRebalanceCompactionWithParallelDeleteAsSecond(optimisticLock);
+  }
+
+  /** Runs the parallel-delete rebalance scenario with {@code TXN_WRITE_X_LOCK} set to {@code false}. */
+  @Test
+  public void testRebalanceCompactionWithParallelDeleteAsSecondPessimisticLock() throws Exception {
+    final boolean optimisticLock = false;
+    testRebalanceCompactionWithParallelDeleteAsSecond(optimisticLock);
+  }
+
+  /**
+   * Runs a rebalance compaction with {@code ORDER BY b DESC} on the non-partitioned, implicitly bucketed
+   * test table and validates the compacted data, including its ordering.
+   */
+  @Test
+  public void testRebalanceCompactionOfNotPartitionedImplicitlyBucketedTableWithOrder() throws Exception {
+    prepareHiveConfForRebalanceCompaction();
+
+    // Adjust the Tez grouping sizes and re-create the driver with the new config
+    conf.set("tez.grouping.min-size", "400");
+    conf.set("tez.grouping.max-size", "5000");
+    driver = new Driver(conf);
+
+    final String tableName = "rebalance_test";
+    TestDataProvider testDataProvider = prepareRebalanceTestData();
+
+    // Request the rebalance compaction and let a worker execute it
+    String compactStatement = "ALTER TABLE " + tableName + " COMPACT 'rebalance' ORDER BY b DESC";
+    executeStatementOnDriver(compactStatement, driver);
+    runWorker(conf);
+
+    driver.close();
+    driver = new Driver(conf);
+
+    // The compaction must have succeeded
+    verifyCompaction(1, TxnStore.CLEANING_RESPONSE);
+
+    // Expected rows, grouped by the value of column b in descending order
+    List<RowData> expectedData = new ArrayList<>(List.of(
+        // rows added by the single inserts
+        new RowData("17", "17"),
+        new RowData("16", "16"),
+        new RowData("15", "15"),
+        new RowData("14", "14"),
+        new RowData("13", "13"),
+        new RowData("12", "12"),
+        // the '4' group
+        new RowData("6", "4"),
+        new RowData("3", "4"),
+        new RowData("4", "4"),
+        new RowData("2", "4"),
+        new RowData("5", "4"),
+        // the '3' group
+        new RowData("2", "3"),
+        new RowData("3", "3"),
+        new RowData("6", "3"),
+        new RowData("4", "3"),
+        new RowData("5", "3"),
+        // the '2' group
+        new RowData("6", "2"),
+        new RowData("5", "2")
+    ));
+
+    verifyDataAfterCompaction(expectedData, testDataProvider);
+  }
+
+  /**
+   * Prepares the shared configuration for the rebalance tests: sets {@code COMPACTOR_CRUD_QUERY_BASED} to
+   * {@code true} and both {@code HIVE_COMPACTOR_GATHER_STATS} and {@code HIVE_STATS_AUTOGATHER} to {@code false}.
+   */
+  private void prepareHiveConfForRebalanceCompaction() {
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_STATS_AUTOGATHER, false);
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_COMPACTOR_GATHER_STATS, false);
+    conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
+  }
+
+  /**
+   * Runs a plain rebalance compaction (no ordering clause) on the non-partitioned, implicitly bucketed
+   * test table and validates the compacted data without any ordering requirement.
+   */
+  @Test
+  public void testRebalanceCompactionOfNotPartitionedImplicitlyBucketedTable() throws Exception {
+    prepareHiveConfForRebalanceCompaction();
+
+    // Adjust the Tez grouping sizes and re-create the driver with the new config
+    conf.set("tez.grouping.min-size", "400");
+    conf.set("tez.grouping.max-size", "5000");
+    driver = new Driver(conf);
+
+    final String tableName = "rebalance_test";
+    TestDataProvider testDataProvider = prepareRebalanceTestData();
+
+    // Request the rebalance compaction and let a worker execute it
+    String compactStatement = "ALTER TABLE " + tableName + " COMPACT 'rebalance'";
+    executeStatementOnDriver(compactStatement, driver);
+    runWorker(conf);
+
+    // The compaction must have succeeded
+    verifyCompaction(1, TxnStore.CLEANING_RESPONSE);
+
+    // Every row that must be present after the compaction
+    List<RowData> expectedData = new ArrayList<>(List.of(
+        new RowData("5", "4"),
+        new RowData("6", "2"),
+        new RowData("6", "3"),
+        new RowData("6", "4"),
+        new RowData("5", "2"),
+
+        new RowData("5", "3"),
+        new RowData("2", "4"),
+        new RowData("3", "3"),
+        new RowData("4", "4"),
+        new RowData("4", "3"),
+
+        new RowData("2", "3"),
+        new RowData("3", "4"),
+        new RowData("12", "12"),
+        new RowData("13", "13"),
+        new RowData("14", "14"),
+
+        new RowData("15", "15"),
+        new RowData("16", "16"),
+        new RowData("17", "17")
+    ));
+
+    verifyDataAfterCompaction(expectedData, testDataProvider, null, false);
+  }
+
+  /**
+   * Fills the {@code ds=tomorrow} partition of a partitioned, implicitly bucketed table unevenly, runs a
+   * rebalance compaction on that partition and validates the compacted data.
+   */
+  @Test
+  public void testRebalanceCompactionOfPartitionedImplicitlyBucketedTable() throws Exception {
+    prepareHiveConfForRebalanceCompaction();
+
+    // Adjust the Tez grouping size and re-create the driver with the new config
+    conf.set("tez.grouping.min-size", "1");
+    driver = new Driver(conf);
+
+    final String tableName = "rebalance_test";
+    final String stageTableName = "stage_rebalance_test";
+
+    // Stage table holding the rows that are copied into the partition under test
+    TestDataProvider testDataProvider = new TestDataProvider();
+    testDataProvider.createFullAcidTable(stageTableName, true, false);
+    String stageInsert = "insert into " + stageTableName + " values "
+        + "('1',1,'yesterday'), ('1',2,'yesterday'), ('1',3, 'yesterday'), ('1',4, 'yesterday'), "
+        + "('2',1,'today'), ('2',2,'today'), ('2',3,'today'), ('2',4, 'today'), "
+        + "('3',1,'tomorrow'), ('3',2,'tomorrow'), ('3',3,'tomorrow'), ('3',4,'tomorrow')";
+    executeStatementOnDriver(stageInsert, driver);
+
+    dropTables(driver, tableName);
+    String createTable = "CREATE TABLE " + tableName + "(a string, b int) "
+        + "PARTITIONED BY (ds string) STORED AS ORC TBLPROPERTIES('transactional'='true')";
+    executeStatementOnDriver(createTable, driver);
+    String insertOverwrite =
+        "INSERT OVERWRITE TABLE " + tableName + " partition (ds='tomorrow') select a, b from " + stageTableName;
+    executeStatementOnDriver(insertOverwrite, driver);
+
+    // Single-row inserts to have more data in the first bucket
+    for (int value = 12; value <= 17; value++) {
+      executeStatementOnDriver(
+          "INSERT INTO TABLE " + tableName + " values ('" + value + "'," + value + ",'tomorrow')", driver);
+    }
+
+    // Before the rebalance: all records are persisted and partition ds=tomorrow is not balanced
+    List<String> allRecords = execSelectAndDumpData(
+        "SELECT * FROM " + tableName, driver, "Dumping data from test table, " + tableName);
+    Assert.assertEquals(18, allRecords.size());
+
+    Assert.assertFalse(isBalanced(testDataProvider, "ds=tomorrow"));
+
+    // Request the rebalance compaction of the partition and let a worker execute it
+    String compactStatement = "ALTER TABLE " + tableName + " PARTITION (ds='tomorrow') COMPACT 'rebalance'";
+    executeStatementOnDriver(compactStatement, driver);
+    runWorker(conf);
+
+    // The compaction must have succeeded
+    verifyCompaction(1, TxnStore.CLEANING_RESPONSE);
+
+    // Every row that must be present after the compaction
+    List<RowData> expectedData = new ArrayList<>(List.of(
+        new RowData("2", "1", "tomorrow"),
+        new RowData("2", "2", "tomorrow"),
+        new RowData("2", "3", "tomorrow"),
+        new RowData("2", "4", "tomorrow"),
+        new RowData("3", "1", "tomorrow"),
+        new RowData("3", "2", "tomorrow"),
+
+        new RowData("3", "3", "tomorrow"),
+        new RowData("3", "4", "tomorrow"),
+        new RowData("1", "1", "tomorrow"),
+        new RowData("1", "2", "tomorrow"),
+        new RowData("1", "3", "tomorrow"),
+        new RowData("1", "4", "tomorrow"),
+
+        new RowData("12", "12", "tomorrow"),
+        new RowData("13", "13", "tomorrow"),
+        new RowData("14", "14", "tomorrow"),
+        new RowData("15", "15", "tomorrow"),
+        new RowData("16", "16", "tomorrow"),
+        new RowData("17", "17", "tomorrow")
+    ));
+
+    verifyDataAfterCompaction(expectedData, testDataProvider, "ds=tomorrow", false);
+  }
+
+  /**
+   * Requests a rebalance compaction on an explicitly bucketed ({@code CLUSTERED BY ... INTO 4 BUCKETS}) table
+   * and verifies that the request ends up refused with the expected error message.
+   */
+  @Test
+  public void testRebalanceCompactionOfNotPartitionedExplicitlyBucketedTable() throws Exception {
+    prepareHiveConfForRebalanceCompaction();
+
+    final String tableName = "rebalance_test";
+    dropTables(driver, tableName);
+    executeStatementOnDriver("CREATE TABLE " + tableName + "(a string, b int) " +
+        "CLUSTERED BY(a) INTO 4 BUCKETS STORED AS ORC TBLPROPERTIES('transactional'='true')", driver);
+    executeStatementOnDriver(
+        "INSERT INTO TABLE " + tableName + " values ('11',11),('22',22),('33',33),('44',44)", driver
+    );
+
+    // Try to do a rebalancing compaction
+    executeStatementOnDriver("ALTER TABLE " + tableName + " COMPACT 'rebalance'", driver);
+    runWorker(conf);
+
+    // Check if the compaction is refused
+    ShowCompactResponseElement compaction = verifyCompaction(1, TxnStore.REFUSED_RESPONSE).getFirst();
+
+    // The message labels the state as a state; assertEquals itself reports the expected and the actual
+    // error message on failure.
+    assertEquals(
+        "Unexpected error message for the refused compaction (state: " + compaction.getState() + ")",
+        "Cannot execute rebalancing compaction on bucketed tables.",
+        compaction.getErrorMessage());
+  }
+
+  /**
+   * Runs a rebalance compaction with an explicit bucket number ({@code CLUSTERED INTO 4 BUCKETS}) on the
+   * non-partitioned test table and validates the compacted data without any ordering requirement.
+   */
+  @Test
+  public void testRebalanceCompactionNotPartitionedExplicitBucketNumbers() throws Exception {
+    prepareHiveConfForRebalanceCompaction();
+
+    // Adjust the Tez grouping sizes and re-create the driver with the new config
+    conf.set("tez.grouping.min-size", "400");
+    conf.set("tez.grouping.max-size", "5000");
+    driver = new Driver(conf);
+
+    final String tableName = "rebalance_test";
+    TestDataProvider testDataProvider = prepareRebalanceTestData();
+
+    // Request the rebalance compaction and let a worker execute it
+    String compactStatement = "ALTER TABLE " + tableName + " COMPACT 'rebalance' CLUSTERED INTO 4 BUCKETS";
+    executeStatementOnDriver(compactStatement, driver);
+    runWorker(conf);
+
+    verifyCompaction(1, TxnStore.CLEANING_RESPONSE);
+
+    // Every row that must be present after the compaction
+    List<RowData> expectedData = new ArrayList<>(List.of(
+        new RowData("5", "4"),
+        new RowData("6", "2"),
+        new RowData("6", "3"),
+        new RowData("6", "4"),
+        new RowData("5", "2"),
+
+        new RowData("5", "3"),
+        new RowData("2", "4"),
+        new RowData("3", "3"),
+        new RowData("4", "4"),
+        new RowData("4", "3"),
+
+        new RowData("2", "3"),
+        new RowData("3", "4"),
+        new RowData("12", "12"),
+        new RowData("13", "13"),
+        new RowData("14", "14"),
+
+        new RowData("15", "15"),
+        new RowData("16", "16"),
+        new RowData("17", "17")
+    ));
+
+    verifyDataAfterCompaction(expectedData, testDataProvider, null, false);
+  }
+
+  /**
+   * Runs an ordered rebalance compaction while a DELETE is issued in parallel, and verifies both the outcome
+   * of the DELETE and the compacted data.
+   * <p>
+   * The worker is started with a spied {@link CompactorFactory}: once the compactor pipeline has been obtained
+   * the test is signalled and the worker thread sleeps for a second, giving the DELETE time to run.
+   * With {@code optimisticLock = true} the DELETE must succeed; with {@code false} it must fail with
+   * {@code TXN_ABORTED}, after which it is re-issued so the rest of the test is the same in both cases.
+   *
+   * @param optimisticLock value to set for {@code TXN_WRITE_X_LOCK}
+   * @throws Exception Any exception that occurs during the execution
+   */
+  @SuppressWarnings("java:S2925")
+  private void testRebalanceCompactionWithParallelDeleteAsSecond(boolean optimisticLock) throws Exception {
+    prepareHiveConfForRebalanceCompaction();
+    conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, optimisticLock);
+
+    // Adjust the Tez grouping sizes and re-create the driver with the new config
+    conf.set("tez.grouping.min-size", "400");
+    conf.set("tez.grouping.max-size", "5000");
+    driver = new Driver(conf);
+
+    final String tableName = "rebalance_test";
+    final String deleteStatement = "DELETE FROM " + tableName + " WHERE b = 12";
+    TestDataProvider testDataProvider = prepareRebalanceTestData();
+
+    // Try to do a rebalancing compaction
+    executeStatementOnDriver("ALTER TABLE " + tableName + " COMPACT 'rebalance' ORDER BY b DESC", driver);
+
+    // Signals that the worker has obtained the compactor pipeline, i.e. the compaction is under way
+    CountDownLatch startDelete = new CountDownLatch(1);
+    CompactorFactory factory = Mockito.spy(CompactorFactory.getInstance());
+    doAnswer(invocation -> {
+      Object result = invocation.callRealMethod();
+      startDelete.countDown();
+      // Hold the worker back so that the DELETE below runs in parallel with the compaction
+      Thread.sleep(1000);
+      return result;
+    }).when(factory).getCompactorPipeline(any(), any(), any(), any());
+
+    Worker worker = new Worker(factory);
+    worker.setConf(conf);
+    worker.init(new AtomicBoolean(true));
+    worker.start();
+
+    if (!startDelete.await(10, TimeUnit.SECONDS)) {
+      throw new RuntimeException("Waiting for the compaction to start timed out!");
+    }
+
+    boolean aborted = false;
+    try {
+      executeStatementOnDriver(deleteStatement, driver);
+    } catch (CommandProcessorException e) {
+      if (optimisticLock) {
+        Assert.fail("In case of TXN_WRITE_X_LOCK = true, "
+            + "the transaction must be retried instead of being aborted.");
+      }
+      aborted = true;
+      Assert.assertEquals(12, e.getResponseCode());
+      Assert.assertEquals(TXN_ABORTED.getErrorCode(), e.getErrorCode());
+
+      // Delete the record, so the rest of the test can be the same in both cases
+      executeStatementOnDriver(deleteStatement, driver);
+    }
+    // Checked after the try/catch instead of in a finally block, so an unrelated exception thrown by the
+    // DELETE is reported as-is rather than being replaced by this assertion failure.
+    if (!optimisticLock && !aborted) {
+      Assert.fail("In case of TXN_WRITE_X_LOCK = false, "
+          + "the transaction must be aborted instead of being retried.");
+    }
+
+    worker.join();
+
+    driver.close();
+    driver = new Driver(conf);
+
+    List<String> result = execSelectAndDumpData("select * from " + tableName + " WHERE b = 12", driver,
+        "Dumping data for " + tableName + " after load:");
+    assertEquals(0, result.size());
+
+    // Check if the compaction succeeded
+    verifyCompaction(1, TxnStore.CLEANING_RESPONSE);
+
+    // Populate expected data: same as the ordered test, minus the deleted ('12', 12) row
+    List<RowData> expectedData = new ArrayList<>();
+
+    expectedData.addAll(List.of(
+        new RowData("17", "17"),
+        new RowData("16", "16"),
+        new RowData("15", "15"),
+        new RowData("14", "14"),
+        new RowData("13", "13")
+    ));
+
+    // Adding the '4' group
+    expectedData.addAll(List.of(
+        new RowData("6", "4"),
+        new RowData("3", "4"),
+        new RowData("4", "4"),
+        new RowData("2", "4"),
+        new RowData("5", "4")
+    ));
+
+    // Adding the '3' group
+    expectedData.addAll(List.of(
+        new RowData("2", "3"),
+        new RowData("3", "3"),
+        new RowData("6", "3"),
+        new RowData("4", "3"),
+        new RowData("5", "3")
+    ));
+
+    // Adding the '2' group
+    expectedData.addAll(List.of(
+        new RowData("6", "2"),
+        new RowData("5", "2")
+    ));
+
+    verifyDataAfterCompaction(expectedData, testDataProvider);
+  }
+
+  /**
+   * The column values of one table row, all represented as strings.
+   * <p>
+   * The single component is an array, so {@code equals}, {@code hashCode} and {@code toString} are overridden
+   * to work on the array content; the generated record methods would use the array's identity instead.
+   *
+   * @param columns column values in table order
+   */
+  record RowData(String... columns) {
+    @Override
+    public boolean equals(Object obj) {
+      if (obj instanceof RowData(String[] otherColumns)) {
+        return Arrays.equals(otherColumns, this.columns);
+      }
+
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      return Arrays.hashCode(columns);
+    }
+
+    /** Renders the column values, so failure messages show the row content rather than an array identity. */
+    @Override
+    public String toString() {
+      return "RowData" + Arrays.toString(columns);
+    }
+  }
+
+ /**
+ * Validate the data after a rebalance compaction whose output is expected to be sorted.
+ * Delegates to the four-argument overload with a null partition and sorted = true;
+ * the individual checks are implemented there.
+ *
+ * @param expectedData Expected row data; the delegate removes every matched row from this list
+ * @param testDataProvider Test data provider
+ * @throws Exception Any exception that occurs during the execution
+ */
+ private void verifyDataAfterCompaction(List<RowData> expectedData,
TestDataProvider testDataProvider)
+ throws Exception {
+ verifyDataAfterCompaction(expectedData, testDataProvider, (String) null,
true);
+ }
+  /**
+   * Validates the content of the {@code rebalance_test} table after a rebalance compaction:
+   * <ul>
+   *   <li>no bucket holds more than the ceiling of {@code expectedData.size() / bucketCount} rows</li>
+   *   <li>writeIds never decrease inside a bucket</li>
+   *   <li>the bucketId is the same for every row of a bucket</li>
+   *   <li>rowIds are strictly increasing, across bucket boundaries as well</li>
+   *   <li>if {@code sorted} is set, column b never increases (descending order), across bucket boundaries as
+   *       well; the order of column a is not predictable</li>
+   *   <li>the rows read are exactly the rows of {@code expectedData}, duplicates included</li>
+   * </ul>
+   * The number of buckets is taken from the bucket files found in the {@code base_0000001} folder.
+   *
+   * @param expectedData Expected row data; must be modifiable, every matched row is removed from it
+   * @param testDataProvider Test data provider
+   * @param partition Partition name passed on to the bucket file lookup, {@code null} for a non-partitioned table
+   * @param sorted True if the data must be sorted by column b in descending order
+   * @throws Exception Any exception that occurs during the execution
+   */
+  private void verifyDataAfterCompaction(
+      List<RowData> expectedData, TestDataProvider testDataProvider, String partition, boolean sorted
+  ) throws Exception {
+
+    FileSystem fs = FileSystem.get(conf);
+    GetTableRequest getTableRequest = new GetTableRequest("default", "rebalance_test");
+    Table table = msClient.getTable(getTableRequest);
+    List<String> bucketFilenames = CompactorTestUtil.getBucketFileNames(fs, table, partition, "base_0000001");
+
+    int bucketCount = bucketFilenames.size();
+    assertTrue("No bucket files found in base_0000001", bucketCount > 0);
+
+    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf);
+
+    // Ceiling of expectedData.size() / bucketCount; computed before expectedData gets consumed below
+    int upperBound = (expectedData.size() + bucketCount - 1) / bucketCount;
+
+    // Both trackers span all buckets on purpose: the checks also hold across bucket boundaries
+    long previousValueForColB = Long.MAX_VALUE;
+    long previousRowId = Long.MIN_VALUE;
+
+    for (int i = 0; i < bucketCount; i++) {
+      List<TestDataProvider.RowInfo> bucket = testDataProvider.getStructuredBucketData(
+          table.getTableName(), BucketCodec.V1.encode(options.bucket(i)) + "");
+
+      int bucketSize = bucket.size();
+      assertTrue(
+          String.format("Bucket %d holds %d rows, at most %d expected", i, bucketSize, upperBound),
+          bucketSize <= upperBound);
+
+      // -1 marks "no row of this bucket seen yet" for both values
+      long bucketId = -1L;
+      long previousWriteId = -1L;
+
+      for (TestDataProvider.RowInfo rowInfo : bucket) {
+
+        // RowId must be strictly increasing
+        assertTrue(
+            String.format("RowId must be strictly increasing. Previous RowId: %d, row: %s", previousRowId, rowInfo),
+            rowInfo.rowId() > previousRowId);
+        previousRowId = rowInfo.rowId();
+
+        // WriteId must not decrease inside the bucket
+        if (previousWriteId != -1L) {
+          assertTrue(
+              String.format("WriteId must not decrease. Previous WriteId: %d, row: %s", previousWriteId, rowInfo),
+              previousWriteId <= rowInfo.writeId());
+        }
+        previousWriteId = rowInfo.writeId();
+
+        // BucketId must not change inside the bucket
+        if (bucketId == -1L) {
+          bucketId = rowInfo.bucketId();
+        } else {
+          assertEquals("BucketId changed inside bucket " + i + ", row: " + rowInfo, bucketId, rowInfo.bucketId());
+        }
+
+        // Every row read must be an expected one; remove() reports whether it was (still) expected
+        RowData rowData = rowInfo.rowData();
+        assertTrue("Unexpected or duplicated row: " + rowInfo, expectedData.remove(rowData));
+
+        // Check if the data is sorted by colB desc
+        if (sorted) {
+          long colB = Long.parseLong(rowData.columns()[1]);
+          assertTrue(
+              String.format("Column b must not increase. Previous value: %d, row: %s", previousValueForColB, rowInfo),
+              colB <= previousValueForColB);
+          previousValueForColB = colB;
+        }
+      }
+    }
+
+    // Every expected row must have been matched by a row read from the buckets
+    assertEquals("Some expected rows were not found after the compaction", 0, expectedData.size());
+  }
+
+  /**
+   * Creates the non-partitioned {@code rebalance_test} table, fills it from a freshly created stage table and
+   * adds six single-row inserts, then asserts that the table holds 18 rows and is not balanced.
+   *
+   * @return the data provider that was used to create the stage table
+   * @throws Exception Any exception that occurs during the execution
+   */
+  private TestDataProvider prepareRebalanceTestData() throws Exception {
+    final String tableName = "rebalance_test";
+    final String stageTableName = "stage_" + tableName;
+
+    TestDataProvider testDataProvider = new TestDataProvider();
+    testDataProvider.createFullAcidTable(stageTableName, true, false);
+    testDataProvider.insertTestData(stageTableName, true);
+
+    dropTables(driver, tableName);
+    String createTable =
+        "CREATE TABLE " + tableName + "(a string, b int) STORED AS ORC TBLPROPERTIES('transactional'='true')";
+    executeStatementOnDriver(createTable, driver);
+    String insertOverwrite = "INSERT OVERWRITE TABLE " + tableName + " select a, b from " + stageTableName;
+    executeStatementOnDriver(insertOverwrite, driver);
+
+    // Single-row inserts to have more data in the first bucket
+    for (int value = 12; value <= 17; value++) {
+      executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('" + value + "'," + value + ")", driver);
+    }
+
+    // All the records must be persisted
+    List<String> allRecords = execSelectAndDumpData(
+        "SELECT * FROM " + tableName, driver, "Dumping data from test table, " + tableName);
+    Assert.assertEquals(18, allRecords.size());
+
+    Assert.assertFalse(isBalanced(testDataProvider, null));
+
+    // The tests target the rebalance compaction, not INSERT OVERWRITE, so the prepared data is deliberately
+    // not compared row by row with the content of the stage table.
+    return testDataProvider;
+  }
+
+  /**
+   * Checks whether the rows of the {@code rebalance_test} table are evenly distributed among its buckets.
+   * The table counts as balanced when every bucket holds between the floor and the ceiling of
+   * {@code n / bucketCount} rows, where n is the total number of rows read from the buckets.
+   * The bucket count is taken from the bucket files found in the {@code base_0000001} folder; having fewer
+   * than two buckets fails the test.
+   *
+   * @param testDataProvider Test data provider used to read the bucket content
+   * @param partition Partition name passed on to the bucket file lookup, {@code null} for a non-partitioned table
+   * @return true if every bucket size is within the bounds above
+   * @throws Exception Any exception that occurs during the execution
+   **/
+  private boolean isBalanced(TestDataProvider testDataProvider, String partition) throws Exception {
+    FileSystem fs = FileSystem.get(conf);
+    GetTableRequest getTableRequest = new GetTableRequest("default", "rebalance_test");
+    Table table = msClient.getTable(getTableRequest);
+
+    // Assert that we have multiple buckets
+    List<String> bucketFilenames = CompactorTestUtil.getBucketFileNames(fs, table, partition, "base_0000001");
+    int bucketCount = bucketFilenames.size();
+    assertTrue(bucketCount > 1);
+
+    // Only the row counts matter here, so keep the sizes instead of a generic array of row lists
+    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf);
+    int[] bucketSizes = new int[bucketCount];
+    for (int i = 0; i < bucketCount; i++) {
+      bucketSizes[i] = testDataProvider.getBucketData(
+          table.getTableName(), BucketCodec.V1.encode(options.bucket(i)) + "").size();
+    }
+
+    int allRecordCount = Arrays.stream(bucketSizes).sum();
+
+    int lowerBound = allRecordCount / bucketCount;
+    int upperBound = (allRecordCount + bucketCount - 1) / bucketCount;
+
+    return Arrays.stream(bucketSizes).allMatch(size -> size >= lowerBound && size <= upperBound);
+  }
+}