This is an automated email from the ASF dual-hosted git repository. vjasani pushed a commit to branch branch-2.3 in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.3 by this push: new e75a048 HBASE-24428 : Update compaction priority for recently split daughter regions (#1784) e75a048 is described below commit e75a0481f58cfbf9156d779e5c21986095953e09 Author: Viraj Jasani <vjas...@apache.org> AuthorDate: Wed May 27 20:56:41 2020 +0530 HBASE-24428 : Update compaction priority for recently split daughter regions (#1784) Signed-off-by: Andrew Purtell <apurt...@apache.org> --- .../apache/hadoop/hbase/regionserver/HStore.java | 23 +++++- .../hadoop/hbase/regionserver/StoreUtils.java | 2 +- .../compactions/CompactionRequestImpl.java | 13 +++ .../compactions/SortedCompactionPolicy.java | 1 + .../compactions/StripeCompactionPolicy.java | 1 + .../TestSplitTransactionOnCluster.java | 94 ++++++++++++++++++++-- 6 files changed, 126 insertions(+), 8 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 496b699..6687628 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -143,6 +143,12 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, public static final int DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER = 1000; public static final int DEFAULT_BLOCKING_STOREFILE_COUNT = 16; + // HBASE-24428 : Update compaction priority for recently split daughter regions + // so as to prioritize their compaction. 
+ // Any compaction candidate with higher priority than compaction of newly split daughter regions + // should have priority value < (Integer.MIN_VALUE + 1000) + private static final int SPLIT_REGION_COMPACTION_PRIORITY = Integer.MIN_VALUE + 1000; + private static final Logger LOG = LoggerFactory.getLogger(HStore.class); protected final MemStore memstore; @@ -1937,7 +1943,22 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, // Set common request properties. // Set priority, either override value supplied by caller or from store. - request.setPriority((priority != Store.NO_PRIORITY) ? priority : getCompactPriority()); + final int compactionPriority = + (priority != Store.NO_PRIORITY) ? priority : getCompactPriority(); + request.setPriority(compactionPriority); + + if (request.isAfterSplit()) { + // If the store belongs to recently split daughter regions, better we consider + // them with the higher priority in the compaction queue. + // Override priority if it is lower (higher int value) than + // SPLIT_REGION_COMPACTION_PRIORITY + final int splitHousekeepingPriority = + Math.min(compactionPriority, SPLIT_REGION_COMPACTION_PRIORITY); + request.setPriority(splitHousekeepingPriority); + LOG.info("Keeping/Overriding Compaction request priority to {} for CF {} since it" + + " belongs to recently split daughter region {}", splitHousekeepingPriority, + this.getColumnFamilyName(), getRegionInfo().getRegionNameAsString()); + } request.setDescription(getRegionInfo().getRegionNameAsString(), getColumnFamilyName()); request.setTracker(tracker); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java index dd17729..0e4f6c2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java @@ -52,7 +52,7 @@ public class 
StoreUtils { */ public static boolean hasReferences(Collection<HStoreFile> files) { // TODO: make sure that we won't pass null here in the future. - return files != null ? files.stream().anyMatch(HStoreFile::isReference) : false; + return files != null && files.stream().anyMatch(HStoreFile::isReference); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequestImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequestImpl.java index 4ea8e3f..899219d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequestImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequestImpl.java @@ -43,6 +43,7 @@ public class CompactionRequestImpl implements CompactionRequest { private DisplayCompactionType isMajor = DisplayCompactionType.MINOR; private int priority = NO_PRIORITY; private Collection<HStoreFile> filesToCompact; + private boolean isAfterSplit = false; // CompactRequest object creation time. private long selectionTime; @@ -136,6 +137,14 @@ public class CompactionRequestImpl implements CompactionRequest { return tracker; } + public boolean isAfterSplit() { + return isAfterSplit; + } + + public void setAfterSplit(boolean afterSplit) { + isAfterSplit = afterSplit; + } + @Override public int hashCode() { final int prime = 31; @@ -149,6 +158,7 @@ public class CompactionRequestImpl implements CompactionRequest { result = prime * result + ((storeName == null) ? 0 : storeName.hashCode()); result = prime * result + (int) (totalSize ^ (totalSize >>> 32)); result = prime * result + ((tracker == null) ? 0 : tracker.hashCode()); + result = prime * result + (isAfterSplit ? 
1231 : 1237); return result; } @@ -200,6 +210,9 @@ public class CompactionRequestImpl implements CompactionRequest { if (totalSize != other.totalSize) { return false; } + if (isAfterSplit != other.isAfterSplit) { + return false; + } if (tracker == null) { if (other.tracker != null) { return false; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/SortedCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/SortedCompactionPolicy.java index 9b30ab5..ef9f3ca 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/SortedCompactionPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/SortedCompactionPolicy.java @@ -84,6 +84,7 @@ public abstract class SortedCompactionPolicy extends CompactionPolicy { CompactionRequestImpl result = createCompactionRequest(candidateSelection, isTryingMajor || isAfterSplit, mayUseOffPeak, mayBeStuck); + result.setAfterSplit(isAfterSplit); ArrayList<HStoreFile> filesToCompact = Lists.newArrayList(result.getFiles()); removeExcessFiles(filesToCompact, isUserCompaction, isTryingMajor); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java index 756faa9..443075c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java @@ -122,6 +122,7 @@ public class StripeCompactionPolicy extends CompactionPolicy { SplitStripeCompactionRequest request = new SplitStripeCompactionRequest( allFiles, OPEN_KEY, OPEN_KEY, targetKvs); request.setMajorRangeFull(); + request.getRequest().setAfterSplit(true); return request; } diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java index 45f35ef..0ffd607 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertNull; @@ -26,6 +27,8 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.IOException; +import java.lang.reflect.Field; +import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; @@ -74,6 +77,7 @@ import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor; import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.MasterObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.io.Reference; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.LoadBalancer; import org.apache.hadoop.hbase.master.MasterRpcServices; @@ -85,6 +89,7 @@ import org.apache.hadoop.hbase.master.assignment.RegionStateNode; import org.apache.hadoop.hbase.master.assignment.RegionStates; import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import 
org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; import org.apache.hadoop.hbase.testclassification.LargeTests; @@ -108,6 +113,7 @@ import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestName; +import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -278,6 +284,79 @@ public class TestSplitTransactionOnCluster { assertEquals(2, cluster.getRegions(tableName).size()); } + @Test + public void testSplitCompactWithPriority() throws Exception { + final TableName tableName = TableName.valueOf(name.getMethodName()); + // Create table then get the single region for our new table. + byte[] cf = Bytes.toBytes("cf"); + TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(cf)).build(); + admin.createTable(htd); + + assertNotEquals("Unable to retrieve regions of the table", -1, + TESTING_UTIL.waitFor(10000, () -> cluster.getRegions(tableName).size() == 1)); + + HRegion region = cluster.getRegions(tableName).get(0); + HStore store = region.getStore(cf); + int regionServerIndex = cluster.getServerWith(region.getRegionInfo().getRegionName()); + HRegionServer regionServer = cluster.getRegionServer(regionServerIndex); + + Table table = TESTING_UTIL.getConnection().getTable(tableName); + // insert data + insertData(tableName, admin, table); + insertData(tableName, admin, table, 20); + insertData(tableName, admin, table, 40); + + // Compaction Request + store.triggerMajorCompaction(); + Optional<CompactionContext> compactionContext = store.requestCompaction(); + assertTrue(compactionContext.isPresent()); + assertFalse(compactionContext.get().getRequest().isAfterSplit()); + assertEquals(compactionContext.get().getRequest().getPriority(), 13); + + // Split + long procId = + cluster.getMaster().splitRegion(region.getRegionInfo(), Bytes.toBytes("row4"), 0, 0); + + // wait for the split to complete or 
get interrupted. If the split completes successfully, + // the procedure will return true; if the split fails, the procedure would throw exception. + ProcedureTestingUtility.waitProcedure(cluster.getMaster().getMasterProcedureExecutor(), + procId); + + assertEquals(2, cluster.getRegions(tableName).size()); + // we have 2 daughter regions + HRegion hRegion1 = cluster.getRegions(tableName).get(0); + HRegion hRegion2 = cluster.getRegions(tableName).get(1); + HStore hStore1 = hRegion1.getStore(cf); + HStore hStore2 = hRegion2.getStore(cf); + + // For hStore1 && hStore2, set mock reference to one of the storeFiles + StoreFileInfo storeFileInfo1 = new ArrayList<>(hStore1.getStorefiles()).get(0).getFileInfo(); + StoreFileInfo storeFileInfo2 = new ArrayList<>(hStore2.getStorefiles()).get(0).getFileInfo(); + Field field = StoreFileInfo.class.getDeclaredField("reference"); + field.setAccessible(true); + field.set(storeFileInfo1, Mockito.mock(Reference.class)); + field.set(storeFileInfo2, Mockito.mock(Reference.class)); + hStore1.triggerMajorCompaction(); + hStore2.triggerMajorCompaction(); + + compactionContext = hStore1.requestCompaction(); + assertTrue(compactionContext.isPresent()); + // since we set mock reference to one of the storeFiles, we will get isAfterSplit=true && + // highest priority for hStore1's compactionContext + assertTrue(compactionContext.get().getRequest().isAfterSplit()); + assertEquals(compactionContext.get().getRequest().getPriority(), Integer.MIN_VALUE + 1000); + + compactionContext = + hStore2.requestCompaction(Integer.MIN_VALUE + 10, CompactionLifeCycleTracker.DUMMY, null); + assertTrue(compactionContext.isPresent()); + // compaction request contains higher priority than default priority of daughter region + // compaction (Integer.MIN_VALUE + 1000), hence we are expecting request priority to + // be accepted. 
+ assertTrue(compactionContext.get().getRequest().isAfterSplit()); + assertEquals(compactionContext.get().getRequest().getPriority(), Integer.MIN_VALUE + 10); + } + public static class FailingSplitMasterObserver implements MasterCoprocessor, MasterObserver { volatile CountDownLatch latch; @@ -637,18 +716,21 @@ public class TestSplitTransactionOnCluster { } } - private void insertData(final TableName tableName, Admin admin, Table t) throws IOException, - InterruptedException { - Put p = new Put(Bytes.toBytes("row1")); + private void insertData(final TableName tableName, Admin admin, Table t) throws IOException { + insertData(tableName, admin, t, 1); + } + + private void insertData(TableName tableName, Admin admin, Table t, int i) throws IOException { + Put p = new Put(Bytes.toBytes("row" + i)); p.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("1")); t.put(p); - p = new Put(Bytes.toBytes("row2")); + p = new Put(Bytes.toBytes("row" + (i + 1))); p.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("2")); t.put(p); - p = new Put(Bytes.toBytes("row3")); + p = new Put(Bytes.toBytes("row" + (i + 2))); p.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("3")); t.put(p); - p = new Put(Bytes.toBytes("row4")); + p = new Put(Bytes.toBytes("row" + (i + 3))); p.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("4")); t.put(p); admin.flush(tableName);