This is an automated email from the ASF dual-hosted git repository. kadir pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push: new b1c8e3f739 PHOENIX-7574 Phoenix Compaction doesn't correctly handle DeleteFamily… (#2105) b1c8e3f739 is described below commit b1c8e3f73973428180d56d2eb75ae37366a3c954 Author: Kadir Ozdemir <37155482+kadiro...@users.noreply.github.com> AuthorDate: Mon Apr 14 09:48:58 2025 -0700 PHOENIX-7574 Phoenix Compaction doesn't correctly handle DeleteFamily… (#2105) --- .../phoenix/coprocessor/CompactionScanner.java | 274 ++++++++++++++++++--- .../org/apache/phoenix/end2end/TableTTLIT.java | 94 ++++++- 2 files changed, 323 insertions(+), 45 deletions(-) diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java index eb935db022..08302d220d 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java @@ -2213,6 +2213,7 @@ public class CompactionScanner implements InternalScanner { List<Cell> phoenixResult = new ArrayList<>(); List<Cell> trimmedRow = new ArrayList<>(); List<Cell> trimmedEmptyColumn = new ArrayList<>(); + List<Cell> deleteFamilyVersionCellList = new ArrayList<>(); private TTLTracker rowTracker; PhoenixLevelRowCompactor(TTLTracker rowTracker) { @@ -2220,49 +2221,248 @@ public class CompactionScanner implements InternalScanner { } /** + * Skip the remaining cells of the current column and only retain delete markers if the + * compaction is not a major compaction + * @param result The list of cells (input) + * @param currentColumnCell The cell indicates the current column (input) + * @param retainedCells The list of cells to be retained (output) + * @param index The index of the current cell in result (input) + * @return the index of the last skipped cell or the unchanged inputted index value if no + * cell to skip + */ + private int skipColumn(List<Cell> result, Cell currentColumnCell, List<Cell> retainedCells, + int index) { + for (int i = index + 1; i < result.size(); i++) { + Cell cell = result.get(i); + if (CellUtil.matchingColumn(cell, currentColumnCell)) { + index++; + if (cell.getType() != Cell.Type.Put && !major) { + retainedCells.add(cell); + } + } else { + return index; + } + } + return index; + } + + /** + * Add all empty column cells starting at index to the list emptyColumn + * @param result The list of cells (input) + * @param currentColumnCell The cell indicates the current column (input) + * @param index The index of the current cell in result (input) + * @param emptyColumn The list of empty column cells (output) + * @return the index of the last empty column cell or the unchanged inputted index value + * if no empty column cells to add + */ + private int addEmptyColumn(List<Cell> result, Cell currentColumnCell, int index, + List<Cell> emptyColumn) { + for (int i = index + 1; i < result.size(); i++) { + Cell cell = result.get(i); + if (CellUtil.matchingColumn(cell, currentColumnCell)) { + index++; + emptyColumn.add(cell); + } else { + return index; + } + } + return index; + } + /** + * This method retains all the cells within the max lookback window and the last row version + * visible at the lower edge of the max lookback window. The last row version can have zero + * or more cells at the lower edge of the window and/or zero or more cells outside the + * window. It also retains all delete markers outside the window if the compaction is not a + * major compaction, and returns the remaining cells (outside the max lookback window) of + * the empty colum. + * * The cells of the row (i.e., result) read from HBase store are lexicographically ordered * for tables using the key part of the cells which includes row, family, qualifier, * timestamp and type. The cells belong of a column are ordered from the latest to * the oldest. The method leverages this ordering and groups the cells into their columns * based on the pair of family name and column qualifier. - * - * The cells within the max lookback window except the once at the lower edge of the - * max lookback window (the last row of the max lookback window) are retained immediately. - * - * This method also returned the remaining cells (outside the max lookback window) of - * the empty colum */ private void getLastRowVersionInMaxLookbackWindow(List<Cell> result, List<Cell> lastRowVersion, List<Cell> retainedCells, List<Cell> emptyColumn) { + long maxLookbackWindowStart = rowContext.getMaxLookbackWindowStart(); Cell currentColumnCell = null; - boolean isEmptyColumn = false; - for (Cell cell : result) { - long maxLookbackWindowStart = rowContext.getMaxLookbackWindowStart(); + Cell deleteFamilyCell = null; + deleteFamilyVersionCellList.clear(); + top: + for (int index = 0; index < result.size(); index++) { + Cell cell = result.get(index); if (cell.getTimestamp() > maxLookbackWindowStart) { + // All cells within the max lookback window are retained. Here we retain all + // except the ones at the lower edge of the window. Those will be included in + // the last row version in the rest of the body of the loop retainedCells.add(cell); continue; } - if (!major && cell.getType() != Cell.Type.Put) { - retainedCells.add(cell); - } - if (currentColumnCell == null || - !CellUtil.matchingColumn(cell, currentColumnCell)) { + // The following section of the for loop processes an entire column in each + // iteration, that is, all cell versions for a given column will be processed in + // each iteration. Please note delete family markers (DeleteFamily and + // DeleteFamilyVersion) has their own column with the null column qualifier. The + // delete family column cells always forms the first column in a row of cells for a + // given column family + if (currentColumnCell == null) { + // This is the first column. If the row has any delete family markers, they will + // be in the first column currentColumnCell = cell; - isEmptyColumn = ScanUtil.isEmptyColumn(cell, emptyCF, emptyCQ); - if ((cell.getType() != Cell.Type.Delete - && cell.getType() != Cell.Type.DeleteColumn) - || cell.getTimestamp() == maxLookbackWindowStart) { - // Include only delete family markers and put cells - // The last row version can also be the cells with timestamp - // same as timestamp of start of max lookback window + if (cell.getType() == Cell.Type.DeleteFamily) { + // The first delete family marker at the edge or outside the max lookback + // window is DeleteFamily. + deleteFamilyCell = cell; + if (cell.getTimestamp() == maxLookbackWindowStart) { + // it is at the edge of the window.So we need to include it in the last + // row version + lastRowVersion.add(cell); + } else if (!major) { + // Retain the delete markers if the compaction is not major + retainedCells.add(cell); + } + // Skip the rest of the delete family cells + index = skipColumn(result, currentColumnCell, retainedCells, index); + continue; + } else if (cell.getType() == Cell.Type.DeleteFamilyVersion) { + deleteFamilyVersionCellList.add(cell); + if (cell.getTimestamp() == maxLookbackWindowStart) { + lastRowVersion.add(cell); + } else if (!major) { + // Retain the delete markers if the compaction is not major + retainedCells.add(cell); + } + // Each DeleteFamilyVersion can delete at most one row version. There can be + // multiple of them, and we need to process each separately, and thus we + // need to track them in a list + for (int i = index + 1; i < result.size(); i++) { + cell = result.get(i); + if (cell.getType() == Cell.Type.DeleteFamilyVersion) { + index++; + deleteFamilyVersionCellList.add(cell); + if (!major) { + // Delete markers are retained if the compaction is not a major + // compaction + retainedCells.add(cell); + } + } else if (cell.getType() == Cell.Type.DeleteFamily) { + // After one or more DeleteFamilyVersion markers, there is a + // DeleteFamily marker. This marker deletes the rest of the cells + // and thus no need to process further delete family markers. Thus, + // we skip them using skipColumn + index++; + deleteFamilyCell = cell; + if (!major) { + retainedCells.add(cell); + } + // Skip the rest of the delete family cells + index = skipColumn(result, currentColumnCell, retainedCells, index); + continue top; + } else { + // Column changed as the current cell is not a delete family cell. + // Go back to the beginning of the for loop + continue top; + } + } + // All the cells in a row are processed + break top; + } + } + // All delete family markers are scanned and recorded above if there was any. Please + // note when we do region level compaction, each column family will have their owm + // delete family markers. Phoenix inserts the same set of delete markers to each + // column family. So, we need to keep track of the delete family markers of the + // first column family but apply these delete markers to all column families + currentColumnCell = cell; + // Is this cell masked by a delete column family version + if (!deleteFamilyVersionCellList.isEmpty()) { + // There could be back to back delete family version markers and thus we need a + // loop to check it + for (Cell deleteFamilyVersionCell : deleteFamilyVersionCellList) { + if (cell.getTimestamp() > deleteFamilyVersionCell.getTimestamp()) { + break; + } + if (cell.getTimestamp() == deleteFamilyVersionCell.getTimestamp()) { + // It is masked + if (cell.getType() != Cell.Type.Put) { + if (cell.getTimestamp() == maxLookbackWindowStart) { + lastRowVersion.add(cell); + } else if (!major) { + // Retain the delete markers if the compaction is not major + retainedCells.add(cell); + } + } + cell = result.get(index + 1); + if (!CellUtil.matchingColumn(cell, currentColumnCell)) { + continue top; + } + index++; + } + } + } + if (deleteFamilyCell != null + && deleteFamilyCell.getTimestamp() >= cell.getTimestamp()) { + // This column is deleted by a delete family marker. Skip this column + if (cell.getType() != Cell.Type.Put) { + if (cell.getTimestamp() == maxLookbackWindowStart) { + lastRowVersion.add(cell); + } else if (!major) { + // Retain the delete markers if the compaction is not major + retainedCells.add(cell); + } + } + index = skipColumn(result, currentColumnCell, retainedCells, index); + continue top; + } + // Process back-to-back deleted cell versions. Phoenix currently does not use delete + // cell version markers. This processing should not happen and is added for + // completeness + while (cell.getType() == Cell.Type.Delete) { + if (cell.getTimestamp() == maxLookbackWindowStart) { lastRowVersion.add(cell); + } else if (!major) { + retainedCells.add(cell); + } + Cell nextCell = result.get(index + 1); + if (!CellUtil.matchingColumn(currentColumnCell, nextCell)) { + continue top; + } + // Increment index by one as the delete cell should be consumed + index++; + if (nextCell.getType() == Cell.Type.Put + && cell.getTimestamp() == nextCell.getTimestamp()) { + // This put cell is masked by the delete marker + index++; + cell = result.get(index); } - } else if (isEmptyColumn) { - // We only need to keep one cell for every column for the last row version. - // So here we just form the empty column beyond the last row version. - // Empty column needs to be collected during minor compactions also - // else we will see partial row expiry. - emptyColumn.add(cell); + } + if (cell.getType() == Cell.Type.DeleteColumn) { + // The rest of the column is masked by this delete column cell + if (cell.getTimestamp() == maxLookbackWindowStart) { + lastRowVersion.add(cell); + } else if (!major) { + retainedCells.add(cell); + } + index = skipColumn(result, currentColumnCell, retainedCells, index); + continue top; + } + if (cell.getType() == Cell.Type.Put) { + lastRowVersion.add(cell); + if (ScanUtil.isEmptyColumn(cell, emptyCF, emptyCQ)) { + index = addEmptyColumn(result, currentColumnCell, index, emptyColumn); + } else { + index = skipColumn(result, currentColumnCell, retainedCells, index); + } + continue top; + } + // We can visit another delete family column for another column family if we are + // doing region level compaction. In that case, we should also retain delete family + // markers from that column family here. So we need to check if the cell type is + // DeleteFamily or DeleteFamilyVersion, the column family is the store under + // compaction and the compaction is not a major compaction. + if (!major && CellUtil.matchingFamily(cell, storeColumnFamily) && + (cell.getType() == Cell.Type.DeleteFamily + || cell.getType() == Cell.Type.DeleteFamilyVersion)) { + index = skipColumn(result, currentColumnCell, retainedCells, index); } } } @@ -2349,13 +2549,19 @@ public class CompactionScanner implements InternalScanner { long ttl = rowContext.getTTL(); rowContext.getNextRowVersionTimestamps(lastRow, storeColumnFamily); Cell firstCell = lastRow.get(0); - if (firstCell.getType() == Cell.Type.DeleteFamily || - firstCell.getType() == Cell.Type.DeleteFamilyVersion) { - if (firstCell.getTimestamp() >= rowContext.maxTimestamp) { - // This means that the row version outside the max lookback window is - // deleted and thus should not be visible to the scn queries - return; - } + while (firstCell.getType() == Cell.Type.DeleteFamilyVersion + && firstCell.getTimestamp() == rowContext.maxTimestamp) { + // This means that the row version outside the max lookback window is + // deleted and thus should not be visible to the scn queries + rowContext.getNextRowVersionTimestamps(lastRow, storeColumnFamily); + } + if ((firstCell.getType() == Cell.Type.DeleteFamily + && firstCell.getTimestamp() >= rowContext.maxTimestamp) + || (firstCell.getType() == Cell.Type.DeleteFamilyVersion + && firstCell.getTimestamp() == rowContext.maxTimestamp)) { + // This means that the row version outside the max lookback window is + // deleted and thus should not be visible to the scn queries + return; } if (major && compactionTime - rowContext.maxTimestamp > maxLookbackInMillis + ttl) { diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java index d23bb707aa..8c459d54b5 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java @@ -22,14 +22,12 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants; +import org.apache.phoenix.hbase.index.IndexRegionObserver; +import org.apache.phoenix.jdbc.PhoenixResultSet; import org.apache.phoenix.query.BaseTest; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; -import org.apache.phoenix.util.EnvironmentEdgeManager; -import org.apache.phoenix.util.ManualEnvironmentEdge; -import org.apache.phoenix.util.PhoenixRuntime; -import org.apache.phoenix.util.ReadOnlyProps; -import org.apache.phoenix.util.TestUtil; +import org.apache.phoenix.util.*; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -52,12 +50,7 @@ import java.util.Map; import java.util.Properties; import java.util.Random; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.junit.Assert.*; @Category(NeedsOwnMiniClusterTest.class) @RunWith(Parameterized.class) @@ -463,6 +456,85 @@ public class TableTTLIT extends BaseTest { } } + @Test + public void testDeleteFamilyVersion() throws Exception { + // for the purpose of this test only considering cases when maxlookback is 0 + if (tableLevelMaxLooback == null || tableLevelMaxLooback != 0) { + return; + } + if (multiCF == true) { + return; + } + try (Connection conn = DriverManager.getConnection(getUrl())) { + String tableName = "T_" + generateUniqueName(); + createTable(tableName); + String indexName = "I_" + generateUniqueName(); + String indexDDL = String.format("create index %s on %s (val1) include (val2, val3)", + indexName, tableName); + conn.createStatement().execute(indexDDL); + updateRow(conn, tableName, "a1"); + String indexColumnValue; + String expectedValue; + String dql = "select val1, val2 from " + tableName + " where id = 'a1'"; + try (ResultSet rs = conn.createStatement().executeQuery(dql)) { + PhoenixResultSet prs = rs.unwrap(PhoenixResultSet.class); + String explainPlan = QueryUtil.getExplainPlan(prs.getUnderlyingIterator()); + assertFalse(explainPlan.contains(indexName)); + assertTrue(rs.next()); + indexColumnValue = rs.getString(1); + expectedValue = rs.getString(2); + assertFalse(rs.next()); + } + // Insert an orphan index row by failing data table update + IndexRegionObserver.setFailDataTableUpdatesForTesting(true); + try { + updateColumn(conn, tableName, "a1", 2, "col2_xyz"); + conn.commit(); + fail("An exception should have been thrown"); + } catch (Exception ignored) { + // Ignore the exception + } finally { + IndexRegionObserver.setFailDataTableUpdatesForTesting(false); + } + // Insert another orphan index row + IndexRegionObserver.setFailDataTableUpdatesForTesting(true); + try { + updateColumn(conn, tableName, "a1", 2, "col2_abc"); + conn.commit(); + fail("An exception should have been thrown"); + } catch (Exception ignored) { + // Ignore the exception + } finally { + IndexRegionObserver.setFailDataTableUpdatesForTesting(false); + } + TestUtil.dumpTable(conn, TableName.valueOf(indexName)); + // do a read on the index which should trigger a read repair + dql = "select val2 from " + tableName + " where val1 = '" + indexColumnValue + "'"; + try (ResultSet rs = conn.createStatement().executeQuery(dql)) { + PhoenixResultSet prs = rs.unwrap(PhoenixResultSet.class); + String explainPlan = QueryUtil.getExplainPlan(prs.getUnderlyingIterator()); + assertTrue(explainPlan.contains(indexName)); + assertTrue(rs.next()); + assertEquals(rs.getString(1), expectedValue); + assertFalse(rs.next()); + } + TestUtil.dumpTable(conn, TableName.valueOf(indexName)); + flush(TableName.valueOf(indexName)); + majorCompact(TableName.valueOf(indexName)); + TestUtil.dumpTable(conn, TableName.valueOf(indexName)); + // run the same query again after compaction + try (ResultSet rs = conn.createStatement().executeQuery(dql)) { + PhoenixResultSet prs = rs.unwrap(PhoenixResultSet.class); + String explainPlan = QueryUtil.getExplainPlan(prs.getUnderlyingIterator()); + assertTrue(explainPlan.contains(indexName)); + assertTrue(rs.next()); + assertEquals(rs.getString(1), expectedValue); + assertFalse(rs.next()); + } + } + } + + private void flush(TableName table) throws IOException { Admin admin = getUtility().getAdmin(); admin.flush(table);