This is an automated email from the ASF dual-hosted git repository.

wchevreuil pushed a commit to branch HBASE-29427
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 13fc673f63b3916e0316ee9d52fd64bd30353c9a
Author: Wellington Ramos Chevreuil <[email protected]>
AuthorDate: Tue Nov 5 13:50:42 2024 +0000

    HBASE-29412 Extend date tiered compaction to allow for tiering by values other than cell timestamp
    
    Change-Id: I9574d01c5cb88d1ba35db3b0970d2d710207fdf4
---
 .../regionserver/CustomCellTieredStoreEngine.java  | 53 ++++++++++++++
 .../hbase/regionserver/DateTieredStoreEngine.java  | 17 ++++-
 .../hbase/regionserver/compactions/Compactor.java  |  5 ++
 .../CustomCellDateTieredCompactionPolicy.java      | 84 ++++++++++++++++++++++
 .../compactions/CustomCellTieredCompactor.java     | 27 +++++++
 .../compactions/DateTieredCompactionPolicy.java    |  2 +-
 6 files changed, 186 insertions(+), 2 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomCellTieredStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomCellTieredStoreEngine.java
new file mode 100644
index 00000000000..fb940f41155
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomCellTieredStoreEngine.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CellComparator;
+import 
org.apache.hadoop.hbase.regionserver.compactions.CustomCellDateTieredCompactionPolicy;
+import 
org.apache.hadoop.hbase.regionserver.compactions.CustomCellTieredCompactor;
+import org.apache.yetus.audience.InterfaceAudience;
+import java.io.IOException;
+import static 
org.apache.hadoop.hbase.regionserver.DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY;
+
+/**
+ * Extension of {@link DateTieredStoreEngine} meant to tier store files by a value extracted from
+ * each cell through a pluggable provider, rather than by the cell timestamp.
+ * <p>
+ * Unlike the existing Date Tiered Compaction, this doesn't yield multiple tiers or files, but
+ * rather provides two tiers based on a configurable "cut-off" age. All rows with the cell tiering
+ * value older than this "cut-off" age would be placed together in an "old" tier, whilst younger
+ * rows would go to a separate, "young" tier file.
+ * <p>
+ * NOTE(review): in this change {@link CustomCellTieredCompactor} does not yet extract a custom
+ * value from cells (its {@code decorateCells} override is still a TODO).
+ */
+@InterfaceAudience.Private
+public class CustomCellTieredStoreEngine extends DateTieredStoreEngine {
+
+  /**
+   * Creates the engine components: a {@link CustomCellDateTieredCompactionPolicy}, a
+   * {@link DefaultStoreFileManager}, a {@link DefaultStoreFlusher} and a
+   * {@link CustomCellTieredCompactor}.
+   * @param conf         store configuration; copied, never modified
+   * @param store        the store served by this engine
+   * @param kvComparator cell comparator handed to the store file manager
+   * @throws IOException if the compaction policy cannot be instantiated
+   */
+  @Override
+  protected void createComponents(Configuration conf, HStore store, CellComparator kvComparator)
+    throws IOException {
+    // Set the policy class on a copy so the caller's configuration is left untouched.
+    Configuration engineConf = new Configuration(conf);
+    engineConf.set(DEFAULT_COMPACTION_POLICY_CLASS_KEY,
+      CustomCellDateTieredCompactionPolicy.class.getName());
+    createCompactionPolicy(engineConf, store);
+    this.storeFileManager = new DefaultStoreFileManager(kvComparator,
+      StoreFileComparators.SEQ_ID_MAX_TIMESTAMP, engineConf, compactionPolicy.getConf());
+    this.storeFlusher = new DefaultStoreFlusher(engineConf, store);
+    this.compactor = new CustomCellTieredCompactor(engineConf, store);
+  }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java
index 26437ab1124..88eb59f69e8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java
@@ -29,7 +29,9 @@ import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionRequ
 import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactor;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.yetus.audience.InterfaceAudience;
+import static 
org.apache.hadoop.hbase.regionserver.DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY;
 
 /**
  * HBASE-15400 This store engine allows us to store data in date tiered layout 
with exponential
@@ -44,6 +46,19 @@ public class DateTieredStoreEngine extends StoreEngine<DefaultStoreFlusher,
 
   public static final String DATE_TIERED_STORE_ENGINE = 
DateTieredStoreEngine.class.getName();
 
+  /**
+   * Instantiates the compaction policy for this engine and assigns it to {@code compactionPolicy}.
+   * The class name is read from {@link DefaultStoreEngine#DEFAULT_COMPACTION_POLICY_CLASS_KEY},
+   * defaulting to {@link DateTieredCompactionPolicy}; subclasses may point that key at a
+   * {@link DateTieredCompactionPolicy} subclass before calling this method.
+   * <p>
+   * NOTE(review): the key is shared with {@link DefaultStoreEngine}, so a policy configured for
+   * the default engine is also picked up here and must then be a date tiered policy.
+   * @param conf  configuration to read the class name from; also passed to the policy constructor
+   * @param store store information passed to the policy constructor
+   * @throws IOException if the class cannot be instantiated or is not a
+   *                     {@link DateTieredCompactionPolicy}
+   */
+  protected void createCompactionPolicy(Configuration conf, HStore store) throws IOException {
+    String className =
+      conf.get(DEFAULT_COMPACTION_POLICY_CLASS_KEY, DateTieredCompactionPolicy.class.getName());
+    Object policy;
+    try {
+      policy = ReflectionUtils.instantiateWithCustomCtor(className,
+        new Class[] { Configuration.class, StoreConfigInformation.class },
+        new Object[] { conf, store });
+    } catch (Exception e) {
+      throw new IOException("Unable to load configured compaction policy '" + className + "'", e);
+    }
+    // Validate the type here so a misconfigured class is reported clearly at store open.
+    if (!(policy instanceof DateTieredCompactionPolicy)) {
+      throw new IOException("Configured compaction policy '" + className + "' is not a "
+        + DateTieredCompactionPolicy.class.getName());
+    }
+    compactionPolicy = (DateTieredCompactionPolicy) policy;
+  }
+
   @Override
   public boolean needsCompaction(List<HStoreFile> filesCompacting) {
     return compactionPolicy.needsCompaction(storeFileManager.getStoreFiles(), 
filesCompacting);
@@ -57,7 +72,7 @@ public class DateTieredStoreEngine extends StoreEngine<DefaultStoreFlusher,
   @Override
   protected void createComponents(Configuration conf, HStore store, 
CellComparator kvComparator)
     throws IOException {
-    this.compactionPolicy = new DateTieredCompactionPolicy(conf, store);
+    createCompactionPolicy(conf, store);
     this.storeFileManager = new DefaultStoreFileManager(kvComparator,
       StoreFileComparators.SEQ_ID_MAX_TIMESTAMP, conf, 
compactionPolicy.getConf());
     this.storeFlusher = new DefaultStoreFlusher(conf, store);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index 055ad85e5a3..95a123c0a86 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -403,6 +403,10 @@ public abstract class Compactor<T extends CellSink> {
 
   protected abstract void abortWriter(T writer) throws IOException;
 
+  /**
+   * Extension hook called on each batch of {@code cells} inside the compaction loop, just before
+   * the batch is iterated and written out. Changes made to the list in place are therefore seen
+   * by the writing loop. The default implementation does nothing.
+   * @param cells the batch of cells about to be written
+   */
+  protected void decorateCells(List<ExtendedCell> cells) {
+    // No-op by default.
+  }
+
   /**
    * Performs the compaction.
    * @param fd                FileDetails of cell sink writer
@@ -459,6 +463,7 @@ public abstract class Compactor<T extends CellSink> {
         // output to writer:
         Cell lastCleanCell = null;
         long lastCleanCellSeqId = 0;
+        decorateCells(cells);
         for (ExtendedCell c : cells) {
           if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) {
             lastCleanCell = c;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomCellDateTieredCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomCellDateTieredCompactionPolicy.java
new file mode 100644
index 00000000000..6421147c6c0
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomCellDateTieredCompactionPolicy.java
@@ -0,0 +1,84 @@
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import org.apache.commons.lang3.mutable.MutableLong;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.regionserver.HStoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.yetus.audience.InterfaceAudience;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Custom implementation of {@link DateTieredCompactionPolicy} that calculates major compaction
+ * boundaries based on the {@value #AGE_LIMIT_MILLIS} configuration property and the
+ * {@value #TIERING_CELL_MIN}/{@value #TIERING_CELL_MAX} values stored in each store file's
+ * metadata.
+ * <p>
+ * This policy produces either one or two tiers:
+ * <ul>
+ * <li>One tier if the data in all files is older than the configured age limit, or the data in
+ * all files is younger than it.</li>
+ * <li>Two tiers if the files hold both data older and data younger than the configured age
+ * limit.</li>
+ * </ul>
+ * The cut-off is computed from the {@code now} passed to
+ * {@link #getCompactBoundariesForMajor(Collection, long)}, so it does not go stale over the
+ * lifetime of the policy instance.
+ */
+@InterfaceAudience.Private
+public class CustomCellDateTieredCompactionPolicy extends DateTieredCompactionPolicy {
+
+  public static final String AGE_LIMIT_MILLIS =
+    "hbase.hstore.compaction.date.tiered.custom.age.limit.millis";
+
+  /** Default for {@link #AGE_LIMIT_MILLIS}: ten 365.25-day years, in milliseconds. */
+  public static final long DEFAULT_AGE_LIMIT_MILLIS = (long) (10 * 365.25 * 24 * 60 * 60 * 1000);
+
+  public static final String TIERING_CELL_MIN = "TIERING_CELL_MIN";
+
+  public static final String TIERING_CELL_MAX = "TIERING_CELL_MAX";
+
+  // Encoded once instead of on every lookup.
+  private static final byte[] TIERING_CELL_MIN_KEY = Bytes.toBytes(TIERING_CELL_MIN);
+
+  private static final byte[] TIERING_CELL_MAX_KEY = Bytes.toBytes(TIERING_CELL_MAX);
+
+  /** Distance back from "now" to the old/young cut-off, in milliseconds. */
+  private final long ageLimitMillis;
+
+  public CustomCellDateTieredCompactionPolicy(Configuration conf,
+    StoreConfigInformation storeConfigInfo) throws IOException {
+    super(conf, storeConfigInfo);
+    ageLimitMillis = conf.getLong(AGE_LIMIT_MILLIS, DEFAULT_AGE_LIMIT_MILLIS);
+  }
+
+  /**
+   * Returns the lower boundaries of the output tiers in ascending order, always starting with
+   * {@link Long#MIN_VALUE}. A second boundary at the cut-off ({@code now} minus the age limit) is
+   * added only when the files hold values both below and above it.
+   * <p>
+   * A file without {@value #TIERING_CELL_MIN} metadata is treated as if its minimum were 0, and
+   * one without {@value #TIERING_CELL_MAX} as if its maximum were {@link Long#MAX_VALUE}.
+   * @param filesToCompact files selected for the major compaction
+   * @param now            the current time, used to derive the cut-off
+   * @return ascending list of tier lower boundaries
+   */
+  @Override
+  protected List<Long> getCompactBoundariesForMajor(Collection<HStoreFile> filesToCompact,
+    long now) {
+    long cutOffTimestamp = now - ageLimitMillis;
+    long min = Long.MAX_VALUE;
+    long max = 0;
+    for (HStoreFile file : filesToCompact) {
+      byte[] fileMin = file.getMetadataValue(TIERING_CELL_MIN_KEY);
+      byte[] fileMax = file.getMetadataValue(TIERING_CELL_MAX_KEY);
+      min = Math.min(min, fileMin != null ? Bytes.toLong(fileMin) : 0);
+      max = Math.max(max, fileMax != null ? Bytes.toLong(fileMax) : Long.MAX_VALUE);
+    }
+
+    // Collected newest-first, then reversed into ascending order.
+    List<Long> boundaries = new ArrayList<>();
+    if (min < cutOffTimestamp && max > cutOffTimestamp) {
+      boundaries.add(cutOffTimestamp);
+    }
+    boundaries.add(Long.MIN_VALUE);
+    Collections.reverse(boundaries);
+    return boundaries;
+  }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomCellTieredCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomCellTieredCompactor.java
new file mode 100644
index 00000000000..8e1afee52e4
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomCellTieredCompactor.java
@@ -0,0 +1,27 @@
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.ExtendedCell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.yetus.audience.InterfaceAudience;
+import java.util.List;
+
+/**
+ * An extension of {@link DateTieredCompactor} that overrides {@code decorateCells} so that custom
+ * values, rather than cell timestamps, can drive the file tiers produced by a compaction.
+ * <p>
+ * NOTE(review): the override below is still an empty placeholder in this change.
+ */
+@InterfaceAudience.Private
+public class CustomCellTieredCompactor extends DateTieredCompactor {
+
+  public CustomCellTieredCompactor(Configuration conf, HStore store) {
+    super(conf, store);
+  }
+
+  /**
+   * Placeholder for adjusting each batch of cells before it is written; currently leaves
+   * {@code cells} unchanged.
+   * @param cells the batch of cells about to be written
+   */
+  @Override
+  protected void decorateCells(List<ExtendedCell> cells) {
+    // TODO: not implemented yet.
+  }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java
index 9dbe9aae9cf..a4de078f685 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java
@@ -296,7 +296,7 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy {
   /**
    * Return a list of boundaries for multiple compaction output in ascending 
order.
    */
-  private List<Long> getCompactBoundariesForMajor(Collection<HStoreFile> 
filesToCompact, long now) {
+  protected List<Long> getCompactBoundariesForMajor(Collection<HStoreFile> 
filesToCompact, long now) {
     long minTimestamp = filesToCompact.stream()
       .mapToLong(f -> 
f.getMinimumTimestamp().orElse(Long.MAX_VALUE)).min().orElse(Long.MAX_VALUE);
 

Reply via email to