kadirozde commented on code in PR #1569: URL: https://github.com/apache/phoenix/pull/1569#discussion_r1156696647
########## phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java: ########## @@ -0,0 +1,908 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.coprocessor; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeepDeletedCells; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.regionserver.ScannerContext; +import org.apache.hadoop.hbase.regionserver.Store; +import 
org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.apache.phoenix.util.ScanUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.phoenix.query.QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX; + +/** + * The store scanner that implements Phoenix TTL and Max Lookback. Phoenix overrides the + * HBase implementation of data retention policies which is built at the cell level, and implements + * its row level data retention within this store scanner. + */ +public class CompactionScanner implements InternalScanner { + private static final Logger LOGGER = LoggerFactory.getLogger(CompactionScanner.class); + public static final String SEPARATOR = ":"; + private final InternalScanner storeScanner; + private final Region region; + private final Store store; + private final Configuration config; + private final RegionCoprocessorEnvironment env; + private long maxLookbackWindowStart; + private long ttlWindowStart; + private long ttl; + private final long maxLookbackInMillis; + private int minVersion; + private int maxVersion; + private final boolean emptyCFStore; + private KeepDeletedCells keepDeletedCells; + private long compactionTime; + private final byte[] emptyCF; + private final byte[] emptyCQ; + private static Map<String, Long> maxLookbackMap = new ConcurrentHashMap<>(); + private PhoenixLevelRowCompactor phoenixLevelRowCompactor; + private HBaseLevelRowCompactor hBaseLevelRowCompactor; + + public CompactionScanner(RegionCoprocessorEnvironment env, + Store store, + InternalScanner storeScanner, + long maxLookbackInMillis, + byte[] emptyCF, + byte[] emptyCQ) { + this.storeScanner = storeScanner; + this.region = env.getRegion(); + this.store = store; + this.env = env; + this.emptyCF = emptyCF; + this.emptyCQ = emptyCQ; + this.config = env.getConfiguration(); + compactionTime = EnvironmentEdgeManager.currentTimeMillis(); + this.maxLookbackInMillis = maxLookbackInMillis; + String 
columnFamilyName = store.getColumnFamilyName(); + String tableName = region.getRegionInfo().getTable().getNameAsString(); + Long overriddenMaxLookback = + maxLookbackMap.remove(tableName + SEPARATOR + columnFamilyName); + this.maxLookbackWindowStart = compactionTime - (overriddenMaxLookback == null ? + maxLookbackInMillis : Math.max(maxLookbackInMillis, overriddenMaxLookback)) - 1; + ColumnFamilyDescriptor cfd = store.getColumnFamilyDescriptor(); + ttl = cfd.getTimeToLive(); + this.ttlWindowStart = ttl == HConstants.FOREVER ? 1 : compactionTime - ttl * 1000; + ttl *= 1000; + this.maxLookbackWindowStart = Math.max(ttlWindowStart, maxLookbackWindowStart); + this.minVersion = cfd.getMinVersions(); + this.maxVersion = cfd.getMaxVersions(); + this.keepDeletedCells = cfd.getKeepDeletedCells(); + emptyCFStore = region.getTableDescriptor().getColumnFamilies().length == 1 || + columnFamilyName.equals(Bytes.toString(emptyCF)) || + columnFamilyName.startsWith(LOCAL_INDEX_COLUMN_FAMILY_PREFIX); + phoenixLevelRowCompactor = new PhoenixLevelRowCompactor(); + hBaseLevelRowCompactor = new HBaseLevelRowCompactor(); + } + + /** + * Any coprocessors within a JVM can extend the max lookback window for a column family + * by calling this static method. 
+ */ + public static void overrideMaxLookback(String tableName, String columnFamilyName, + long maxLookbackInMillis) { + if (tableName == null || columnFamilyName == null) { + return; + } + Long old = maxLookbackMap.putIfAbsent(tableName + SEPARATOR + columnFamilyName, + maxLookbackInMillis); + if (old != null && old < maxLookbackInMillis) { + maxLookbackMap.put(tableName + SEPARATOR + columnFamilyName, maxLookbackInMillis); + } + } + + @Override + public boolean next(List<Cell> result) throws IOException { + boolean hasMore = storeScanner.next(result); + if (!result.isEmpty()) { + phoenixLevelRowCompactor.compact(result, false); + } + return hasMore; + } + + @Override + public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException { + return next(result); + } + + @Override + public void close() throws IOException { + storeScanner.close(); + } + + /** + * The context for a given row during compaction. A row may have multiple compaction row + * versions. CompactionScanner uses the same row context for these versions. + */ + static class RowContext { + Cell familyDeleteMarker = null; + Cell familyVersionDeleteMarker = null; + List<Cell> columnDeleteMarkers = null; + int version = 0; + long maxTimestamp; + long minTimestamp; + private void addColumnDeleteMarker(Cell deleteMarker) { + if (columnDeleteMarkers == null) { + columnDeleteMarkers = new ArrayList<>(); + } + columnDeleteMarkers.add(deleteMarker); + } + } + + /** + * This method finds out the maximum and minimum timestamp of the cells of the next row + * version. 
+ * + * @param columns + * @param rowContext + */ + private void getNextRowVersionTimestamp(LinkedList<LinkedList<Cell>> columns, + RowContext rowContext) { + rowContext.maxTimestamp = 0; + rowContext.minTimestamp = Long.MAX_VALUE; + long ts; + long currentDeleteFamilyTimestamp = 0; + long nextDeleteFamilyTimestamp = 0; + boolean firstColumn = true; + for (LinkedList<Cell> column : columns) { + Cell firstCell = column.getFirst(); + ts = firstCell.getTimestamp(); + if (ts <= nextDeleteFamilyTimestamp) { + continue; + } + if (firstCell.getType() == Cell.Type.DeleteFamily || + firstCell.getType() == Cell.Type.DeleteFamilyVersion) { + if (firstColumn) { + // Family delete markers are always found in the first column of a column family. + // When Phoenix deletes a row, it places a family delete marker in each column + // family with the same timestamp. We just need to process the delete column + // family markers of the first column family, which would be in the column + // with columnIndex=0. + currentDeleteFamilyTimestamp = firstCell.getTimestamp(); + // We need to check if the next delete family marker exists. If so, we need + // to record its timestamp as by definition a compaction row version cannot + // cross a family delete marker + if (column.size() > 1) { + nextDeleteFamilyTimestamp = column.get(1).getTimestamp(); + } else { + nextDeleteFamilyTimestamp = 0; + } + } + } else if (firstCell.getType() == Cell.Type.Put) { + // Row versions are constructed from put cells. So, we use only + // put cell timestamps to find the time range for a compaction row version + if (rowContext.maxTimestamp < ts) { + rowContext.maxTimestamp = ts; + } + if (currentDeleteFamilyTimestamp != 0 && currentDeleteFamilyTimestamp < rowContext.maxTimestamp) { + // A compaction row version does not cross a family delete marker.
This means + // min timestamp cannot be lower than currentDeleteFamilyTimestamp + rowContext.minTimestamp = currentDeleteFamilyTimestamp + 1; + } else if (rowContext.minTimestamp > ts) { + rowContext.minTimestamp = ts; + } + } + firstColumn = false; + } + } + + /** + * HBaseLevelRowCompactor ensures that the cells of a given row are retained according to the + * HBase data retention rules. + * + */ + class HBaseLevelRowCompactor { + /** + * A compaction row version includes the latest put cell versions from each column such that + * the cell versions do not cross delete family markers. In other words, the compaction row + * versions are built from cell versions that are all either before or after the next delete + * family or delete family version marker if family delete markers exist. Also, when the cell + * timestamps are ordered for a given row version, the difference between two subsequent + * timestamps has to be less than the ttl value. This is taken care of before calling + * HBaseLevelRowCompactor#compact(). + * + * Compaction row versions are disjoint sets. A compaction row version does not share a cell + * version with the next compaction row version. A compaction row version includes at most + * one cell version from a column. + * + * After creating the first compaction row version, we form the next compaction row version + * from the remaining cell versions. + * + * Compaction row versions are used for compaction purposes to efficiently determine which + * cell versions to retain based on the HBase data retention parameters. + */ + class CompactionRowVersion { + // Cells included in the row version + List<Cell> cells = new ArrayList<>(); + // The timestamp of the row version + long ts = 0; + // The version of a row version.
It is the minimum of the versions of the cells included + // in the row version + int version = 0; + @Override + public String toString() { + StringBuilder output = new StringBuilder(); + output.append("Cell count: " + cells.size() + "\n"); + for (Cell cell : cells) { + output.append(cell + "\n"); + } + output.append("ts:" + ts + " v:" + version); + return output.toString(); + } + } + + /** + * Decide if compaction row versions inside the TTL window should be retained. The + * versions are retained if one of the following conditions holds + * 1. The compaction row version is alive and its version is less than VERSIONS + * 2. The compaction row version is deleted and KeepDeletedCells is TTL + * 3. The compaction row version is deleted, its version is less than MIN_VERSIONS and + * KeepDeletedCells is TRUE + * + */ + private boolean retainInsideTTLWindow(List<Cell> result, Review Comment: Good point! We do not need these methods to return a value anymore after restructuring the code. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
