This is an automated email from the ASF dual-hosted git repository. wchevreuil pushed a commit to branch HBASE-29427 in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 13fc673f63b3916e0316ee9d52fd64bd30353c9a Author: Wellington Ramos Chevreuil <[email protected]> AuthorDate: Tue Nov 5 13:50:42 2024 +0000 HBASE-29412 Extend date tiered compaction to allow for tiering by values other than cell timestamp Change-Id: I9574d01c5cb88d1ba35db3b0970d2d710207fdf4 --- .../regionserver/CustomCellTieredStoreEngine.java | 53 ++++++++++++++ .../hbase/regionserver/DateTieredStoreEngine.java | 17 ++++- .../hbase/regionserver/compactions/Compactor.java | 5 ++ .../CustomCellDateTieredCompactionPolicy.java | 84 ++++++++++++++++++++++ .../compactions/CustomCellTieredCompactor.java | 27 +++++++ .../compactions/DateTieredCompactionPolicy.java | 2 +- 6 files changed, 186 insertions(+), 2 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomCellTieredStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomCellTieredStoreEngine.java new file mode 100644 index 00000000000..fb940f41155 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomCellTieredStoreEngine.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.regionserver.compactions.CustomCellDateTieredCompactionPolicy; +import org.apache.hadoop.hbase.regionserver.compactions.CustomCellTieredCompactor; +import org.apache.yetus.audience.InterfaceAudience; +import java.io.IOException; +import static org.apache.hadoop.hbase.regionserver.DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY; + +/** + * Extension of {@link DateTieredStoreEngine} that uses a pluggable value provider for + * extracting the value to be used for comparison in this tiered compaction. + * + * Differently from the existing Date Tiered Compaction, this doesn't yield multiple tiers + * or files, but rather provides two tiers based on a configurable “cut-off” age. + * All rows with the cell tiering value older than this “cut-off” age would be placed together + * in an “old” tier, whilst younger rows would go to a separate, “young” tier file. 
+ */ +@InterfaceAudience.Private +public class CustomCellTieredStoreEngine extends DateTieredStoreEngine { + + @Override + protected void createComponents(Configuration conf, HStore store, CellComparator kvComparator) + throws IOException { + conf = new Configuration(conf); + conf.set(DEFAULT_COMPACTION_POLICY_CLASS_KEY, + CustomCellDateTieredCompactionPolicy.class.getName()); + createCompactionPolicy(conf, store); + this.storeFileManager = new DefaultStoreFileManager(kvComparator, + StoreFileComparators.SEQ_ID_MAX_TIMESTAMP, conf, compactionPolicy.getConf()); + this.storeFlusher = new DefaultStoreFlusher(conf, store); + this.compactor = new CustomCellTieredCompactor(conf, store); + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java index 26437ab1124..88eb59f69e8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java @@ -29,7 +29,9 @@ import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionRequ import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactor; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.yetus.audience.InterfaceAudience; +import static org.apache.hadoop.hbase.regionserver.DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY; /** * HBASE-15400 This store engine allows us to store data in date tiered layout with exponential @@ -44,6 +46,19 @@ public class DateTieredStoreEngine extends StoreEngine<DefaultStoreFlusher, public static final String DATE_TIERED_STORE_ENGINE = DateTieredStoreEngine.class.getName(); + protected void createCompactionPolicy(Configuration conf, HStore store) throws 
IOException { + String className = conf.get( + DEFAULT_COMPACTION_POLICY_CLASS_KEY, DateTieredCompactionPolicy.class.getName()); + try { + compactionPolicy = ReflectionUtils.instantiateWithCustomCtor(className, + new Class[] { Configuration.class, StoreConfigInformation.class }, + new Object[] { conf, store }); + } catch (Exception e) { + throw new IOException("Unable to load configured compaction policy '" + className + "'", e); + } + } + + @Override public boolean needsCompaction(List<HStoreFile> filesCompacting) { return compactionPolicy.needsCompaction(storeFileManager.getStoreFiles(), filesCompacting); @@ -57,7 +72,7 @@ public class DateTieredStoreEngine extends StoreEngine<DefaultStoreFlusher, @Override protected void createComponents(Configuration conf, HStore store, CellComparator kvComparator) throws IOException { - this.compactionPolicy = new DateTieredCompactionPolicy(conf, store); + createCompactionPolicy(conf, store); this.storeFileManager = new DefaultStoreFileManager(kvComparator, StoreFileComparators.SEQ_ID_MAX_TIMESTAMP, conf, compactionPolicy.getConf()); this.storeFlusher = new DefaultStoreFlusher(conf, store); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index 055ad85e5a3..95a123c0a86 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -403,6 +403,10 @@ public abstract class Compactor<T extends CellSink> { protected abstract void abortWriter(T writer) throws IOException; + protected void decorateCells(List<ExtendedCell> cells) { + //no op + } + /** * Performs the compaction. 
* @param fd FileDetails of cell sink writer @@ -459,6 +463,7 @@ public abstract class Compactor<T extends CellSink> { // output to writer: Cell lastCleanCell = null; long lastCleanCellSeqId = 0; + decorateCells(cells); for (ExtendedCell c : cells) { if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) { lastCleanCell = c; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomCellDateTieredCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomCellDateTieredCompactionPolicy.java new file mode 100644 index 00000000000..6421147c6c0 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomCellDateTieredCompactionPolicy.java @@ -0,0 +1,84 @@ +package org.apache.hadoop.hbase.regionserver.compactions; + +import org.apache.commons.lang3.mutable.MutableLong; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.regionserver.HStoreFile; +import org.apache.hadoop.hbase.regionserver.StoreConfigInformation; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.yetus.audience.InterfaceAudience; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +/** + * Custom implementation of DateTieredCompactionPolicy that calculates compaction boundaries based + * on the <b>hbase.hstore.compaction.date.tiered.custom.age.limit.millis</b> configuration property + * and the TIERING_CELL_MIN/TIERING_CELL_MAX stored on metadata of each store file. + * + * This policy would produce either one or two tiers: + * - One tier if either all files data age are older than the configured age limit or all files + * data age are younger than the configured age limit. + * - Two tiers if files have both younger and older data than the configured age limit. 
+ * + */ +@InterfaceAudience.Private +public class CustomCellDateTieredCompactionPolicy extends DateTieredCompactionPolicy { + + public static final String AGE_LIMIT_MILLIS = + "hbase.hstore.compaction.date.tiered.custom.age.limit.millis"; + + public static final String TIERING_CELL_MIN = "TIERING_CELL_MIN"; + + public static final String TIERING_CELL_MAX = "TIERING_CELL_MAX"; + + private long cutOffTimestamp; + + public CustomCellDateTieredCompactionPolicy(Configuration conf, + StoreConfigInformation storeConfigInfo) throws IOException { + super(conf, storeConfigInfo); + cutOffTimestamp = EnvironmentEdgeManager.currentTime() - + conf.getLong(AGE_LIMIT_MILLIS, (long) (10*365.25*24*60*60*1000)); + + } + + @Override + protected List<Long> getCompactBoundariesForMajor(Collection<HStoreFile> filesToCompact, long now) { + MutableLong min = new MutableLong(Long.MAX_VALUE); + MutableLong max = new MutableLong(0); + filesToCompact.forEach(f -> { + byte[] fileMin = f.getMetadataValue(Bytes.toBytes(TIERING_CELL_MIN)); + byte[] fileMax = f.getMetadataValue(Bytes.toBytes(TIERING_CELL_MAX)); + if (fileMin != null) { + long minCurrent = Bytes.toLong(fileMin); + if(min.getValue() < minCurrent) { + min.setValue(minCurrent); + } + } else { + min.setValue(0); + } + if (fileMax != null) { + long maxCurrent = Bytes.toLong(fileMax); + if(max.getValue() > maxCurrent) { + max.setValue(maxCurrent); + } + } else { + max.setValue(Long.MAX_VALUE); + } + }); + + List<Long> boundaries = new ArrayList<>(); + if (min.getValue() < cutOffTimestamp) { + boundaries.add(min.getValue()); + if (max.getValue() > cutOffTimestamp) { + boundaries.add(cutOffTimestamp); + } + } + boundaries.add(Long.MIN_VALUE); + Collections.reverse(boundaries); + return boundaries; + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomCellTieredCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomCellTieredCompactor.java new file mode 
100644 index 00000000000..8e1afee52e4 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomCellTieredCompactor.java @@ -0,0 +1,27 @@ +package org.apache.hadoop.hbase.regionserver.compactions; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.ExtendedCell; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.PrivateCellUtil; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.yetus.audience.InterfaceAudience; +import java.util.List; + +/** + * An extension of DateTieredCompactor, overriding the decorateCells method to allow for custom + * values to be used for the different file tiers during compaction. + */ +@InterfaceAudience.Private +public class CustomCellTieredCompactor extends DateTieredCompactor { + public CustomCellTieredCompactor(Configuration conf, HStore store) { + super(conf, store); + } + + @Override + protected void decorateCells(List<ExtendedCell> cells) { + //TODO + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java index 9dbe9aae9cf..a4de078f685 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java @@ -296,7 +296,7 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy { /** * Return a list of boundaries for multiple compaction output in ascending order. 
*/ - private List<Long> getCompactBoundariesForMajor(Collection<HStoreFile> filesToCompact, long now) { + protected List<Long> getCompactBoundariesForMajor(Collection<HStoreFile> filesToCompact, long now) { long minTimestamp = filesToCompact.stream() .mapToLong(f -> f.getMinimumTimestamp().orElse(Long.MAX_VALUE)).min().orElse(Long.MAX_VALUE);
