This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.3 by this push:
     new e75a048  HBASE-24428 : Update compaction priority for recently split daughter regions (#1784)
e75a048 is described below

commit e75a0481f58cfbf9156d779e5c21986095953e09
Author: Viraj Jasani <vjas...@apache.org>
AuthorDate: Wed May 27 20:56:41 2020 +0530

    HBASE-24428 : Update compaction priority for recently split daughter regions (#1784)
    
    Signed-off-by: Andrew Purtell <apurt...@apache.org>
---
 .../apache/hadoop/hbase/regionserver/HStore.java   | 23 +++++-
 .../hadoop/hbase/regionserver/StoreUtils.java      |  2 +-
 .../compactions/CompactionRequestImpl.java         | 13 +++
 .../compactions/SortedCompactionPolicy.java        |  1 +
 .../compactions/StripeCompactionPolicy.java        |  1 +
 .../TestSplitTransactionOnCluster.java             | 94 ++++++++++++++++++++--
 6 files changed, 126 insertions(+), 8 deletions(-)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 496b699..6687628 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -143,6 +143,12 @@ public class HStore implements Store, HeapSize, 
StoreConfigInformation,
   public static final int DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER = 1000;
   public static final int DEFAULT_BLOCKING_STOREFILE_COUNT = 16;
 
+  // HBASE-24428 : Update compaction priority for recently split daughter regions
+  // so as to prioritize their compaction.
+  // Any compaction candidate with higher priority than compaction of newly split daughter regions
+  // should have priority value < (Integer.MIN_VALUE + 1000)
+  private static final int SPLIT_REGION_COMPACTION_PRIORITY = 
Integer.MIN_VALUE + 1000;
+
   private static final Logger LOG = LoggerFactory.getLogger(HStore.class);
 
   protected final MemStore memstore;
@@ -1937,7 +1943,22 @@ public class HStore implements Store, HeapSize, 
StoreConfigInformation,
 
         // Set common request properties.
         // Set priority, either override value supplied by caller or from 
store.
-        request.setPriority((priority != Store.NO_PRIORITY) ? priority : 
getCompactPriority());
+        final int compactionPriority =
+          (priority != Store.NO_PRIORITY) ? priority : getCompactPriority();
+        request.setPriority(compactionPriority);
+
+        if (request.isAfterSplit()) {
+          // If the store belongs to recently split daughter regions, better we consider
+          // them with the higher priority in the compaction queue.
+          // Override priority if it is lower (higher int value) than
+          // SPLIT_REGION_COMPACTION_PRIORITY
+          final int splitHousekeepingPriority =
+            Math.min(compactionPriority, SPLIT_REGION_COMPACTION_PRIORITY);
+          request.setPriority(splitHousekeepingPriority);
+          LOG.info("Keeping/Overriding Compaction request priority to {} for 
CF {} since it"
+              + " belongs to recently split daughter region {}", 
splitHousekeepingPriority,
+            this.getColumnFamilyName(), 
getRegionInfo().getRegionNameAsString());
+        }
         request.setDescription(getRegionInfo().getRegionNameAsString(), 
getColumnFamilyName());
         request.setTracker(tracker);
       }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java
index dd17729..0e4f6c2 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java
@@ -52,7 +52,7 @@ public class StoreUtils {
    */
   public static boolean hasReferences(Collection<HStoreFile> files) {
     // TODO: make sure that we won't pass null here in the future.
-    return files != null ? files.stream().anyMatch(HStoreFile::isReference) : 
false;
+    return files != null && files.stream().anyMatch(HStoreFile::isReference);
   }
 
   /**
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequestImpl.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequestImpl.java
index 4ea8e3f..899219d 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequestImpl.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequestImpl.java
@@ -43,6 +43,7 @@ public class CompactionRequestImpl implements 
CompactionRequest {
   private DisplayCompactionType isMajor = DisplayCompactionType.MINOR;
   private int priority = NO_PRIORITY;
   private Collection<HStoreFile> filesToCompact;
+  private boolean isAfterSplit = false;
 
   // CompactRequest object creation time.
   private long selectionTime;
@@ -136,6 +137,14 @@ public class CompactionRequestImpl implements 
CompactionRequest {
     return tracker;
   }
 
+  public boolean isAfterSplit() {
+    return isAfterSplit;
+  }
+
+  public void setAfterSplit(boolean afterSplit) {
+    isAfterSplit = afterSplit;
+  }
+
   @Override
   public int hashCode() {
     final int prime = 31;
@@ -149,6 +158,7 @@ public class CompactionRequestImpl implements 
CompactionRequest {
     result = prime * result + ((storeName == null) ? 0 : storeName.hashCode());
     result = prime * result + (int) (totalSize ^ (totalSize >>> 32));
     result = prime * result + ((tracker == null) ? 0 : tracker.hashCode());
+    result = prime * result + (isAfterSplit ? 1231 : 1237);
     return result;
   }
 
@@ -200,6 +210,9 @@ public class CompactionRequestImpl implements 
CompactionRequest {
     if (totalSize != other.totalSize) {
       return false;
     }
+    if (isAfterSplit != other.isAfterSplit) {
+      return false;
+    }
     if (tracker == null) {
       if (other.tracker != null) {
         return false;
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/SortedCompactionPolicy.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/SortedCompactionPolicy.java
index 9b30ab5..ef9f3ca 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/SortedCompactionPolicy.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/SortedCompactionPolicy.java
@@ -84,6 +84,7 @@ public abstract class SortedCompactionPolicy extends 
CompactionPolicy {
 
     CompactionRequestImpl result = createCompactionRequest(candidateSelection,
       isTryingMajor || isAfterSplit, mayUseOffPeak, mayBeStuck);
+    result.setAfterSplit(isAfterSplit);
 
     ArrayList<HStoreFile> filesToCompact = 
Lists.newArrayList(result.getFiles());
     removeExcessFiles(filesToCompact, isUserCompaction, isTryingMajor);
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java
index 756faa9..443075c 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java
@@ -122,6 +122,7 @@ public class StripeCompactionPolicy extends 
CompactionPolicy {
       SplitStripeCompactionRequest request = new SplitStripeCompactionRequest(
           allFiles, OPEN_KEY, OPEN_KEY, targetKvs);
       request.setMajorRangeFull();
+      request.getRequest().setAfterSplit(true);
       return request;
     }
 
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
index 45f35ef..0ffd607 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertNull;
@@ -26,6 +27,8 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -74,6 +77,7 @@ import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.MasterObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.io.Reference;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.LoadBalancer;
 import org.apache.hadoop.hbase.master.MasterRpcServices;
@@ -85,6 +89,7 @@ import 
org.apache.hadoop.hbase.master.assignment.RegionStateNode;
 import org.apache.hadoop.hbase.master.assignment.RegionStates;
 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import 
org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import 
org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -108,6 +113,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -278,6 +284,79 @@ public class TestSplitTransactionOnCluster {
     assertEquals(2, cluster.getRegions(tableName).size());
   }
 
+  @Test
+  public void testSplitCompactWithPriority() throws Exception {
+    final TableName tableName = TableName.valueOf(name.getMethodName());
+    // Create table then get the single region for our new table.
+    byte[] cf = Bytes.toBytes("cf");
+    TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName)
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(cf)).build();
+    admin.createTable(htd);
+
+    assertNotEquals("Unable to retrieve regions of the table", -1,
+      TESTING_UTIL.waitFor(10000, () -> cluster.getRegions(tableName).size() 
== 1));
+
+    HRegion region = cluster.getRegions(tableName).get(0);
+    HStore store = region.getStore(cf);
+    int regionServerIndex = 
cluster.getServerWith(region.getRegionInfo().getRegionName());
+    HRegionServer regionServer = cluster.getRegionServer(regionServerIndex);
+
+    Table table = TESTING_UTIL.getConnection().getTable(tableName);
+    // insert data
+    insertData(tableName, admin, table);
+    insertData(tableName, admin, table, 20);
+    insertData(tableName, admin, table, 40);
+
+    // Compaction Request
+    store.triggerMajorCompaction();
+    Optional<CompactionContext> compactionContext = store.requestCompaction();
+    assertTrue(compactionContext.isPresent());
+    assertFalse(compactionContext.get().getRequest().isAfterSplit());
+    assertEquals(compactionContext.get().getRequest().getPriority(), 13);
+
+    // Split
+    long procId =
+      cluster.getMaster().splitRegion(region.getRegionInfo(), 
Bytes.toBytes("row4"), 0, 0);
+
+    // wait for the split to complete or get interrupted.  If the split 
completes successfully,
+    // the procedure will return true; if the split fails, the procedure would 
throw exception.
+    
ProcedureTestingUtility.waitProcedure(cluster.getMaster().getMasterProcedureExecutor(),
+      procId);
+
+    assertEquals(2, cluster.getRegions(tableName).size());
+    // we have 2 daughter regions
+    HRegion hRegion1 = cluster.getRegions(tableName).get(0);
+    HRegion hRegion2 = cluster.getRegions(tableName).get(1);
+    HStore hStore1 = hRegion1.getStore(cf);
+    HStore hStore2 = hRegion2.getStore(cf);
+
+    // For hStore1 && hStore2, set mock reference to one of the storeFiles
+    StoreFileInfo storeFileInfo1 = new 
ArrayList<>(hStore1.getStorefiles()).get(0).getFileInfo();
+    StoreFileInfo storeFileInfo2 = new 
ArrayList<>(hStore2.getStorefiles()).get(0).getFileInfo();
+    Field field = StoreFileInfo.class.getDeclaredField("reference");
+    field.setAccessible(true);
+    field.set(storeFileInfo1, Mockito.mock(Reference.class));
+    field.set(storeFileInfo2, Mockito.mock(Reference.class));
+    hStore1.triggerMajorCompaction();
+    hStore2.triggerMajorCompaction();
+
+    compactionContext = hStore1.requestCompaction();
+    assertTrue(compactionContext.isPresent());
+    // since we set mock reference to one of the storeFiles, we will get 
isAfterSplit=true &&
+    // highest priority for hStore1's compactionContext
+    assertTrue(compactionContext.get().getRequest().isAfterSplit());
+    assertEquals(compactionContext.get().getRequest().getPriority(), 
Integer.MIN_VALUE + 1000);
+
+    compactionContext =
+      hStore2.requestCompaction(Integer.MIN_VALUE + 10, 
CompactionLifeCycleTracker.DUMMY, null);
+    assertTrue(compactionContext.isPresent());
+    // compaction request contains higher priority than default priority of 
daughter region
+    // compaction (Integer.MIN_VALUE + 1000), hence we are expecting request 
priority to
+    // be accepted.
+    assertTrue(compactionContext.get().getRequest().isAfterSplit());
+    assertEquals(compactionContext.get().getRequest().getPriority(), 
Integer.MIN_VALUE + 10);
+  }
+
   public static class FailingSplitMasterObserver implements MasterCoprocessor, 
MasterObserver {
     volatile CountDownLatch latch;
 
@@ -637,18 +716,21 @@ public class TestSplitTransactionOnCluster {
     }
   }
 
-  private void insertData(final TableName tableName, Admin admin, Table t) 
throws IOException,
-      InterruptedException {
-    Put p = new Put(Bytes.toBytes("row1"));
+  private void insertData(final TableName tableName, Admin admin, Table t) 
throws IOException {
+    insertData(tableName, admin, t, 1);
+  }
+
+  private void insertData(TableName tableName, Admin admin, Table t, int i) 
throws IOException {
+    Put p = new Put(Bytes.toBytes("row" + i));
     p.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("1"));
     t.put(p);
-    p = new Put(Bytes.toBytes("row2"));
+    p = new Put(Bytes.toBytes("row" + (i + 1)));
     p.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("2"));
     t.put(p);
-    p = new Put(Bytes.toBytes("row3"));
+    p = new Put(Bytes.toBytes("row" + (i + 2)));
     p.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("3"));
     t.put(p);
-    p = new Put(Bytes.toBytes("row4"));
+    p = new Put(Bytes.toBytes("row" + (i + 3)));
     p.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("4"));
     t.put(p);
     admin.flush(tableName);

Reply via email to