This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new b1c8e3f739 PHOENIX-7574 Phoenix Compaction doesn't correctly handle 
DeleteFamily… (#2105)
b1c8e3f739 is described below

commit b1c8e3f73973428180d56d2eb75ae37366a3c954
Author: Kadir Ozdemir <37155482+kadiro...@users.noreply.github.com>
AuthorDate: Mon Apr 14 09:48:58 2025 -0700

    PHOENIX-7574 Phoenix Compaction doesn't correctly handle DeleteFamily… 
(#2105)
---
 .../phoenix/coprocessor/CompactionScanner.java     | 274 ++++++++++++++++++---
 .../org/apache/phoenix/end2end/TableTTLIT.java     |  94 ++++++-
 2 files changed, 323 insertions(+), 45 deletions(-)

diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
index eb935db022..08302d220d 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
@@ -2213,6 +2213,7 @@ public class CompactionScanner implements InternalScanner 
{
         List<Cell> phoenixResult = new ArrayList<>();
         List<Cell> trimmedRow = new ArrayList<>();
         List<Cell> trimmedEmptyColumn = new ArrayList<>();
+        List<Cell> deleteFamilyVersionCellList = new ArrayList<>();
         private TTLTracker rowTracker;
 
         PhoenixLevelRowCompactor(TTLTracker rowTracker) {
@@ -2220,49 +2221,248 @@ public class CompactionScanner implements 
InternalScanner {
         }
 
         /**
+         * Skip the remaining cells of the current column and only retain 
delete markers if the
+         * compaction is not a major compaction
+         * @param result The list of cells (input)
+         * @param currentColumnCell The cell that indicates the current column 
(input)
+         * @param retainedCells The list of cells to be retained (output)
+         * @param index The index of the current cell in result (input)
+         * @return the index of the last skipped cell or the unchanged 
inputted index value if no
+         *          cell to skip
+         */
+        private int skipColumn(List<Cell> result, Cell currentColumnCell, 
List<Cell> retainedCells,
+                               int index) {
+            for (int i = index + 1; i < result.size(); i++) {
+                Cell cell = result.get(i);
+                if (CellUtil.matchingColumn(cell, currentColumnCell)) {
+                    index++;
+                    if (cell.getType() != Cell.Type.Put && !major) {
+                        retainedCells.add(cell);
+                    }
+                } else {
+                    return index;
+                }
+            }
+            return index;
+        }
+
+        /**
+         * Add all empty column cells starting at index to the list emptyColumn
+         * @param result The list of cells (input)
+         * @param currentColumnCell The cell that indicates the current column 
(input)
+         * @param index The index of the current cell in result (input)
+         * @param emptyColumn The list of empty column cells (output)
+         * @return the index of the last empty column cell or the unchanged 
inputted index value
+         *          if no empty column cells to add
+         */
+        private int addEmptyColumn(List<Cell> result, Cell currentColumnCell, 
int index,
+                                   List<Cell> emptyColumn) {
+            for (int i = index + 1; i < result.size(); i++) {
+                Cell cell = result.get(i);
+                if (CellUtil.matchingColumn(cell, currentColumnCell)) {
+                    index++;
+                    emptyColumn.add(cell);
+                } else {
+                    return index;
+                }
+            }
+            return index;
+        }
+        /**
+         * This method retains all the cells within the max lookback window 
and the last row version
+         * visible at the lower edge of the max lookback window. The last row 
version can have zero
+         * or more cells at the lower edge of the window and/or zero or more 
cells outside the
+         * window. It also retains all delete markers outside the window if 
the compaction is not a
+         * major compaction, and returns the remaining cells (outside the max 
lookback window) of
+         * the empty column.
+         *
          * The cells of the row (i.e., result) read from HBase store are 
lexicographically ordered
          * for tables using the key part of the cells which includes row, 
family, qualifier,
          * timestamp and type. The cells belong of a column are ordered from 
the latest to
          * the oldest. The method leverages this ordering and groups the cells 
into their columns
          * based on the pair of family name and column qualifier.
-         *
-         * The cells within the max lookback window except the once at the 
lower edge of the
-         * max lookback window (the last row of the max lookback window) are 
retained immediately.
-         *
-         * This method also returned the remaining cells (outside the max 
lookback window) of
-         * the empty colum
          */
         private void getLastRowVersionInMaxLookbackWindow(List<Cell> result,
                 List<Cell> lastRowVersion, List<Cell> retainedCells, 
List<Cell> emptyColumn) {
+            long maxLookbackWindowStart = 
rowContext.getMaxLookbackWindowStart();
             Cell currentColumnCell = null;
-            boolean isEmptyColumn = false;
-            for (Cell cell : result) {
-                long maxLookbackWindowStart = 
rowContext.getMaxLookbackWindowStart();
+            Cell deleteFamilyCell = null;
+            deleteFamilyVersionCellList.clear();
+            top:
+            for (int index = 0; index < result.size(); index++) {
+                Cell cell = result.get(index);
                 if (cell.getTimestamp() > maxLookbackWindowStart) {
+                    // All cells within the max lookback window are retained. 
Here we retain all
+                    // except the ones at the lower edge of the window. Those 
will be included in
+                    // the last row version in the rest of the body of the loop
                     retainedCells.add(cell);
                     continue;
                 }
-                if (!major && cell.getType() != Cell.Type.Put) {
-                    retainedCells.add(cell);
-                }
-                if (currentColumnCell == null ||
-                        !CellUtil.matchingColumn(cell, currentColumnCell)) {
+                // The following section of the for loop processes an entire 
column in each
+                // iteration, that is, all cell versions for a given column 
will be processed in
+                // each iteration. Please note delete family markers 
(DeleteFamily and
+                // DeleteFamilyVersion) have their own column with the null 
column qualifier. The
+                // delete family column cells always form the first column in 
a row of cells for a
+                // given column family
+                if (currentColumnCell == null) {
+                    // This is the first column. If the row has any delete 
family markers, they will
+                    // be in the first column
                     currentColumnCell = cell;
-                    isEmptyColumn = ScanUtil.isEmptyColumn(cell, emptyCF, 
emptyCQ);
-                    if ((cell.getType() != Cell.Type.Delete
-                            && cell.getType() != Cell.Type.DeleteColumn)
-                            || cell.getTimestamp() == maxLookbackWindowStart) {
-                        // Include only delete family markers and put cells
-                        // The last row version can also be the cells with 
timestamp
-                        // same as timestamp of start of max lookback window
+                    if (cell.getType() == Cell.Type.DeleteFamily) {
+                        // The first delete family marker at the edge or 
outside the max lookback
+                        // window is DeleteFamily.
+                        deleteFamilyCell = cell;
+                        if (cell.getTimestamp() == maxLookbackWindowStart) {
+                            // It is at the edge of the window. So we need to 
include it in the last
+                            // row version
+                            lastRowVersion.add(cell);
+                        } else if (!major) {
+                            // Retain the delete markers if the compaction is 
not major
+                            retainedCells.add(cell);
+                        }
+                        // Skip the rest of the delete family cells
+                        index = skipColumn(result, currentColumnCell, 
retainedCells, index);
+                        continue;
+                    } else if (cell.getType() == 
Cell.Type.DeleteFamilyVersion) {
+                        deleteFamilyVersionCellList.add(cell);
+                        if (cell.getTimestamp() == maxLookbackWindowStart) {
+                            lastRowVersion.add(cell);
+                        } else if (!major) {
+                            // Retain the delete markers if the compaction is 
not major
+                            retainedCells.add(cell);
+                        }
+                        // Each DeleteFamilyVersion can delete at most one row 
version. There can be
+                        // multiple of them, and we need to process each 
separately, and thus we
+                        // need to track them in a list
+                        for (int i = index + 1; i < result.size(); i++) {
+                            cell = result.get(i);
+                            if (cell.getType() == 
Cell.Type.DeleteFamilyVersion) {
+                                index++;
+                                deleteFamilyVersionCellList.add(cell);
+                                if (!major) {
+                                    // Delete markers are retained if the 
compaction is not a major
+                                    // compaction
+                                    retainedCells.add(cell);
+                                }
+                            } else if (cell.getType() == 
Cell.Type.DeleteFamily) {
+                                // After one or more DeleteFamilyVersion 
markers, there is a
+                                // DeleteFamily marker. This marker deletes 
the rest of the cells
+                                // and thus no need to process further delete 
family markers. Thus,
+                                // we skip them using skipColumn
+                                index++;
+                                deleteFamilyCell = cell;
+                                if (!major) {
+                                    retainedCells.add(cell);
+                                }
+                                // Skip the rest of the delete family cells
+                                index = skipColumn(result, currentColumnCell, 
retainedCells, index);
+                                continue top;
+                            } else {
+                                // Column changed as the current cell is not a 
delete family cell.
+                                // Go back to the beginning of the for loop
+                                continue top;
+                            }
+                        }
+                        // All the cells in a row are processed
+                        break top;
+                    }
+                }
+                // All delete family markers are scanned and recorded above if 
there were any. Please
+                // note when we do region level compaction, each column family 
will have their own
+                // delete family markers. Phoenix inserts the same set of 
delete markers to each
+                // column family. So, we need to keep track of the delete 
family markers of the
+                // first column family but apply these delete markers to all 
column families
+                currentColumnCell = cell;
+                // Is this cell masked by a delete column family version
+                if (!deleteFamilyVersionCellList.isEmpty()) {
+                    // There could be back to back delete family version 
markers and thus we need a
+                    // loop to check it
+                    for (Cell deleteFamilyVersionCell : 
deleteFamilyVersionCellList) {
+                        if (cell.getTimestamp() > 
deleteFamilyVersionCell.getTimestamp()) {
+                            break;
+                        }
+                        if (cell.getTimestamp() == 
deleteFamilyVersionCell.getTimestamp()) {
+                            // It is masked
+                            if (cell.getType() != Cell.Type.Put) {
+                                if (cell.getTimestamp() == 
maxLookbackWindowStart) {
+                                    lastRowVersion.add(cell);
+                                } else if (!major) {
+                                    // Retain the delete markers if the 
compaction is not major
+                                    retainedCells.add(cell);
+                                }
+                            }
+                            cell = result.get(index + 1);
+                            if (!CellUtil.matchingColumn(cell, 
currentColumnCell)) {
+                                continue top;
+                            }
+                            index++;
+                        }
+                    }
+                }
+                if (deleteFamilyCell != null
+                        && deleteFamilyCell.getTimestamp() >= 
cell.getTimestamp()) {
+                    // This column is deleted by a delete family marker. Skip 
this column
+                    if (cell.getType() != Cell.Type.Put) {
+                        if (cell.getTimestamp() == maxLookbackWindowStart) {
+                            lastRowVersion.add(cell);
+                        } else if (!major) {
+                            // Retain the delete markers if the compaction is 
not major
+                            retainedCells.add(cell);
+                        }
+                    }
+                    index = skipColumn(result, currentColumnCell, 
retainedCells, index);
+                    continue top;
+                }
+                // Process back-to-back deleted cell versions. Phoenix 
currently does not use delete
+                // cell version markers. This processing should not happen and 
is added for
+                // completeness
+                while (cell.getType() == Cell.Type.Delete) {
+                    if (cell.getTimestamp() == maxLookbackWindowStart) {
                         lastRowVersion.add(cell);
+                    } else if (!major) {
+                        retainedCells.add(cell);
+                    }
+                    Cell nextCell = result.get(index + 1);
+                    if (!CellUtil.matchingColumn(currentColumnCell, nextCell)) 
{
+                        continue top;
+                    }
+                    // Increment index by one as the delete cell should be 
consumed
+                    index++;
+                    if (nextCell.getType() == Cell.Type.Put
+                            && cell.getTimestamp() == nextCell.getTimestamp()) 
{
+                        // This put cell is masked by the delete marker
+                        index++;
+                        cell = result.get(index);
                     }
-                } else if (isEmptyColumn) {
-                    // We only need to keep one cell for every column for the 
last row version.
-                    // So here we just form the empty column beyond the last 
row version.
-                    // Empty column needs to be collected during minor 
compactions also
-                    // else we will see partial row expiry.
-                    emptyColumn.add(cell);
+                }
+                if (cell.getType() == Cell.Type.DeleteColumn) {
+                    // The rest of the column is masked by this delete column 
cell
+                    if (cell.getTimestamp() == maxLookbackWindowStart) {
+                        lastRowVersion.add(cell);
+                    } else if (!major) {
+                        retainedCells.add(cell);
+                    }
+                    index = skipColumn(result, currentColumnCell, 
retainedCells, index);
+                    continue top;
+                }
+                if (cell.getType() == Cell.Type.Put) {
+                    lastRowVersion.add(cell);
+                    if (ScanUtil.isEmptyColumn(cell, emptyCF, emptyCQ)) {
+                        index = addEmptyColumn(result, currentColumnCell, 
index, emptyColumn);
+                    } else {
+                        index = skipColumn(result, currentColumnCell, 
retainedCells, index);
+                    }
+                    continue top;
+                }
+                // We can visit another delete family column for another 
column family if we are
+                // doing region level compaction. In that case, we should also 
retain delete family
+                // markers from that column family here. So we need to check 
if the cell type is
+                // DeleteFamily or DeleteFamilyVersion, the column family is 
the store under
+                // compaction and the compaction is not a major compaction.
+                if (!major && CellUtil.matchingFamily(cell, storeColumnFamily) 
&&
+                        (cell.getType() == Cell.Type.DeleteFamily
+                                || cell.getType() == 
Cell.Type.DeleteFamilyVersion)) {
+                    index = skipColumn(result, currentColumnCell, 
retainedCells, index);
                 }
             }
         }
@@ -2349,13 +2549,19 @@ public class CompactionScanner implements 
InternalScanner {
             long ttl = rowContext.getTTL();
             rowContext.getNextRowVersionTimestamps(lastRow, storeColumnFamily);
             Cell firstCell = lastRow.get(0);
-            if (firstCell.getType() == Cell.Type.DeleteFamily ||
-                    firstCell.getType() == Cell.Type.DeleteFamilyVersion) {
-                if (firstCell.getTimestamp() >= rowContext.maxTimestamp) {
-                    // This means that the row version outside the max 
lookback window is
-                    // deleted and thus should not be visible to the scn 
queries
-                    return;
-                }
+            while (firstCell.getType() == Cell.Type.DeleteFamilyVersion
+                    && firstCell.getTimestamp() == rowContext.maxTimestamp) {
+                // This means that the row version outside the max lookback 
window is
+                // deleted and thus should not be visible to the scn queries
+                rowContext.getNextRowVersionTimestamps(lastRow, 
storeColumnFamily);
+            }
+            if ((firstCell.getType() == Cell.Type.DeleteFamily
+                    && firstCell.getTimestamp() >= rowContext.maxTimestamp)
+                    || (firstCell.getType() == Cell.Type.DeleteFamilyVersion
+                    && firstCell.getTimestamp() == rowContext.maxTimestamp)) {
+                // This means that the row version outside the max lookback 
window is
+                // deleted and thus should not be visible to the scn queries
+                return;
             }
 
             if (major && compactionTime - rowContext.maxTimestamp > 
maxLookbackInMillis + ttl) {
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
index d23bb707aa..8c459d54b5 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
@@ -22,14 +22,12 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
 import org.apache.phoenix.query.BaseTest;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
-import org.apache.phoenix.util.EnvironmentEdgeManager;
-import org.apache.phoenix.util.ManualEnvironmentEdge;
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.phoenix.util.ReadOnlyProps;
-import org.apache.phoenix.util.TestUtil;
+import org.apache.phoenix.util.*;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -52,12 +50,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
 
 @Category(NeedsOwnMiniClusterTest.class)
 @RunWith(Parameterized.class)
@@ -463,6 +456,85 @@ public class TableTTLIT extends BaseTest {
         }
     }
 
+    @Test
+    public void testDeleteFamilyVersion() throws Exception {
+        // for the purpose of this test only considering cases when 
maxlookback is 0
+        if (tableLevelMaxLooback == null || tableLevelMaxLooback != 0) {
+            return;
+        }
+        if (multiCF == true) {
+            return;
+        }
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String tableName = "T_" + generateUniqueName();
+            createTable(tableName);
+            String indexName = "I_" + generateUniqueName();
+            String indexDDL = String.format("create index %s on %s (val1) 
include (val2, val3)",
+                    indexName, tableName);
+            conn.createStatement().execute(indexDDL);
+            updateRow(conn, tableName, "a1");
+            String indexColumnValue;
+            String expectedValue;
+            String dql = "select val1, val2 from " + tableName + " where id = 
'a1'";
+            try (ResultSet rs = conn.createStatement().executeQuery(dql)) {
+                PhoenixResultSet prs = rs.unwrap(PhoenixResultSet.class);
+                String explainPlan = 
QueryUtil.getExplainPlan(prs.getUnderlyingIterator());
+                assertFalse(explainPlan.contains(indexName));
+                assertTrue(rs.next());
+                indexColumnValue = rs.getString(1);
+                expectedValue = rs.getString(2);
+                assertFalse(rs.next());
+            }
+            // Insert an orphan index row by failing data table update
+            IndexRegionObserver.setFailDataTableUpdatesForTesting(true);
+            try {
+                updateColumn(conn, tableName, "a1", 2, "col2_xyz");
+                conn.commit();
+                fail("An exception should have been thrown");
+            } catch (Exception ignored) {
+                // Ignore the exception
+            } finally {
+                IndexRegionObserver.setFailDataTableUpdatesForTesting(false);
+            }
+            // Insert another orphan index row
+            IndexRegionObserver.setFailDataTableUpdatesForTesting(true);
+            try {
+                updateColumn(conn, tableName, "a1", 2, "col2_abc");
+                conn.commit();
+                fail("An exception should have been thrown");
+            } catch (Exception ignored) {
+                // Ignore the exception
+            } finally {
+                IndexRegionObserver.setFailDataTableUpdatesForTesting(false);
+            }
+            TestUtil.dumpTable(conn, TableName.valueOf(indexName));
+            // do a read on the index which should trigger a read repair
+            dql = "select val2 from " + tableName + " where val1 = '" + 
indexColumnValue + "'";
+            try (ResultSet rs = conn.createStatement().executeQuery(dql)) {
+                PhoenixResultSet prs = rs.unwrap(PhoenixResultSet.class);
+                String explainPlan = 
QueryUtil.getExplainPlan(prs.getUnderlyingIterator());
+                assertTrue(explainPlan.contains(indexName));
+                assertTrue(rs.next());
+                assertEquals(rs.getString(1), expectedValue);
+                assertFalse(rs.next());
+            }
+            TestUtil.dumpTable(conn, TableName.valueOf(indexName));
+            flush(TableName.valueOf(indexName));
+            majorCompact(TableName.valueOf(indexName));
+            TestUtil.dumpTable(conn, TableName.valueOf(indexName));
+            // run the same query again after compaction
+            try (ResultSet rs = conn.createStatement().executeQuery(dql)) {
+                PhoenixResultSet prs = rs.unwrap(PhoenixResultSet.class);
+                String explainPlan = 
QueryUtil.getExplainPlan(prs.getUnderlyingIterator());
+                assertTrue(explainPlan.contains(indexName));
+                assertTrue(rs.next());
+                assertEquals(rs.getString(1), expectedValue);
+                assertFalse(rs.next());
+            }
+        }
+    }
+
+
     private void flush(TableName table) throws IOException {
         Admin admin = getUtility().getAdmin();
         admin.flush(table);

Reply via email to