eab148 commented on code in PR #6327: URL: https://github.com/apache/hbase/pull/6327#discussion_r1887721740
########## hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/RowStatisticsImpl.java: ########## @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.coprocessor.example.row.stats; + +import java.util.Map; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.RawCellBuilder; +import org.apache.hadoop.hbase.coprocessor.example.row.stats.utils.RowStatisticsUtil; +import org.apache.hadoop.hbase.regionserver.Shipper; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.GsonUtil; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.gson.Gson; +import org.apache.hbase.thirdparty.com.google.gson.JsonObject; + +/** + * Holder for accumulating row statistics in {@link RowStatisticsCompactionObserver} Creates various + * cell, row, and total stats. 
+ */ +@InterfaceAudience.Private +public class RowStatisticsImpl implements RowStatistics { + + private static final Logger LOG = LoggerFactory.getLogger(RowStatisticsImpl.class); + private static final Gson GSON = GsonUtil.createGson().create(); + + // + // Transient fields which are not included in gson serialization + // + private final transient long blockSize; + private final transient long maxCacheSize; + private transient int rowCells; + private transient long rowBytes; + private transient byte[] largestRow; + private transient Cell largestCell; + private final transient boolean isMajor; + private final transient SizeBucketTracker rowSizeBuckets; + private final transient SizeBucketTracker valueSizeBuckets; + + // We don't need to clone anything until shipped() is called on scanner. + // To avoid allocations, we keep a reference until that point + private transient Cell largestRowRef; + private transient Cell largestCellRef; + // + // Non-transient fields which are included in gson + // + private final String table; + private final String region; + private final String columnFamily; + private long largestRowBytes; + private int largestRowCells; + private long largestCellBytes; + private int cellsLargerThanOneBlock; + private int rowsLargerThanOneBlock; + private int cellsLargerThanMaxCacheSize; + private int totalDeletes; + private int totalCells; + private int totalRows; + private long totalBytes; + + RowStatisticsImpl(String table, String encodedRegion, String columnFamily, long blockSize, + long maxCacheSize, boolean isMajor) { + this.table = table; + this.region = encodedRegion; + this.columnFamily = columnFamily; + this.blockSize = blockSize; + this.maxCacheSize = maxCacheSize; + this.isMajor = isMajor; + this.rowSizeBuckets = new SizeBucketTracker(); + this.valueSizeBuckets = new SizeBucketTracker(); + } + + public void handleRowChanged(Cell lastCell) { + if (rowBytes > largestRowBytes) { + largestRowRef = lastCell; + largestRowBytes = rowBytes; + 
largestRowCells = rowCells; + } + if (rowBytes > blockSize) { + if (LOG.isDebugEnabled()) { + LOG.debug("RowTooLarge: rowBytes={}, blockSize={}, table={}, rowKey={}", rowBytes, + blockSize, table, Bytes.toStringBinary(lastCell.getRowArray(), lastCell.getRowOffset(), + lastCell.getRowLength())); + } + rowsLargerThanOneBlock++; + } + rowSizeBuckets.add(rowBytes); + rowBytes = 0; + rowCells = 0; + totalRows++; + } + + public void consumeCell(Cell cell) { + int cellSize = cell.getSerializedSize(); + + rowBytes += cellSize; + rowCells++; + + boolean tooLarge = false; + if (cellSize > maxCacheSize) { + cellsLargerThanMaxCacheSize++; + tooLarge = true; + } + if (cellSize > blockSize) { + cellsLargerThanOneBlock++; + tooLarge = true; + } + + if (tooLarge && LOG.isDebugEnabled()) { + LOG.debug("CellTooLarge: size={}, blockSize={}, maxCacheSize={}, table={}, cell={}", cellSize, + blockSize, maxCacheSize, table, CellUtil.toString(cell, false)); + } + + if (cellSize > largestCellBytes) { + largestCellRef = cell; + largestCellBytes = cellSize; + } + valueSizeBuckets.add(cell.getValueLength()); + + totalCells++; + if (CellUtil.isDelete(cell)) { + totalDeletes++; + } + totalBytes += cellSize; + } + + /** + * Clone the cell refs so they can be cleaned up by {@link Shipper#shipped()}. Doing this lazily Review Comment: We should be safe keeping shallow copies of these Cells until the `RowStatisticsScanner::shipped` method is called. The `Compactor::compact` method: 1. Initializes the InternalScanner, wrapping it in our RowStatisticsScanner. The InternalScanner lets us iterate through all of the compacting Cells. 2. Runs the compaction process via `Compactor::performCompaction`. The Compactor uses the `HFileWriterImpl writer` to build the new HFiles from the set of compacting Stores. Specifically, the `Compactor::performCompaction` runs a loop that - Appends and processes a batch of Cells for the new HFile block - Ships the batch of Cells. 
This shipped signal indicates that the batch has been processed and that the Cells' associated resources in memory should be released. The shipped signal helps the system avoid holding on to large amounts of data in memory or leaking resources. - We can see preparation for this deallocation process in action in the `HFileWriter::beforeShipped` method (called by the `StoreFileWriter::beforeShipped` method). This method creates deep copies of all Cell references that it created while adding Cells to the new HFile block. - After the `StoreFileWriter::beforeShipped` method returns, the `RowStatisticsScanner::shipped` method is called. It prepares for the deallocation in `InternalScanner::shipped` by performing deep copies of all of the Cell references. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@hbase.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org