This is an automated email from the ASF dual-hosted git repository.

tkhurana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 973778f9be PHOENIX-7501 GC issues in TTLRegionScanner when gap is more than TTL (#2073)
973778f9be is described below

commit 973778f9be425a41b430631097372f8ba8dc9c39
Author: tkhurana <[email protected]>
AuthorDate: Wed Feb 12 11:00:23 2025 -0800

    PHOENIX-7501 GC issues in TTLRegionScanner when gap is more than TTL (#2073)
    
    PHOENIX-7501 GC issues in TTLRegionScanner when gap is more than TTL
---
 .../phoenix/coprocessor/CompactionScanner.java     |  18 ++--
 .../phoenix/coprocessor/TTLRegionScanner.java      |  85 +++++++++------
 .../org/apache/phoenix/end2end/TableTTLIT.java     | 120 ++++++++++++++++++++-
 3 files changed, 183 insertions(+), 40 deletions(-)

diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
index 241ea00f36..f80125c6b3 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
@@ -356,7 +356,7 @@ public class CompactionScanner implements InternalScanner {
             return false;
         }
     }
-    /*
+
     private void printRow(List<Cell> result, String title, boolean sort) {
         List<Cell> row;
         if (sort) {
@@ -370,31 +370,33 @@ public class CompactionScanner implements InternalScanner {
                 + "compaction time: " + compactionTime);
         System.out.println("Max lookback window start time: " + 
maxLookbackWindowStart);
         System.out.println("Max lookback in ms: " + maxLookbackInMillis);
-        System.out.println("TTL in ms: " + ttlInMillis);
+        RowContext rowContext = 
phoenixLevelRowCompactor.rowTracker.getRowContext();
+        System.out.println("TTL in ms: " + rowContext.ttl);
         boolean maxLookbackLine = false;
         boolean ttlLine = false;
         for (Cell cell : row) {
             if (!maxLookbackLine && cell.getTimestamp() < 
maxLookbackWindowStart) {
-                System.out.println("-----> Max lookback window start time: " + 
maxLookbackWindowStart);
+                //System.out.println("-----> Max lookback window start time: " 
+ maxLookbackWindowStart);
                 maxLookbackLine = true;
-            } else if (!ttlLine && cell.getTimestamp() < ttlWindowStart) {
-                System.out.println("-----> TTL window start time: " + 
ttlWindowStart);
+            } else if (!ttlLine && cell.getTimestamp() < 
rowContext.ttlWindowStart) {
+                //System.out.println("-----> TTL window start time: " + 
rowContext.ttlWindowStart);
                 ttlLine = true;
             }
             System.out.println(cell);
         }
     }
-     */
 
     @Override
     public boolean next(List<Cell> result) throws IOException {
         boolean hasMore = storeScanner.next(result);
         inputCellCount += result.size();
         if (!result.isEmpty()) {
-            // printRow(result, "Input for " + tableName + " " + 
columnFamilyName, true); // This is for debugging
+            // This is for debugging
+            //printRow(result, "Input for " + tableName + " " + 
columnFamilyName, true);
             phoenixLevelRowCompactor.compact(result, false);
             outputCellCount += result.size();
-            // printRow(result, "Output for " + tableName + " " + 
columnFamilyName, true); // This is for debugging
+            // This is for debugging
+            //printRow(result, "Output for " + tableName + " " + 
columnFamilyName, true);
         }
         return hasMore;
     }
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
index 2543128ece..6ce96c12b5 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
@@ -19,7 +19,6 @@ package org.apache.phoenix.coprocessor;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 
@@ -33,8 +32,6 @@ import org.apache.hadoop.hbase.filter.PageFilter;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.ScanUtil;
 import org.slf4j.Logger;
@@ -128,38 +125,66 @@ public class TTLRegionScanner extends BaseRegionScanner {
         if (maxTimestamp - minTimestamp <= ttl) {
             return false;
         }
-        // We need check if the gap between two consecutive cell timestamps is 
more than ttl
-        // and if so trim the cells beyond the gap
-        Scan singleRowScan = new Scan();
-        singleRowScan.setRaw(true);
-        singleRowScan.readAllVersions();
-        singleRowScan.setTimeRange(scan.getTimeRange().getMin(), 
scan.getTimeRange().getMax());
-        byte[] rowKey = CellUtil.cloneRow(result.get(0));
-        singleRowScan.withStartRow(rowKey, true);
-        singleRowScan.withStopRow(rowKey, true);
-        RegionScanner scanner = 
((DelegateRegionScanner)delegate).getNewRegionScanner(singleRowScan);
+
+        // We need to check if the gap between two consecutive cell timestamps 
is more than ttl
+        // and if so trim the cells beyond the gap. The gap analysis works by 
doing a scan in a
+        // sliding time range window of ttl width. This scan reads the latest 
version of the row in
+        // that time range. If we find a version, then in that time range 
there is no gap. We find
+        // the timestamp at which the update happened and then slide the 
window past that
+        // timestamp. If no version is returned, then we have found a gap.
+        // On a gap, all the cells below the current sliding window's end time
+        // can be trimmed from the result. We slide the window past the 
current end time to find
+        // any more gaps so that we can find the largest timestamp in the
+        // [minTimestamp, maxTimestamp] window below which all the cells can 
be trimmed.
+        // This algorithm doesn't read all the row versions into the memory 
since the
+        // number of row versions can be unbounded and reading all of them at 
once can cause GC
+        // issues. In practice, ttl windows are in days or months so the entire
+        // [minTimestamp, maxTimestamp] range shouldn't span more than 2-3 ttl 
windows.
+        // We know that an update happened at minTimestamp so initialize the 
sliding window
+        // to [minTimestamp + 1, minTimestamp + ttl] which means the scan 
range should be
+        // [minTimestamp + 1, minTimestamp + ttl + 1).
+        long wndStartTS = minTimestamp + 1;
+        long wndEndTS = wndStartTS + ttl;
+        // any cell in the scan result list having a timestamp below 
trimTimestamp will be
+        // removed from the list and not returned back to the client. 
Initially, it is equal to
+        // the minTimestamp.
+        long trimTimestamp = minTimestamp;
         List<Cell> row = new ArrayList<>();
-        scanner.next(row);
-        scanner.close();
-        if (row.isEmpty()) {
-            return true;
-        }
-        int size = row.size();
-        long tsArray[] = new long[size];
-        int i = 0;
-        for (Cell cell : row) {
-            tsArray[i++] = cell.getTimestamp();
-        }
-        Arrays.sort(tsArray);
-        for (i = size - 1; i > 0; i--) {
-            if (tsArray[i] - tsArray[i - 1] > ttl) {
-                minTimestamp = tsArray[i];
-                break;
+        LOG.debug("Doing gap analysis for {} min = {}, max = {}",
+                env.getRegionInfo().getRegionNameAsString(), minTimestamp, 
maxTimestamp);
+        while (wndEndTS <= maxTimestamp) {
+            LOG.debug("WndStart = {}, WndEnd = {}, trim = {}", wndStartTS, 
wndEndTS, trimTimestamp);
+            row.clear(); // reset the row on every iteration
+            Scan singleRowScan = new Scan();
+            singleRowScan.setTimeRange(wndStartTS, wndEndTS);
+            byte[] rowKey = CellUtil.cloneRow(result.get(0));
+            singleRowScan.withStartRow(rowKey, true);
+            singleRowScan.withStopRow(rowKey, true);
+            RegionScanner scanner =
+                    ((DelegateRegionScanner) 
delegate).getNewRegionScanner(singleRowScan);
+            scanner.next(row);
+            scanner.close();
+            if (row.isEmpty()) {
+                // no update in this window, we found a gap and the row expired
+                trimTimestamp = wndEndTS - 1;
+                LOG.debug("Found gap at {}", trimTimestamp);
+                // next window will start at wndEndTS. Scan timeranges are 
half-open [min, max)
+                wndStartTS = wndEndTS;
+            } else {
+                // we found an update within the ttl
+                long lastUpdateTS = 0;
+                for (Cell cell : row) {
+                    lastUpdateTS = Math.max(lastUpdateTS, cell.getTimestamp());
+                }
+                // slide the window 1 past the lastUpdateTS
+                LOG.debug("lastUpdateTS = {}", lastUpdateTS);
+                wndStartTS = lastUpdateTS + 1;
             }
+            wndEndTS = wndStartTS + ttl;
         }
         Iterator<Cell> iterator = result.iterator();
         while(iterator.hasNext()) {
-            if (iterator.next().getTimestamp() < minTimestamp) {
+            if (iterator.next().getTimestamp() < trimTimestamp) {
                 iterator.remove();
             }
         }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
index a4f77a8a6b..d23bb707aa 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
@@ -53,7 +53,11 @@ import java.util.Properties;
 import java.util.Random;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 @Category(NeedsOwnMiniClusterTest.class)
 @RunWith(Parameterized.class)
@@ -338,15 +342,127 @@ public class TableTTLIT extends BaseTest {
             updateColumn(conn, tableName, "a2", 3, "col3");
             updateColumn(conn, tableName, "a3", 5, "col5");
             conn.commit();
+            String dql = "SELECT * from " + tableName;
+            ResultSet rs = conn.createStatement().executeQuery(dql);
+            // check that all older columns are masked
+            while (rs.next()) {
+                String id = rs.getString(1);
+                int updatedColIndex = 0;
+                if (id.equals("a1")) {
+                    updatedColIndex = 2;
+                } else if (id.equals("a2")) {
+                    updatedColIndex = 3;
+                } else if (id.equals("a3")) {
+                    updatedColIndex = 5;
+                } else {
+                    fail(String.format("Got unexpected row key %s", id));
+                }
+                for (int colIndex = 1; colIndex <= MAX_COLUMN_INDEX; 
++colIndex) {
+                    if (colIndex != updatedColIndex) {
+                        assertNull(rs.getString(colIndex + 1));
+                    } else {
+                        assertNotNull(rs.getString(colIndex + 1));
+                    }
+                }
+            }
             flush(TableName.valueOf(tableName));
             majorCompact(TableName.valueOf(tableName));
-            String dql = "SELECT count(*) from " + tableName;
-            ResultSet rs = conn.createStatement().executeQuery(dql);
+            dql = "SELECT count(*) from " + tableName;
+            rs = conn.createStatement().executeQuery(dql);
             assertTrue(rs.next());
             assertEquals(3, rs.getInt(1));
         }
     }
 
+    @Test
+    public void testMaskingGapAnalysis() throws Exception {
+        // for the purpose of this test only considering cases when 
maxlookback is 0
+        if (tableLevelMaxLooback == null || tableLevelMaxLooback != 0) {
+            return;
+        }
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String tableName = generateUniqueName();
+            createTable(tableName);
+            long startTime = System.currentTimeMillis() + 1000;
+            startTime = (startTime / 1000) * 1000;
+            EnvironmentEdgeManager.injectEdge(injectEdge);
+            injectEdge.setValue(startTime);
+            updateRow(conn, tableName, "a1"); // min timestamp
+            conn.commit();
+            String dql = String.format("SELECT * from %s where id='%s'", 
tableName, "a1");
+            String [] colValues = new String[MAX_COLUMN_INDEX + 1];
+            try (ResultSet rs = conn.createStatement().executeQuery(dql)) {
+                assertTrue(rs.next());
+                for (int i = 1; i <= MAX_COLUMN_INDEX; ++i) {
+                    // initialize the column values to the current row version
+                    colValues[i] = rs.getString("val" + i);
+                }
+            }
+            for (int iter = 1; iter <= 20; ++iter) {
+                injectEdge.incrementValue(1);
+                int colIndex = RAND.nextInt(MAX_COLUMN_INDEX) + 1;
+                String value = Integer.toString(RAND.nextInt(1000));
+                updateColumn(conn, tableName, "a1", colIndex, value);
+                conn.commit();
+                colValues[colIndex] = value;
+
+                injectEdge.incrementValue(ttl * 1000);
+                colIndex = RAND.nextInt(MAX_COLUMN_INDEX) + 1;
+                value = Integer.toString(RAND.nextInt(1000));
+                updateColumn(conn, tableName, "a1", colIndex, value);
+                conn.commit();
+                colValues[colIndex] = value;
+
+                if (iter % 5 == 0) {
+                    // every 5th iteration introduce a gap
+                    // first verify the current row
+                    try (ResultSet rs = 
conn.createStatement().executeQuery(dql)) {
+                        assertTrue(rs.next());
+                        for (int i = 1; i <= MAX_COLUMN_INDEX; ++i) {
+                            Assert.assertEquals(colValues[i], 
rs.getString("val" + i));
+                        }
+                    }
+                    // inject the gap
+                    injectEdge.incrementValue(ttl*1000 + 
RAND.nextInt(ttl*1000));
+                    // row expires after the gap so reset the col values to 
null
+                    colValues = new String[MAX_COLUMN_INDEX + 1];
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testMultipleUpdatesToSingleColumn() throws Exception {
+        // for the purpose of this test only considering cases when 
maxlookback is 0
+        if (tableLevelMaxLooback == null || tableLevelMaxLooback != 0) {
+            return;
+        }
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String tableName = generateUniqueName();
+            createTable(tableName);
+            long startTime = System.currentTimeMillis() + 1000;
+            startTime = (startTime / 1000) * 1000;
+            EnvironmentEdgeManager.injectEdge(injectEdge);
+            injectEdge.setValue(startTime);
+            updateRow(conn, tableName, "a1");
+            injectEdge.incrementValue(1);
+            for (int i = 0; i < 15; ++i) {
+                updateColumn(conn, tableName, "a1", 2, "col2_" + i);
+                conn.commit();
+                injectEdge.incrementValue((ttl / 10) * 1000);
+            }
+            conn.commit();
+            String dql = "select * from " + tableName + " where id='a1'";
+            try (ResultSet rs = conn.createStatement().executeQuery(dql)) {
+                while (rs.next()) {
+                    for (int col = 1; col <= MAX_COLUMN_INDEX + 1; col++) {
+                        System.out.println(rs.getString(col));
+                    }
+                }
+            }
+        }
+    }
+
     private void flush(TableName table) throws IOException {
         Admin admin = getUtility().getAdmin();
         admin.flush(table);

Reply via email to