[
https://issues.apache.org/jira/browse/PHOENIX-6888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17696375#comment-17696375
]
ASF GitHub Bot commented on PHOENIX-6888:
-----------------------------------------
kadirozde commented on code in PR #1569:
URL: https://github.com/apache/phoenix/pull/1569#discussion_r1125177794
##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/StoreCompactionScanner.java:
##########
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeepDeletedCells;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.*;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The store scanner that implements Phoenix TTL and Max Lookback
+ */
+public class StoreCompactionScanner implements InternalScanner {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(StoreCompactionScanner.class);
+ private final InternalScanner storeScanner;
+ private final Region region;
+ private final Store store;
+ private final Configuration config;
+ private final RegionCoprocessorEnvironment env;
+ private long maxLookbackWindowStart;
+ private long ttlWindowStart;
+ private int minVersion;
+ private int maxVersion;
+ private final boolean firstStore;
+ private KeepDeletedCells keepDeletedCells;
+ private long compactionTime;
+
+ public StoreCompactionScanner(RegionCoprocessorEnvironment env,
+ Store store,
+ InternalScanner storeScanner,
+ long maxLookbackInMs) {
+ this.storeScanner = storeScanner;
+ this.region = env.getRegion();
+ this.store = store;
+ this.env = env;
+ this.config = env.getConfiguration();
+ compactionTime = EnvironmentEdgeManager.currentTimeMillis();
+ this.maxLookbackWindowStart = compactionTime - maxLookbackInMs;
+ ColumnFamilyDescriptor cfd = store.getColumnFamilyDescriptor();
+ long ttl = cfd.getTimeToLive();
+ this.ttlWindowStart = ttl == HConstants.FOREVER ? 1 : compactionTime -
ttl * 1000;
+ this.minVersion = cfd.getMinVersions();
+ this.maxVersion = cfd.getMaxVersions();
+ this.keepDeletedCells = cfd.getKeepDeletedCells();
+ firstStore = region.getStores().get(0).getColumnFamilyName().
+ equals(store.getColumnFamilyName());
+ }
+
+ @Override
+ public boolean next(List<Cell> result) throws IOException {
+ synchronized (storeScanner) {
+ boolean hasMore = storeScanner.next(result);
+ filter(result, true);
+ Collections.sort(result, CellComparator.getInstance());
+ return hasMore;
+ }
+ }
+
+ @Override
+ public boolean next(List<Cell> result, ScannerContext scannerContext)
throws IOException {
+ return next(result);
+ }
+
+ @Override
+ public void close() throws IOException {
+ storeScanner.close();
+ }
+ private void formColumns(List<Cell> result, List<List<Cell>> columns,
+ List<Cell> deleteMarkers) {
+ Cell currentColumnCell = null;
+ List<Cell> currentColumn = null;
+ for (Cell cell : result) {
+ if (cell.getType() != Cell.Type.Put) {
+ deleteMarkers.add(cell);
+ }
+ if (currentColumnCell == null) {
+ currentColumn = new ArrayList<>();
+ currentColumnCell = cell;
+ currentColumn.add(cell);
+ }
+ else if (Bytes.compareTo(cell.getQualifierArray(),
cell.getQualifierOffset(),
+ cell.getQualifierLength(),
+ currentColumnCell.getQualifierArray(),
currentColumnCell.getQualifierOffset(),
+ currentColumnCell.getQualifierLength()) != 0) {
+ columns.add(currentColumn);
+ currentColumn = new ArrayList<>();
+ currentColumnCell = cell;
+ currentColumn.add(cell);
+ }
+ else {
+ currentColumn.add(cell);
+ }
+ }
+ if (currentColumn != null) {
+ columns.add(currentColumn);
+ }
+ }
+
+ /**
+ * A row version that does not share a cell with any other row version is
called a
+ * compaction row version.
+ * The latest live or deleted row version at the compaction time
(compactionTime) is the first
+ * compaction row version. The next row version which does not share a
cell with the
+ * first compaction row version is the next compaction row version.
+ *
+ * The first compaction row version is a valid row version (i.e., a row
version at a given
+ * time). The subsequent compactions row versions may not represent a
valid row version if
+ * the rows are updated partially.
+ *
+ * Compaction row versions are used for compaction purposes to determine
which row versions to
+ * retain.
+ */
+ class CompactionRowVersion {
+ // Cells included in the row version
+ List<Cell> cells = new ArrayList<>();
+ // The delete marker deleting this row version
+ Cell deleteFamilyMarker = null;
+ // Delete or DeleteColumn markers deleting a cell of this version
+ List<Cell> columnDeleteMarkers = null;
+ // The timestamp of the row version
+ long ts = 0;
+ // The version of a row version. It is the minimum of the versions of
the cells included
+ // in the row version
+ int version = 0;
+ private void addColumnDeleteMarker(Cell deleteMarker) {
+ if (columnDeleteMarkers == null) {
+ columnDeleteMarkers = new ArrayList<>();
+ }
+ columnDeleteMarkers.add(deleteMarker);
+ }
+ }
+
+ private boolean isCellDeleted(List<Cell> deleteMarkers,
CompactionRowVersion rowVersion,
+ Cell cell, boolean storeLevel) {
+ int i = 0;
+ for (Cell dm : deleteMarkers) {
+ if ((storeLevel ||
+ Bytes.compareTo(cell.getFamilyArray(),
cell.getFamilyOffset(),
+ cell.getFamilyLength(),
+ dm.getFamilyArray(), dm.getFamilyOffset(),
dm.getFamilyLength()) == 0) &&
+ Bytes.compareTo(cell.getQualifierArray(),
cell.getQualifierOffset(),
+ cell.getQualifierLength(),
+ dm.getQualifierArray(), dm.getQualifierOffset(),
+ dm.getQualifierLength()) == 0) {
+ if (dm.getType() == Cell.Type.Delete) {
+ deleteMarkers.remove(i);
+ if (rowVersion.columnDeleteMarkers == null) {
+ rowVersion.columnDeleteMarkers = new ArrayList<>();
+ }
+ rowVersion.columnDeleteMarkers.add(dm);
+ }
+ return true;
+ }
+ i++;
+ }
+ return false;
+ }
+
+ private long getNextRowVersionTimestamp(List<List<Cell>> columns) {
+ long ts = 0;
+ for (List<Cell> column : columns) {
+ Cell firstCell = column.get(0);
+ if (firstCell.getType() == Cell.Type.DeleteFamily ||
+ firstCell.getType() == Cell.Type.DeleteFamilyVersion) {
+ break;
+ }
+ if (firstCell.getType() == Cell.Type.DeleteColumn ||
+ firstCell.getType() == Cell.Type.Delete) {
+ continue;
+ }
+ if (ts < firstCell.getTimestamp()) {
+ ts = firstCell.getTimestamp();
+ }
+ }
+ return ts;
+ }
+
+ private boolean formRowVersions(List<List<Cell>> columns,
+ List<CompactionRowVersion> rowVersions,
+ boolean storeLevel) {
+ Cell lastDeleteFamilyMarker = null;
+ Cell lastDeleteFamilyVersionMarker = null;
+ List<Cell> columnDeleteMarkers = null;
+ int version = 0;
+ while (!columns.isEmpty()) {
+ long ts = getNextRowVersionTimestamp(columns);
+ CompactionRowVersion rowVersion = null;
+ // Form the next row version by picking the first cell from each
column if the cell
+ // is not masked by a delete marker
+ for (List<Cell> column : columns) {
+ Cell firstCell = column.remove(0);
+ if (firstCell.getType() == Cell.Type.DeleteFamily ||
+ firstCell.getType() == Cell.Type.DeleteFamilyVersion) {
+ if (firstCell.getType() == Cell.Type.DeleteFamily) {
+ if (firstCell.getTimestamp() >= ttlWindowStart &&
Review Comment:
Yes, we need to preserve it. The decision is done later in prepareResults.
This is just an optimization to early terminate the row version generation
here. However, there is a typo in the if statement. It should read if
(firstCell.getTimestamp() < maxLookbackWindowStart &&. Good catch!
> Fixing TTL and Max Lookback Issues for Phoenix Tables
> -----------------------------------------------------
>
> Key: PHOENIX-6888
> URL: https://issues.apache.org/jira/browse/PHOENIX-6888
> Project: Phoenix
> Issue Type: Bug
> Affects Versions: 5.1.3
> Reporter: Kadir Ozdemir
> Assignee: Kadir Ozdemir
> Priority: Major
>
> In HBase, the unit of data is a cell and data retention rules are executed at
> the cell level. These rules are defined at the column family level. Phoenix
> leverages the data retention features of HBase and exposes them to its users
> to provide its TTL feature at the table level. However, these rules (since
> they are defined at the cell level instead of the row level) result in
> partial row retention that in turn creates data integrity issues at the
> Phoenix level.
> Similarly, Phoenix’s max lookback feature leverages HBase deleted data
> retention capabilities to preserve deleted cells within a configurable max
> lookback. This requires two data retention windows, max lookback and TTL. One
> end of these windows is the current time and the end is a moment in the past
> (i.e., current time minus the window size). Typically, the max lookback
> window is shorter than the TTL window. In the max lookback window, we would
> like to preserve the complete history of mutations regardless of how many
> cell versions these mutations generated. In the remaining TTL window outside
> the max lookback, we would like to apply the data retention rules defined
> above. However, HBase provides only one data retention window. Thus, the max
> lookback window had to be extended to become TTL window and the max lookback
> feature results in unwantedly retaining deleted data for the maximum of max
> lookback and TTL periods.
> This Jira is to fix both of these issues.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)