Copilot commented on code in PR #8075: URL: https://github.com/apache/hbase/pull/8075#discussion_r3284684898
########## hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/EWMABlockSizePredicator.java: ########## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io.hfile; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * A {@link BlockCompressedSizePredicator} that uses an Exponentially Weighted Moving Average (EWMA) + * of the compression ratio to predict the uncompressed block size needed to produce compressed + * blocks close to the configured target block size. 
+ */ +@InterfaceAudience.Private +public class EWMABlockSizePredicator implements BlockCompressedSizePredicator, Configurable { + + public static final String EWMA_ALPHA_KEY = "hbase.block.compressed.size.predicator.ewma.alpha"; + static final double DEFAULT_ALPHA = 0.5; + + private Configuration conf; + private double alpha = DEFAULT_ALPHA; + private double ewmaRatio; + private int adjustedBlockSize; + private int configuredMaxBlockSize; + private boolean initialized; + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + this.alpha = conf.getDouble(EWMA_ALPHA_KEY, DEFAULT_ALPHA); + } Review Comment: setConf assumes conf is non-null and does not validate the configured alpha. If ReflectionUtils/setConf is ever called with null, this will NPE; and if alpha is <= 0, > 1, NaN, or Infinity the EWMA formula becomes invalid and can produce unstable/incorrect ratios. Consider defaulting to DEFAULT_ALPHA when conf is null and clamping/validating alpha to (0, 1]. ########## hbase-diagnostics/src/main/java/org/apache/hadoop/hbase/HFileBlockPerformanceEvaluation.java: ########## @@ -0,0 +1,716 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Locale; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.io.hfile.BlockCompressedSizePredicator; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.EWMABlockSizePredicator; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.io.hfile.HFileBlock; +import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; +import org.apache.hadoop.hbase.io.hfile.HFileScanner; +import org.apache.hadoop.hbase.io.hfile.PreviousBlockCompressionRatePredicator; +import org.apache.hadoop.hbase.io.hfile.UncompressedBlockSizePredicator; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Performance evaluation utility for HFile block encoding, compression algorithms, and block size + * predicators ({@link BlockCompressedSizePredicator} implementations). + * <p> + * Tests are parameterized by number of blocks rather than number of rows. A tunable value generator + * produces cell values that compress to approximately the requested target ratio. The tool sweeps + * all combinations of compression algorithm, data block encoding, and block size (powers of two + * between min and max), and for predicator tests, also sweeps all three predicator implementations. 
+ * </p> + * <h3>Usage</h3> + * + * <pre> + * HFileBlockPerformanceEvaluation [options] + * --blocks N Number of blocks per test (default: 100) + * --compressions LIST Comma-separated compression algorithms (default: none,gz) + * --encodings LIST Comma-separated data block encodings (default: none,fast_diff) + * --min-block-size BYTES Minimum block size in bytes (default: 8192) + * --max-block-size BYTES Maximum block size in bytes (default: 131072) + * --target-ratio FLOAT Target compression ratio (default: 3.0) + * --value-size BYTES Value size per cell in bytes (default: 1000) + * --predicators-only Run only predicator accuracy tests + * --throughput-only Run only read/write throughput tests + * </pre> + */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) +public class HFileBlockPerformanceEvaluation { + + private static final Logger LOG = LoggerFactory.getLogger(HFileBlockPerformanceEvaluation.class); + + static { + System.setProperty("org.apache.commons.logging.Log", + "org.apache.commons.logging.impl.SimpleLog"); + System.setProperty( + "org.apache.commons.logging.simplelog.log.org.apache.hadoop.hbase.io.compress.CodecPool", + "WARN"); + } + + private static final int ROW_KEY_LENGTH = 16; + + private int numBlocks = 100; + private int minBlockSize = 8 * 1024; + private int maxBlockSize = 128 * 1024; + private double targetRatio = 3.0; + private int valueSize = 1000; + private boolean predicatorsOnly = false; + private boolean throughputOnly = false; + + private List<Compression.Algorithm> compressions = new ArrayList<>(); + private List<DataBlockEncoding> encodings = new ArrayList<>(); + + // ---- Value generator ---- + + /** + * Generates byte arrays that compress to approximately the given target ratio. A fraction (1 - + * 1/ratio) of the bytes are a repeating pattern and the rest are random. The repeating portion + * compresses nearly to zero while the random portion is incompressible, so the overall ratio + * approaches the target. 
+ */ + static class CompressibleValueGenerator { + private final int valueSize; + private final int randomBytes; + private final byte patternByte; + + CompressibleValueGenerator(int valueSize, double targetRatio) { + this.valueSize = valueSize; + double incompressibleFraction = 1.0 / targetRatio; + this.randomBytes = Math.max(1, (int) (valueSize * incompressibleFraction)); + this.patternByte = (byte) 0x42; + } + + byte[] generate() { + byte[] value = new byte[valueSize]; + Arrays.fill(value, patternByte); + byte[] rand = new byte[randomBytes]; + Bytes.random(rand); + System.arraycopy(rand, 0, value, 0, Math.min(randomBytes, valueSize)); + return value; + } + } + + // ---- Cell creation ---- + + static byte[] formatRowKey(long i) { + String v = Long.toString(i); + StringBuilder sb = new StringBuilder(ROW_KEY_LENGTH); + for (int pad = ROW_KEY_LENGTH - v.length(); pad > 0; pad--) { + sb.append('0'); + } + sb.append(v); + return Bytes.toBytes(sb.toString()); + } + + static ExtendedCell createCell(long i, byte[] value) { + return ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(formatRowKey(i)) + .setFamily(Bytes.toBytes("cf")).setQualifier(Bytes.toBytes("q")) + .setTimestamp(EnvironmentEdgeManager.currentTime()).setType(KeyValue.Type.Put.getCode()) + .setValue(value).build(); + } + + // ---- Block size sweep ---- + + List<Integer> blockSizeSweep() { + List<Integer> sizes = new ArrayList<>(); + int size = Integer.highestOneBit(minBlockSize); + if (size < minBlockSize) { + size <<= 1; + } + while (size <= maxBlockSize) { + sizes.add(size); + size <<= 1; + } + if (sizes.isEmpty()) { + sizes.add(minBlockSize); + } + return sizes; + } + + // ---- Predicator classes ---- + + @SuppressWarnings("unchecked") + static final Class<? 
extends BlockCompressedSizePredicator>[] PREDICATOR_CLASSES = + new Class[] { UncompressedBlockSizePredicator.class, + PreviousBlockCompressionRatePredicator.class, EWMABlockSizePredicator.class }; + + // ---- Write helper: writes enough cells to fill numBlocks blocks ---- + + /** + * Writes an HFile with the given parameters and returns the path. Writes cells until at least + * {@code numBlocks} data blocks are produced. + */ + Path writeHFile(Configuration conf, FileSystem fs, Path dir, Compression.Algorithm compression, + DataBlockEncoding encoding, int blockSize, String label) throws IOException { + Path path = new Path(dir, "blockencpe-" + label + ".hfile"); + if (fs.exists(path)) { + fs.delete(path, false); + } + + HFileContext context = new HFileContextBuilder().withCompression(compression) + .withDataBlockEncoding(encoding).withBlockSize(blockSize).build(); + + CompressibleValueGenerator valueGen = new CompressibleValueGenerator(valueSize, targetRatio); + + long rowIndex = 0; + // Estimate rows per block based on uncompressed cell size to avoid writing unnecessary cells. + // Each cell is roughly ROW_KEY_LENGTH + family + qualifier + value + KV overhead. + int approxCellSize = ROW_KEY_LENGTH + 2 + 1 + valueSize + KeyValue.FIXED_OVERHEAD; + long estimatedRowsPerBlock = Math.max(1, blockSize / approxCellSize); + // For compressed data the predicator will let blocks grow larger, account for the ratio. 
+ if (compression != Compression.Algorithm.NONE) { + estimatedRowsPerBlock = (long) (estimatedRowsPerBlock * targetRatio); + } + long maxRows = estimatedRowsPerBlock * numBlocks * 3L; + + try (HFile.Writer writer = + HFile.getWriterFactoryNoCache(conf).withPath(fs, path).withFileContext(context).create()) { + for (rowIndex = 0; rowIndex < maxRows; rowIndex++) { + writer.append(createCell(rowIndex, valueGen.generate())); + } + } + return path; + } + + // ---- Predicator accuracy benchmark ---- + + static class PredicatorAccuracyResult { + final String predicatorName; + final String compression; + final String encoding; + final int blockSize; + final int actualBlocks; + final double meanCompressedSize; + final double stddevCompressedSize; + final int minCompressedSize; + final int maxCompressedSize; + final double meanDeviationPct; + + PredicatorAccuracyResult(String predicatorName, String compression, String encoding, + int blockSize, int actualBlocks, double meanCompressedSize, double stddevCompressedSize, + int minCompressedSize, int maxCompressedSize, double meanDeviationPct) { + this.predicatorName = predicatorName; + this.compression = compression; + this.encoding = encoding; + this.blockSize = blockSize; + this.actualBlocks = actualBlocks; + this.meanCompressedSize = meanCompressedSize; + this.stddevCompressedSize = stddevCompressedSize; + this.minCompressedSize = minCompressedSize; + this.maxCompressedSize = maxCompressedSize; + this.meanDeviationPct = meanDeviationPct; + } + } + + List<PredicatorAccuracyResult> runPredicatorAccuracyTests(Configuration baseConf, FileSystem fs, + Path dir) throws IOException { + List<PredicatorAccuracyResult> results = new ArrayList<>(); + List<Integer> blockSizes = blockSizeSweep(); + + for (Compression.Algorithm compression : compressions) { + for (DataBlockEncoding encoding : encodings) { + for (int blockSize : blockSizes) { + for (Class<? 
extends BlockCompressedSizePredicator> predClass : PREDICATOR_CLASSES) { + String predName = predClass.getSimpleName(); + String label = String.format("%s-%s-%d-%s", compression.getName(), encoding.name(), + blockSize, predName); + + Configuration conf = new Configuration(baseConf); + conf.setClass(BlockCompressedSizePredicator.BLOCK_COMPRESSED_SIZE_PREDICATOR, predClass, + BlockCompressedSizePredicator.class); + + LOG.info("Predicator accuracy test: compression={}, encoding={}, blockSize={}, " + + "predicator={}", compression.getName(), encoding.name(), blockSize, predName); + + long startTime = EnvironmentEdgeManager.currentTime(); + Path hfilePath = writeHFile(conf, fs, dir, compression, encoding, blockSize, label); + long writeElapsed = EnvironmentEdgeManager.currentTime() - startTime; + + List<Integer> compressedBlockSizes = new ArrayList<>(); + try (HFile.Reader reader = + HFile.createReader(fs, hfilePath, CacheConfig.DISABLED, true, conf)) { + try (HFileScanner scanner = reader.getScanner(conf, false, false)) { + scanner.seekTo(); + HFileBlock prevBlock = null; + do { + Cell cell = scanner.getCell(); + if (cell == null) { + break; + } + // We iterate cells but track block transitions via the scanner's internal block + } while (scanner.next()); + } + + // Read block-level info by traversing the data index + int dataBlockCount = reader.getTrailer().getDataIndexCount(); + // Use the reader's block iterator to get on-disk sizes + try (HFileScanner scanner = reader.getScanner(conf, false, false)) { + if (scanner.seekTo()) { + HFileBlock block = null; + long offset = reader.getTrailer().getFirstDataBlockOffset(); + long lastOffset = -1; + // Read blocks directly + while (offset >= 0 && offset != lastOffset) { + lastOffset = offset; + try { + block = reader.readBlock(offset, -1, false, false, false, true, null, + reader.getDataBlockEncoding()); + if (block == null || !block.getBlockType().isData()) { + break; + } + 
compressedBlockSizes.add(block.getOnDiskSizeWithHeader()); + long nextOffset = offset + block.getOnDiskSizeWithHeader(); + block.release(); + block = null; + offset = nextOffset; + } catch (Exception e) { + break; + } + } Review Comment: If reader.readBlock returns a non-null block that is not a data block, the loop breaks without releasing the HFileBlock, which can leak buffer references. Ensure block.release() is called on all exit paths (including non-data blocks). ########## hbase-diagnostics/src/main/java/org/apache/hadoop/hbase/HFileBlockPerformanceEvaluation.java: ########## @@ -0,0 +1,716 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Locale; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.io.hfile.BlockCompressedSizePredicator; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.EWMABlockSizePredicator; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.io.hfile.HFileBlock; +import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; +import org.apache.hadoop.hbase.io.hfile.HFileScanner; +import org.apache.hadoop.hbase.io.hfile.PreviousBlockCompressionRatePredicator; +import org.apache.hadoop.hbase.io.hfile.UncompressedBlockSizePredicator; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Performance evaluation utility for HFile block encoding, compression algorithms, and block size + * predicators ({@link BlockCompressedSizePredicator} implementations). + * <p> + * Tests are parameterized by number of blocks rather than number of rows. A tunable value generator + * produces cell values that compress to approximately the requested target ratio. The tool sweeps + * all combinations of compression algorithm, data block encoding, and block size (powers of two + * between min and max), and for predicator tests, also sweeps all three predicator implementations. 
+ * </p> + * <h3>Usage</h3> + * + * <pre> + * HFileBlockPerformanceEvaluation [options] + * --blocks N Number of blocks per test (default: 100) + * --compressions LIST Comma-separated compression algorithms (default: none,gz) + * --encodings LIST Comma-separated data block encodings (default: none,fast_diff) + * --min-block-size BYTES Minimum block size in bytes (default: 8192) + * --max-block-size BYTES Maximum block size in bytes (default: 131072) + * --target-ratio FLOAT Target compression ratio (default: 3.0) + * --value-size BYTES Value size per cell in bytes (default: 1000) + * --predicators-only Run only predicator accuracy tests + * --throughput-only Run only read/write throughput tests + * </pre> + */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) +public class HFileBlockPerformanceEvaluation { + + private static final Logger LOG = LoggerFactory.getLogger(HFileBlockPerformanceEvaluation.class); + + static { + System.setProperty("org.apache.commons.logging.Log", + "org.apache.commons.logging.impl.SimpleLog"); + System.setProperty( + "org.apache.commons.logging.simplelog.log.org.apache.hadoop.hbase.io.compress.CodecPool", + "WARN"); + } + + private static final int ROW_KEY_LENGTH = 16; + + private int numBlocks = 100; + private int minBlockSize = 8 * 1024; + private int maxBlockSize = 128 * 1024; + private double targetRatio = 3.0; + private int valueSize = 1000; + private boolean predicatorsOnly = false; + private boolean throughputOnly = false; + + private List<Compression.Algorithm> compressions = new ArrayList<>(); + private List<DataBlockEncoding> encodings = new ArrayList<>(); + + // ---- Value generator ---- + + /** + * Generates byte arrays that compress to approximately the given target ratio. A fraction (1 - + * 1/ratio) of the bytes are a repeating pattern and the rest are random. The repeating portion + * compresses nearly to zero while the random portion is incompressible, so the overall ratio + * approaches the target. 
+ */ + static class CompressibleValueGenerator { + private final int valueSize; + private final int randomBytes; + private final byte patternByte; + + CompressibleValueGenerator(int valueSize, double targetRatio) { + this.valueSize = valueSize; + double incompressibleFraction = 1.0 / targetRatio; + this.randomBytes = Math.max(1, (int) (valueSize * incompressibleFraction)); + this.patternByte = (byte) 0x42; + } + + byte[] generate() { + byte[] value = new byte[valueSize]; + Arrays.fill(value, patternByte); + byte[] rand = new byte[randomBytes]; + Bytes.random(rand); + System.arraycopy(rand, 0, value, 0, Math.min(randomBytes, valueSize)); + return value; + } + } + + // ---- Cell creation ---- + + static byte[] formatRowKey(long i) { + String v = Long.toString(i); + StringBuilder sb = new StringBuilder(ROW_KEY_LENGTH); + for (int pad = ROW_KEY_LENGTH - v.length(); pad > 0; pad--) { + sb.append('0'); + } + sb.append(v); + return Bytes.toBytes(sb.toString()); + } + + static ExtendedCell createCell(long i, byte[] value) { + return ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(formatRowKey(i)) + .setFamily(Bytes.toBytes("cf")).setQualifier(Bytes.toBytes("q")) + .setTimestamp(EnvironmentEdgeManager.currentTime()).setType(KeyValue.Type.Put.getCode()) + .setValue(value).build(); + } + + // ---- Block size sweep ---- + + List<Integer> blockSizeSweep() { + List<Integer> sizes = new ArrayList<>(); + int size = Integer.highestOneBit(minBlockSize); + if (size < minBlockSize) { + size <<= 1; + } + while (size <= maxBlockSize) { + sizes.add(size); + size <<= 1; + } + if (sizes.isEmpty()) { + sizes.add(minBlockSize); + } + return sizes; + } + + // ---- Predicator classes ---- + + @SuppressWarnings("unchecked") + static final Class<? 
extends BlockCompressedSizePredicator>[] PREDICATOR_CLASSES = + new Class[] { UncompressedBlockSizePredicator.class, + PreviousBlockCompressionRatePredicator.class, EWMABlockSizePredicator.class }; + + // ---- Write helper: writes enough cells to fill numBlocks blocks ---- + + /** + * Writes an HFile with the given parameters and returns the path. Writes cells until at least + * {@code numBlocks} data blocks are produced. + */ + Path writeHFile(Configuration conf, FileSystem fs, Path dir, Compression.Algorithm compression, + DataBlockEncoding encoding, int blockSize, String label) throws IOException { + Path path = new Path(dir, "blockencpe-" + label + ".hfile"); + if (fs.exists(path)) { + fs.delete(path, false); + } + + HFileContext context = new HFileContextBuilder().withCompression(compression) + .withDataBlockEncoding(encoding).withBlockSize(blockSize).build(); + + CompressibleValueGenerator valueGen = new CompressibleValueGenerator(valueSize, targetRatio); + + long rowIndex = 0; + // Estimate rows per block based on uncompressed cell size to avoid writing unnecessary cells. + // Each cell is roughly ROW_KEY_LENGTH + family + qualifier + value + KV overhead. + int approxCellSize = ROW_KEY_LENGTH + 2 + 1 + valueSize + KeyValue.FIXED_OVERHEAD; + long estimatedRowsPerBlock = Math.max(1, blockSize / approxCellSize); + // For compressed data the predicator will let blocks grow larger, account for the ratio. 
+ if (compression != Compression.Algorithm.NONE) { + estimatedRowsPerBlock = (long) (estimatedRowsPerBlock * targetRatio); + } + long maxRows = estimatedRowsPerBlock * numBlocks * 3L; + + try (HFile.Writer writer = + HFile.getWriterFactoryNoCache(conf).withPath(fs, path).withFileContext(context).create()) { + for (rowIndex = 0; rowIndex < maxRows; rowIndex++) { + writer.append(createCell(rowIndex, valueGen.generate())); + } + } + return path; + } + + // ---- Predicator accuracy benchmark ---- + + static class PredicatorAccuracyResult { + final String predicatorName; + final String compression; + final String encoding; + final int blockSize; + final int actualBlocks; + final double meanCompressedSize; + final double stddevCompressedSize; + final int minCompressedSize; + final int maxCompressedSize; + final double meanDeviationPct; + + PredicatorAccuracyResult(String predicatorName, String compression, String encoding, + int blockSize, int actualBlocks, double meanCompressedSize, double stddevCompressedSize, + int minCompressedSize, int maxCompressedSize, double meanDeviationPct) { + this.predicatorName = predicatorName; + this.compression = compression; + this.encoding = encoding; + this.blockSize = blockSize; + this.actualBlocks = actualBlocks; + this.meanCompressedSize = meanCompressedSize; + this.stddevCompressedSize = stddevCompressedSize; + this.minCompressedSize = minCompressedSize; + this.maxCompressedSize = maxCompressedSize; + this.meanDeviationPct = meanDeviationPct; + } + } + + List<PredicatorAccuracyResult> runPredicatorAccuracyTests(Configuration baseConf, FileSystem fs, + Path dir) throws IOException { + List<PredicatorAccuracyResult> results = new ArrayList<>(); + List<Integer> blockSizes = blockSizeSweep(); + + for (Compression.Algorithm compression : compressions) { + for (DataBlockEncoding encoding : encodings) { + for (int blockSize : blockSizes) { + for (Class<? 
extends BlockCompressedSizePredicator> predClass : PREDICATOR_CLASSES) { + String predName = predClass.getSimpleName(); + String label = String.format("%s-%s-%d-%s", compression.getName(), encoding.name(), + blockSize, predName); + + Configuration conf = new Configuration(baseConf); + conf.setClass(BlockCompressedSizePredicator.BLOCK_COMPRESSED_SIZE_PREDICATOR, predClass, + BlockCompressedSizePredicator.class); + + LOG.info("Predicator accuracy test: compression={}, encoding={}, blockSize={}, " + + "predicator={}", compression.getName(), encoding.name(), blockSize, predName); + + long startTime = EnvironmentEdgeManager.currentTime(); + Path hfilePath = writeHFile(conf, fs, dir, compression, encoding, blockSize, label); + long writeElapsed = EnvironmentEdgeManager.currentTime() - startTime; + + List<Integer> compressedBlockSizes = new ArrayList<>(); + try (HFile.Reader reader = + HFile.createReader(fs, hfilePath, CacheConfig.DISABLED, true, conf)) { + try (HFileScanner scanner = reader.getScanner(conf, false, false)) { + scanner.seekTo(); + HFileBlock prevBlock = null; + do { + Cell cell = scanner.getCell(); + if (cell == null) { + break; + } + // We iterate cells but track block transitions via the scanner's internal block + } while (scanner.next()); + } + + // Read block-level info by traversing the data index + int dataBlockCount = reader.getTrailer().getDataIndexCount(); + // Use the reader's block iterator to get on-disk sizes + try (HFileScanner scanner = reader.getScanner(conf, false, false)) { + if (scanner.seekTo()) { + HFileBlock block = null; + long offset = reader.getTrailer().getFirstDataBlockOffset(); + long lastOffset = -1; + // Read blocks directly + while (offset >= 0 && offset != lastOffset) { + lastOffset = offset; + try { + block = reader.readBlock(offset, -1, false, false, false, true, null, + reader.getDataBlockEncoding()); + if (block == null || !block.getBlockType().isData()) { + break; + } + 
compressedBlockSizes.add(block.getOnDiskSizeWithHeader()); + long nextOffset = offset + block.getOnDiskSizeWithHeader(); + block.release(); + block = null; + offset = nextOffset; + } catch (Exception e) { + break; + } + } Review Comment: The catch-all exception handler breaks out of the loop without logging and can also bypass releasing a non-null HFileBlock from a partially completed readBlock call. This can hide real read errors and leak resources; release in a finally and at least log at debug/warn. ########## hbase-diagnostics/src/main/java/org/apache/hadoop/hbase/HFileBlockPerformanceEvaluation.java: ########## @@ -0,0 +1,716 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Locale; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.io.hfile.BlockCompressedSizePredicator; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.EWMABlockSizePredicator; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.io.hfile.HFileBlock; +import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; +import org.apache.hadoop.hbase.io.hfile.HFileScanner; +import org.apache.hadoop.hbase.io.hfile.PreviousBlockCompressionRatePredicator; +import org.apache.hadoop.hbase.io.hfile.UncompressedBlockSizePredicator; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Performance evaluation utility for HFile block encoding, compression algorithms, and block size + * predicators ({@link BlockCompressedSizePredicator} implementations). + * <p> + * Tests are parameterized by number of blocks rather than number of rows. A tunable value generator + * produces cell values that compress to approximately the requested target ratio. The tool sweeps + * all combinations of compression algorithm, data block encoding, and block size (powers of two + * between min and max), and for predicator tests, also sweeps all three predicator implementations. 
+ * </p> + * <h3>Usage</h3> + * + * <pre> + * HFileBlockPerformanceEvaluation [options] + * --blocks N Number of blocks per test (default: 100) + * --compressions LIST Comma-separated compression algorithms (default: none,gz) + * --encodings LIST Comma-separated data block encodings (default: none,fast_diff) + * --min-block-size BYTES Minimum block size in bytes (default: 8192) + * --max-block-size BYTES Maximum block size in bytes (default: 131072) + * --target-ratio FLOAT Target compression ratio (default: 3.0) + * --value-size BYTES Value size per cell in bytes (default: 1000) + * --predicators-only Run only predicator accuracy tests + * --throughput-only Run only read/write throughput tests + * </pre> + */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) +public class HFileBlockPerformanceEvaluation { + + private static final Logger LOG = LoggerFactory.getLogger(HFileBlockPerformanceEvaluation.class); + + static { + System.setProperty("org.apache.commons.logging.Log", + "org.apache.commons.logging.impl.SimpleLog"); + System.setProperty( + "org.apache.commons.logging.simplelog.log.org.apache.hadoop.hbase.io.compress.CodecPool", + "WARN"); + } + + private static final int ROW_KEY_LENGTH = 16; + + private int numBlocks = 100; + private int minBlockSize = 8 * 1024; + private int maxBlockSize = 128 * 1024; + private double targetRatio = 3.0; + private int valueSize = 1000; + private boolean predicatorsOnly = false; + private boolean throughputOnly = false; + + private List<Compression.Algorithm> compressions = new ArrayList<>(); + private List<DataBlockEncoding> encodings = new ArrayList<>(); + + // ---- Value generator ---- + + /** + * Generates byte arrays that compress to approximately the given target ratio. A fraction (1 - + * 1/ratio) of the bytes are a repeating pattern and the rest are random. The repeating portion + * compresses nearly to zero while the random portion is incompressible, so the overall ratio + * approaches the target. 
+ */ + static class CompressibleValueGenerator { + private final int valueSize; + private final int randomBytes; + private final byte patternByte; + + CompressibleValueGenerator(int valueSize, double targetRatio) { + this.valueSize = valueSize; + double incompressibleFraction = 1.0 / targetRatio; + this.randomBytes = Math.max(1, (int) (valueSize * incompressibleFraction)); + this.patternByte = (byte) 0x42; + } + + byte[] generate() { + byte[] value = new byte[valueSize]; + Arrays.fill(value, patternByte); + byte[] rand = new byte[randomBytes]; + Bytes.random(rand); + System.arraycopy(rand, 0, value, 0, Math.min(randomBytes, valueSize)); + return value; + } + } + + // ---- Cell creation ---- + + static byte[] formatRowKey(long i) { + String v = Long.toString(i); + StringBuilder sb = new StringBuilder(ROW_KEY_LENGTH); + for (int pad = ROW_KEY_LENGTH - v.length(); pad > 0; pad--) { + sb.append('0'); + } + sb.append(v); + return Bytes.toBytes(sb.toString()); + } + + static ExtendedCell createCell(long i, byte[] value) { + return ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(formatRowKey(i)) + .setFamily(Bytes.toBytes("cf")).setQualifier(Bytes.toBytes("q")) + .setTimestamp(EnvironmentEdgeManager.currentTime()).setType(KeyValue.Type.Put.getCode()) + .setValue(value).build(); + } + + // ---- Block size sweep ---- + + List<Integer> blockSizeSweep() { + List<Integer> sizes = new ArrayList<>(); + int size = Integer.highestOneBit(minBlockSize); + if (size < minBlockSize) { + size <<= 1; + } + while (size <= maxBlockSize) { + sizes.add(size); + size <<= 1; + } + if (sizes.isEmpty()) { + sizes.add(minBlockSize); + } + return sizes; + } + + // ---- Predicator classes ---- + + @SuppressWarnings("unchecked") + static final Class<? 
extends BlockCompressedSizePredicator>[] PREDICATOR_CLASSES = + new Class[] { UncompressedBlockSizePredicator.class, + PreviousBlockCompressionRatePredicator.class, EWMABlockSizePredicator.class }; + + // ---- Write helper: writes enough cells to fill numBlocks blocks ---- + + /** + * Writes an HFile with the given parameters and returns the path. Writes cells until at least + * {@code numBlocks} data blocks are produced. + */ + Path writeHFile(Configuration conf, FileSystem fs, Path dir, Compression.Algorithm compression, + DataBlockEncoding encoding, int blockSize, String label) throws IOException { + Path path = new Path(dir, "blockencpe-" + label + ".hfile"); + if (fs.exists(path)) { + fs.delete(path, false); + } + + HFileContext context = new HFileContextBuilder().withCompression(compression) + .withDataBlockEncoding(encoding).withBlockSize(blockSize).build(); + + CompressibleValueGenerator valueGen = new CompressibleValueGenerator(valueSize, targetRatio); + + long rowIndex = 0; + // Estimate rows per block based on uncompressed cell size to avoid writing unnecessary cells. + // Each cell is roughly ROW_KEY_LENGTH + family + qualifier + value + KV overhead. + int approxCellSize = ROW_KEY_LENGTH + 2 + 1 + valueSize + KeyValue.FIXED_OVERHEAD; + long estimatedRowsPerBlock = Math.max(1, blockSize / approxCellSize); + // For compressed data the predicator will let blocks grow larger, account for the ratio. 
+ if (compression != Compression.Algorithm.NONE) { + estimatedRowsPerBlock = (long) (estimatedRowsPerBlock * targetRatio); + } + long maxRows = estimatedRowsPerBlock * numBlocks * 3L; + + try (HFile.Writer writer = + HFile.getWriterFactoryNoCache(conf).withPath(fs, path).withFileContext(context).create()) { + for (rowIndex = 0; rowIndex < maxRows; rowIndex++) { + writer.append(createCell(rowIndex, valueGen.generate())); + } + } + return path; + } + + // ---- Predicator accuracy benchmark ---- + + static class PredicatorAccuracyResult { + final String predicatorName; + final String compression; + final String encoding; + final int blockSize; + final int actualBlocks; + final double meanCompressedSize; + final double stddevCompressedSize; + final int minCompressedSize; + final int maxCompressedSize; + final double meanDeviationPct; + + PredicatorAccuracyResult(String predicatorName, String compression, String encoding, + int blockSize, int actualBlocks, double meanCompressedSize, double stddevCompressedSize, + int minCompressedSize, int maxCompressedSize, double meanDeviationPct) { + this.predicatorName = predicatorName; + this.compression = compression; + this.encoding = encoding; + this.blockSize = blockSize; + this.actualBlocks = actualBlocks; + this.meanCompressedSize = meanCompressedSize; + this.stddevCompressedSize = stddevCompressedSize; + this.minCompressedSize = minCompressedSize; + this.maxCompressedSize = maxCompressedSize; + this.meanDeviationPct = meanDeviationPct; + } + } + + List<PredicatorAccuracyResult> runPredicatorAccuracyTests(Configuration baseConf, FileSystem fs, + Path dir) throws IOException { + List<PredicatorAccuracyResult> results = new ArrayList<>(); + List<Integer> blockSizes = blockSizeSweep(); + + for (Compression.Algorithm compression : compressions) { + for (DataBlockEncoding encoding : encodings) { + for (int blockSize : blockSizes) { + for (Class<? 
extends BlockCompressedSizePredicator> predClass : PREDICATOR_CLASSES) { + String predName = predClass.getSimpleName(); + String label = String.format("%s-%s-%d-%s", compression.getName(), encoding.name(), + blockSize, predName); + + Configuration conf = new Configuration(baseConf); + conf.setClass(BlockCompressedSizePredicator.BLOCK_COMPRESSED_SIZE_PREDICATOR, predClass, + BlockCompressedSizePredicator.class); + + LOG.info("Predicator accuracy test: compression={}, encoding={}, blockSize={}, " + + "predicator={}", compression.getName(), encoding.name(), blockSize, predName); + + long startTime = EnvironmentEdgeManager.currentTime(); + Path hfilePath = writeHFile(conf, fs, dir, compression, encoding, blockSize, label); + long writeElapsed = EnvironmentEdgeManager.currentTime() - startTime; + + List<Integer> compressedBlockSizes = new ArrayList<>(); + try (HFile.Reader reader = + HFile.createReader(fs, hfilePath, CacheConfig.DISABLED, true, conf)) { + try (HFileScanner scanner = reader.getScanner(conf, false, false)) { + scanner.seekTo(); + HFileBlock prevBlock = null; + do { + Cell cell = scanner.getCell(); + if (cell == null) { + break; + } + // We iterate cells but track block transitions via the scanner's internal block + } while (scanner.next()); + } + + // Read block-level info by traversing the data index + int dataBlockCount = reader.getTrailer().getDataIndexCount(); + // Use the reader's block iterator to get on-disk sizes Review Comment: This first scanner loop and the local variables (prevBlock, dataBlockCount) are effectively unused and do not contribute to collecting block sizes. Keeping this dead code makes the benchmark harder to understand and slightly increases runtime; remove it or replace it with logic that actually derives block boundaries/sizes. 
########## hbase-diagnostics/src/main/java/org/apache/hadoop/hbase/HFileBlockPerformanceEvaluation.java: ########## @@ -0,0 +1,716 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Locale; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.io.hfile.BlockCompressedSizePredicator; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.EWMABlockSizePredicator; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.io.hfile.HFileBlock; +import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; +import org.apache.hadoop.hbase.io.hfile.HFileScanner; +import org.apache.hadoop.hbase.io.hfile.PreviousBlockCompressionRatePredicator; +import 
org.apache.hadoop.hbase.io.hfile.UncompressedBlockSizePredicator; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Performance evaluation utility for HFile block encoding, compression algorithms, and block size + * predicators ({@link BlockCompressedSizePredicator} implementations). + * <p> + * Tests are parameterized by number of blocks rather than number of rows. A tunable value generator + * produces cell values that compress to approximately the requested target ratio. The tool sweeps + * all combinations of compression algorithm, data block encoding, and block size (powers of two + * between min and max), and for predicator tests, also sweeps all three predicator implementations. + * </p> + * <h3>Usage</h3> + * + * <pre> + * HFileBlockPerformanceEvaluation [options] + * --blocks N Number of blocks per test (default: 100) + * --compressions LIST Comma-separated compression algorithms (default: none,gz) + * --encodings LIST Comma-separated data block encodings (default: none,fast_diff) + * --min-block-size BYTES Minimum block size in bytes (default: 8192) + * --max-block-size BYTES Maximum block size in bytes (default: 131072) + * --target-ratio FLOAT Target compression ratio (default: 3.0) + * --value-size BYTES Value size per cell in bytes (default: 1000) + * --predicators-only Run only predicator accuracy tests + * --throughput-only Run only read/write throughput tests + * </pre> + */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) +public class HFileBlockPerformanceEvaluation { + + private static final Logger LOG = LoggerFactory.getLogger(HFileBlockPerformanceEvaluation.class); + + static { + System.setProperty("org.apache.commons.logging.Log", + "org.apache.commons.logging.impl.SimpleLog"); + System.setProperty( + 
"org.apache.commons.logging.simplelog.log.org.apache.hadoop.hbase.io.compress.CodecPool", + "WARN"); + } + + private static final int ROW_KEY_LENGTH = 16; + + private int numBlocks = 100; + private int minBlockSize = 8 * 1024; + private int maxBlockSize = 128 * 1024; + private double targetRatio = 3.0; + private int valueSize = 1000; + private boolean predicatorsOnly = false; + private boolean throughputOnly = false; + + private List<Compression.Algorithm> compressions = new ArrayList<>(); + private List<DataBlockEncoding> encodings = new ArrayList<>(); + + // ---- Value generator ---- + + /** + * Generates byte arrays that compress to approximately the given target ratio. A fraction (1 - + * 1/ratio) of the bytes are a repeating pattern and the rest are random. The repeating portion + * compresses nearly to zero while the random portion is incompressible, so the overall ratio + * approaches the target. + */ + static class CompressibleValueGenerator { + private final int valueSize; + private final int randomBytes; + private final byte patternByte; + + CompressibleValueGenerator(int valueSize, double targetRatio) { + this.valueSize = valueSize; + double incompressibleFraction = 1.0 / targetRatio; + this.randomBytes = Math.max(1, (int) (valueSize * incompressibleFraction)); + this.patternByte = (byte) 0x42; + } + + byte[] generate() { + byte[] value = new byte[valueSize]; + Arrays.fill(value, patternByte); + byte[] rand = new byte[randomBytes]; + Bytes.random(rand); + System.arraycopy(rand, 0, value, 0, Math.min(randomBytes, valueSize)); + return value; + } + } + + // ---- Cell creation ---- + + static byte[] formatRowKey(long i) { + String v = Long.toString(i); + StringBuilder sb = new StringBuilder(ROW_KEY_LENGTH); + for (int pad = ROW_KEY_LENGTH - v.length(); pad > 0; pad--) { + sb.append('0'); + } + sb.append(v); + return Bytes.toBytes(sb.toString()); + } + + static ExtendedCell createCell(long i, byte[] value) { + return 
ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(formatRowKey(i)) + .setFamily(Bytes.toBytes("cf")).setQualifier(Bytes.toBytes("q")) + .setTimestamp(EnvironmentEdgeManager.currentTime()).setType(KeyValue.Type.Put.getCode()) + .setValue(value).build(); + } + + // ---- Block size sweep ---- + + List<Integer> blockSizeSweep() { + List<Integer> sizes = new ArrayList<>(); + int size = Integer.highestOneBit(minBlockSize); + if (size < minBlockSize) { + size <<= 1; + } + while (size <= maxBlockSize) { + sizes.add(size); + size <<= 1; + } + if (sizes.isEmpty()) { + sizes.add(minBlockSize); + } + return sizes; + } + + // ---- Predicator classes ---- + + @SuppressWarnings("unchecked") + static final Class<? extends BlockCompressedSizePredicator>[] PREDICATOR_CLASSES = + new Class[] { UncompressedBlockSizePredicator.class, + PreviousBlockCompressionRatePredicator.class, EWMABlockSizePredicator.class }; + + // ---- Write helper: writes enough cells to fill numBlocks blocks ---- + + /** + * Writes an HFile with the given parameters and returns the path. Writes cells until at least + * {@code numBlocks} data blocks are produced. + */ + Path writeHFile(Configuration conf, FileSystem fs, Path dir, Compression.Algorithm compression, + DataBlockEncoding encoding, int blockSize, String label) throws IOException { + Path path = new Path(dir, "blockencpe-" + label + ".hfile"); + if (fs.exists(path)) { + fs.delete(path, false); + } + + HFileContext context = new HFileContextBuilder().withCompression(compression) + .withDataBlockEncoding(encoding).withBlockSize(blockSize).build(); + + CompressibleValueGenerator valueGen = new CompressibleValueGenerator(valueSize, targetRatio); + + long rowIndex = 0; + // Estimate rows per block based on uncompressed cell size to avoid writing unnecessary cells. + // Each cell is roughly ROW_KEY_LENGTH + family + qualifier + value + KV overhead. 
+ int approxCellSize = ROW_KEY_LENGTH + 2 + 1 + valueSize + KeyValue.FIXED_OVERHEAD; + long estimatedRowsPerBlock = Math.max(1, blockSize / approxCellSize); + // For compressed data the predicator will let blocks grow larger, account for the ratio. + if (compression != Compression.Algorithm.NONE) { + estimatedRowsPerBlock = (long) (estimatedRowsPerBlock * targetRatio); + } + long maxRows = estimatedRowsPerBlock * numBlocks * 3L; + + try (HFile.Writer writer = + HFile.getWriterFactoryNoCache(conf).withPath(fs, path).withFileContext(context).create()) { + for (rowIndex = 0; rowIndex < maxRows; rowIndex++) { + writer.append(createCell(rowIndex, valueGen.generate())); + } + } + return path; + } + + // ---- Predicator accuracy benchmark ---- + + static class PredicatorAccuracyResult { + final String predicatorName; + final String compression; + final String encoding; + final int blockSize; + final int actualBlocks; + final double meanCompressedSize; + final double stddevCompressedSize; + final int minCompressedSize; + final int maxCompressedSize; + final double meanDeviationPct; + + PredicatorAccuracyResult(String predicatorName, String compression, String encoding, + int blockSize, int actualBlocks, double meanCompressedSize, double stddevCompressedSize, + int minCompressedSize, int maxCompressedSize, double meanDeviationPct) { + this.predicatorName = predicatorName; + this.compression = compression; + this.encoding = encoding; + this.blockSize = blockSize; + this.actualBlocks = actualBlocks; + this.meanCompressedSize = meanCompressedSize; + this.stddevCompressedSize = stddevCompressedSize; + this.minCompressedSize = minCompressedSize; + this.maxCompressedSize = maxCompressedSize; + this.meanDeviationPct = meanDeviationPct; + } + } + + List<PredicatorAccuracyResult> runPredicatorAccuracyTests(Configuration baseConf, FileSystem fs, + Path dir) throws IOException { + List<PredicatorAccuracyResult> results = new ArrayList<>(); + List<Integer> blockSizes = 
blockSizeSweep(); + + for (Compression.Algorithm compression : compressions) { + for (DataBlockEncoding encoding : encodings) { + for (int blockSize : blockSizes) { + for (Class<? extends BlockCompressedSizePredicator> predClass : PREDICATOR_CLASSES) { + String predName = predClass.getSimpleName(); + String label = String.format("%s-%s-%d-%s", compression.getName(), encoding.name(), + blockSize, predName); + + Configuration conf = new Configuration(baseConf); + conf.setClass(BlockCompressedSizePredicator.BLOCK_COMPRESSED_SIZE_PREDICATOR, predClass, + BlockCompressedSizePredicator.class); + + LOG.info("Predicator accuracy test: compression={}, encoding={}, blockSize={}, " + + "predicator={}", compression.getName(), encoding.name(), blockSize, predName); + + long startTime = EnvironmentEdgeManager.currentTime(); + Path hfilePath = writeHFile(conf, fs, dir, compression, encoding, blockSize, label); + long writeElapsed = EnvironmentEdgeManager.currentTime() - startTime; + + List<Integer> compressedBlockSizes = new ArrayList<>(); + try (HFile.Reader reader = + HFile.createReader(fs, hfilePath, CacheConfig.DISABLED, true, conf)) { + try (HFileScanner scanner = reader.getScanner(conf, false, false)) { + scanner.seekTo(); + HFileBlock prevBlock = null; + do { + Cell cell = scanner.getCell(); + if (cell == null) { + break; + } + // We iterate cells but track block transitions via the scanner's internal block + } while (scanner.next()); + } + + // Read block-level info by traversing the data index + int dataBlockCount = reader.getTrailer().getDataIndexCount(); + // Use the reader's block iterator to get on-disk sizes + try (HFileScanner scanner = reader.getScanner(conf, false, false)) { + if (scanner.seekTo()) { + HFileBlock block = null; + long offset = reader.getTrailer().getFirstDataBlockOffset(); + long lastOffset = -1; + // Read blocks directly + while (offset >= 0 && offset != lastOffset) { + lastOffset = offset; + try { + block = reader.readBlock(offset, -1, false, 
false, false, true, null, + reader.getDataBlockEncoding()); + if (block == null || !block.getBlockType().isData()) { + break; + } + compressedBlockSizes.add(block.getOnDiskSizeWithHeader()); + long nextOffset = offset + block.getOnDiskSizeWithHeader(); + block.release(); + block = null; + offset = nextOffset; + } catch (Exception e) { + break; + } + } + } + } + } + + // Compute stats + int actualBlocks = compressedBlockSizes.size(); + double mean = 0; + int min = Integer.MAX_VALUE; + int max = Integer.MIN_VALUE; + for (int s : compressedBlockSizes) { + mean += s; + min = Math.min(min, s); + max = Math.max(max, s); + } + if (actualBlocks > 0) { + mean /= actualBlocks; + } + double variance = 0; + for (int s : compressedBlockSizes) { + variance += (s - mean) * (s - mean); + } + double stddev = actualBlocks > 1 ? Math.sqrt(variance / (actualBlocks - 1)) : 0; + double meanDeviationPct = + blockSize > 0 ? Math.abs(mean - blockSize) / blockSize * 100.0 : 0; + + PredicatorAccuracyResult result = new PredicatorAccuracyResult(predName, + compression.getName(), encoding.name(), blockSize, actualBlocks, mean, stddev, + actualBlocks > 0 ? min : 0, actualBlocks > 0 ? max : 0, meanDeviationPct); + results.add(result); + + LOG.info( + " predicator={}: blocks={}, meanOnDisk={}, stddev={}, min={}, max={}, " + + "devFromTarget={}%, writeTime={}ms", + predName, actualBlocks, String.format("%.0f", mean), String.format("%.0f", stddev), + actualBlocks > 0 ? min : "N/A", actualBlocks > 0 ? 
max : "N/A", + String.format("%.1f", meanDeviationPct), writeElapsed); + + // Cleanup + fs.delete(hfilePath, false); + } + } + } + } + return results; + } + + // ---- Write throughput benchmark ---- + + static class ThroughputResult { + final String testName; + final String compression; + final String encoding; + final int blockSize; + final long elapsedMs; + final long totalBytes; + final double mbPerSec; + final int blockCount; + + ThroughputResult(String testName, String compression, String encoding, int blockSize, + long elapsedMs, long totalBytes, double mbPerSec, int blockCount) { + this.testName = testName; + this.compression = compression; + this.encoding = encoding; + this.blockSize = blockSize; + this.elapsedMs = elapsedMs; + this.totalBytes = totalBytes; + this.mbPerSec = mbPerSec; + this.blockCount = blockCount; + } + } + + List<ThroughputResult> runWriteBenchmarks(Configuration baseConf, FileSystem fs, Path dir) + throws IOException { + List<ThroughputResult> results = new ArrayList<>(); + List<Integer> blockSizes = blockSizeSweep(); + + for (Compression.Algorithm compression : compressions) { + for (DataBlockEncoding encoding : encodings) { + for (int blockSize : blockSizes) { + String label = + String.format("write-%s-%s-%d", compression.getName(), encoding.name(), blockSize); + + Configuration conf = new Configuration(baseConf); + // Use default predicator for throughput tests + conf.setClass(BlockCompressedSizePredicator.BLOCK_COMPRESSED_SIZE_PREDICATOR, + UncompressedBlockSizePredicator.class, BlockCompressedSizePredicator.class); + + LOG.info("Write benchmark: compression={}, encoding={}, blockSize={}", + compression.getName(), encoding.name(), blockSize); + + long startTime = EnvironmentEdgeManager.currentTime(); + Path hfilePath = writeHFile(conf, fs, dir, compression, encoding, blockSize, label); + long elapsed = EnvironmentEdgeManager.currentTime() - startTime; + + long fileSize = fs.getFileStatus(hfilePath).getLen(); + int blockCount = 0; + 
try (HFile.Reader reader = + HFile.createReader(fs, hfilePath, CacheConfig.DISABLED, true, conf)) { + blockCount = reader.getTrailer().getDataIndexCount(); + } + + double mbps = elapsed > 0 ? (fileSize / (1024.0 * 1024.0)) / (elapsed / 1000.0) : 0; + + ThroughputResult result = new ThroughputResult("SequentialWrite", compression.getName(), + encoding.name(), blockSize, elapsed, fileSize, mbps, blockCount); + results.add(result); + + LOG.info(" elapsed={}ms, fileSize={}, blocks={}, throughput={} MB/s", elapsed, fileSize, + blockCount, String.format("%.2f", mbps)); + + // Keep file for read benchmarks if needed, otherwise clean up + if (throughputOnly || !predicatorsOnly) { + // Will be read by read benchmarks; clean up after read + } else { + fs.delete(hfilePath, false); + } + } + } + } + return results; + } + + // ---- Read throughput benchmarks ---- + + List<ThroughputResult> runReadBenchmarks(Configuration baseConf, FileSystem fs, Path dir) + throws IOException { + List<ThroughputResult> results = new ArrayList<>(); + List<Integer> blockSizes = blockSizeSweep(); + + for (Compression.Algorithm compression : compressions) { + for (DataBlockEncoding encoding : encodings) { + for (int blockSize : blockSizes) { + String label = + String.format("write-%s-%s-%d", compression.getName(), encoding.name(), blockSize); + Path hfilePath = new Path(dir, "blockencpe-" + label + ".hfile"); + + if (!fs.exists(hfilePath)) { + // Write the file first if it doesn't exist + Configuration writeConf = new Configuration(baseConf); + writeConf.setClass(BlockCompressedSizePredicator.BLOCK_COMPRESSED_SIZE_PREDICATOR, + UncompressedBlockSizePredicator.class, BlockCompressedSizePredicator.class); + hfilePath = writeHFile(writeConf, fs, dir, compression, encoding, blockSize, label); + } + + Configuration conf = new Configuration(baseConf); + + // Sequential read + LOG.info("Sequential read benchmark: compression={}, encoding={}, blockSize={}", + compression.getName(), encoding.name(), 
blockSize); + + long totalBytesRead = 0; + int cellCount = 0; + long startTime = EnvironmentEdgeManager.currentTime(); + try (HFile.Reader reader = + HFile.createReader(fs, hfilePath, CacheConfig.DISABLED, true, conf)) { + try (HFileScanner scanner = reader.getScanner(conf, false, false)) { + scanner.seekTo(); + do { + Cell cell = scanner.getCell(); + if (cell == null) { + break; + } + totalBytesRead += cell.getSerializedSize(); + cellCount++; + } while (scanner.next()); + } + } + long seqElapsed = EnvironmentEdgeManager.currentTime() - startTime; + + double seqMbps = + seqElapsed > 0 ? (totalBytesRead / (1024.0 * 1024.0)) / (seqElapsed / 1000.0) : 0; + + results.add(new ThroughputResult("SequentialRead", compression.getName(), encoding.name(), + blockSize, seqElapsed, totalBytesRead, seqMbps, cellCount)); + + LOG.info(" sequential: elapsed={}ms, cells={}, throughput={} MB/s", seqElapsed, + cellCount, String.format("%.2f", seqMbps)); + + // Random read + LOG.info("Random read benchmark: compression={}, encoding={}, blockSize={}", + compression.getName(), encoding.name(), blockSize); + + int randomReads = numBlocks; + startTime = EnvironmentEdgeManager.currentTime(); + try (HFile.Reader reader = + HFile.createReader(fs, hfilePath, CacheConfig.DISABLED, true, conf)) { + long entryCount = reader.getEntries(); + for (int i = 0; i < randomReads; i++) { + long randomRow = ThreadLocalRandom.current().nextLong(entryCount); + byte[] rowKey = formatRowKey(randomRow); + ExtendedCell seekCell = ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY) + .setRow(rowKey).setFamily(Bytes.toBytes("cf")).setQualifier(Bytes.toBytes("q")) + .setTimestamp(Long.MAX_VALUE).setType(KeyValue.Type.Maximum.getCode()) + .setValue(HConstants.EMPTY_BYTE_ARRAY).build(); + try (HFileScanner scanner = reader.getScanner(conf, false, true)) { + scanner.seekTo(seekCell); + Cell cell = scanner.getCell(); + if (cell != null) { + totalBytesRead += cell.getSerializedSize(); + } + } + } + } + long 
randElapsed = EnvironmentEdgeManager.currentTime() - startTime; + double opsPerSec = randElapsed > 0 ? (randomReads * 1000.0) / randElapsed : 0; + + results.add(new ThroughputResult("RandomRead", compression.getName(), encoding.name(), + blockSize, randElapsed, randomReads, opsPerSec, randomReads)); Review Comment: Random-read benchmark keeps accumulating into totalBytesRead (which already contains sequential-read bytes). Since the RandomRead result reports a count (randomReads) rather than bytes, this extra accumulation is misleading; either reset the counter before the random loop or use a separate randomBytesRead variable. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
