This is an automated email from the ASF dual-hosted git repository.

tkhurana pushed a commit to branch 5.2
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/5.2 by this push:
     new 39222cd49b PHOENIX-7501 GC issues in TTLRegionScanner when gap is more than TTL (#2073) (#2076)
39222cd49b is described below

commit 39222cd49b60460c951e22941c3319de3387f5bc
Author: tkhurana <[email protected]>
AuthorDate: Sat Feb 15 16:24:43 2025 -0800

    PHOENIX-7501 GC issues in TTLRegionScanner when gap is more than TTL (#2073) (#2076)
---
 .../phoenix/coprocessor/CompactionScanner.java     |   8 +-
 .../phoenix/coprocessor/TTLRegionScanner.java      |  83 ++++++++++-----
 .../org/apache/phoenix/end2end/TableTTLIT.java     | 118 ++++++++++++++++++++-
 3 files changed, 177 insertions(+), 32 deletions(-)

diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
index 23f1866ce2..839c2efde8 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
@@ -207,6 +207,7 @@ public class CompactionScanner implements InternalScanner {
             return false;
         }
     }
+
     private void printRow(List<Cell> result, String title, boolean sort) {
         List<Cell> row;
         if (sort) {
@@ -234,15 +235,18 @@ public class CompactionScanner implements InternalScanner {
             System.out.println(cell);
         }
     }
+
     @Override
     public boolean next(List<Cell> result) throws IOException {
         boolean hasMore = storeScanner.next(result);
         inputCellCount += result.size();
         if (!result.isEmpty()) {
-            // printRow(result, "Input for " + tableName + " " + 
columnFamilyName, true); // This is for debugging
+            // This is for debugging
+            //printRow(result, "Input for " + tableName + " " + 
columnFamilyName, true);
             phoenixLevelRowCompactor.compact(result, false);
             outputCellCount += result.size();
-            // printRow(result, "Output for " + tableName + " " + 
columnFamilyName, true); // This is for debugging
+            // This is for debugging
+            //printRow(result, "Output for " + tableName + " " + 
columnFamilyName, true);
         }
         return hasMore;
     }
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
index aa1196130f..57fcc60c44 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
@@ -19,7 +19,6 @@ package org.apache.phoenix.coprocessor;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 
@@ -117,38 +116,66 @@ public class TTLRegionScanner extends BaseRegionScanner {
         if (maxTimestamp - minTimestamp <= ttl) {
             return false;
         }
-        // We need check if the gap between two consecutive cell timestamps is 
more than ttl
-        // and if so trim the cells beyond the gap
-        Scan singleRowScan = new Scan();
-        singleRowScan.setRaw(true);
-        singleRowScan.readAllVersions();
-        singleRowScan.setTimeRange(scan.getTimeRange().getMin(), 
scan.getTimeRange().getMax());
-        byte[] rowKey = CellUtil.cloneRow(result.get(0));
-        singleRowScan.withStartRow(rowKey, true);
-        singleRowScan.withStopRow(rowKey, true);
-        RegionScanner scanner = 
((DelegateRegionScanner)delegate).getNewRegionScanner(singleRowScan);
+
+        // We need to check if the gap between two consecutive cell timestamps 
is more than ttl
+        // and if so trim the cells beyond the gap. The gap analysis works by 
doing a scan in a
+        // sliding time range window of ttl width. This scan reads the latest 
version of the row in
+        // that time range. If we find a version, then in that time range 
there is no gap. We find
+        // the timestamp at which the update happened and then slide the 
window past that
+        // timestamp. If no version is returned, then we have found a gap.
+        // On a gap, all the cells below the current sliding window's end time
+        // can be trimmed from the result. We slide the window past the 
current end time to find
+        // any more gaps so that we can find the largest timestamp in the
+        // [minTimestamp, maxTimestamp] window below which all the cells can 
be trimmed.
+        // This algorithm doesn't read all the row versions into the memory 
since the
+        // number of row versions can be unbounded and reading all of them at 
once can cause GC
+        // issues. In practice, ttl windows are in days or months so the entire
+        // [minTimestamp, maxTimestamp] range shouldn't span more than 2-3 ttl 
windows.
+        // We know that an update happened at minTimestamp so initialize the 
sliding window
+        // to [minTimestamp + 1, minTimestamp + ttl] which means the scan 
range should be
+        // [minTimestamp + 1, minTimestamp + ttl + 1).
+        long wndStartTS = minTimestamp + 1;
+        long wndEndTS = wndStartTS + ttl;
+        // any cell in the scan result list having a timestamp below 
trimTimestamp will be
+        // removed from the list and not returned back to the client. 
Initially, it is equal to
+        // the minTimestamp.
+        long trimTimestamp = minTimestamp;
         List<Cell> row = new ArrayList<>();
-        scanner.next(row);
-        scanner.close();
-        if (row.isEmpty()) {
-            return true;
-        }
-        int size = row.size();
-        long tsArray[] = new long[size];
-        int i = 0;
-        for (Cell cell : row) {
-            tsArray[i++] = cell.getTimestamp();
-        }
-        Arrays.sort(tsArray);
-        for (i = size - 1; i > 0; i--) {
-            if (tsArray[i] - tsArray[i - 1] > ttl) {
-                minTimestamp = tsArray[i];
-                break;
+        LOG.debug("Doing gap analysis for {} min = {}, max = {}",
+                env.getRegionInfo().getRegionNameAsString(), minTimestamp, 
maxTimestamp);
+        while (wndEndTS <= maxTimestamp) {
+            LOG.debug("WndStart = {}, WndEnd = {}, trim = {}", wndStartTS, 
wndEndTS, trimTimestamp);
+            row.clear(); // reset the row on every iteration
+            Scan singleRowScan = new Scan();
+            singleRowScan.setTimeRange(wndStartTS, wndEndTS);
+            byte[] rowKey = CellUtil.cloneRow(result.get(0));
+            singleRowScan.withStartRow(rowKey, true);
+            singleRowScan.withStopRow(rowKey, true);
+            RegionScanner scanner =
+                    ((DelegateRegionScanner) 
delegate).getNewRegionScanner(singleRowScan);
+            scanner.next(row);
+            scanner.close();
+            if (row.isEmpty()) {
+                // no update in this window, we found a gap and the row expired
+                trimTimestamp = wndEndTS - 1;
+                LOG.debug("Found gap at {}", trimTimestamp);
+                // next window will start at wndEndTS. Scan timeranges are 
half-open [min, max)
+                wndStartTS = wndEndTS;
+            } else {
+                // we found an update within the ttl
+                long lastUpdateTS = 0;
+                for (Cell cell : row) {
+                    lastUpdateTS = Math.max(lastUpdateTS, cell.getTimestamp());
+                }
+                // slide the window 1 past the lastUpdateTS
+                LOG.debug("lastUpdateTS = {}", lastUpdateTS);
+                wndStartTS = lastUpdateTS + 1;
             }
+            wndEndTS = wndStartTS + ttl;
         }
         Iterator<Cell> iterator = result.iterator();
         while(iterator.hasNext()) {
-            if (iterator.next().getTimestamp() < minTimestamp) {
+            if (iterator.next().getTimestamp() < trimTimestamp) {
                 iterator.remove();
             }
         }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
index 2ab39fb11b..596b0490d8 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
@@ -54,7 +54,11 @@ import java.util.Properties;
 import java.util.Random;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 @Category(NeedsOwnMiniClusterTest.class)
 @RunWith(Parameterized.class)
@@ -322,15 +326,125 @@ public class TableTTLIT extends BaseTest {
             updateColumn(conn, tableName, "a2", 3, "col3");
             updateColumn(conn, tableName, "a3", 5, "col5");
             conn.commit();
+            String dql = "SELECT * from " + tableName;
+            ResultSet rs = conn.createStatement().executeQuery(dql);
+            // check that all older columns are masked
+            while (rs.next()) {
+                String id = rs.getString(1);
+                int updatedColIndex = 0;
+                if (id.equals("a1")) {
+                    updatedColIndex = 2;
+                } else if (id.equals("a2")) {
+                    updatedColIndex = 3;
+                } else if (id.equals("a3")) {
+                    updatedColIndex = 5;
+                } else {
+                    fail(String.format("Got unexpected row key %s", id));
+                }
+                for (int colIndex = 1; colIndex <= MAX_COLUMN_INDEX; 
++colIndex) {
+                    if (colIndex != updatedColIndex) {
+                        assertNull(rs.getString(colIndex + 1));
+                    } else {
+                        assertNotNull(rs.getString(colIndex + 1));
+                    }
+                }
+            }
             flush(TableName.valueOf(tableName));
             majorCompact(TableName.valueOf(tableName));
-            String dql = "SELECT count(*) from " + tableName;
-            ResultSet rs = conn.createStatement().executeQuery(dql);
+            dql = "SELECT count(*) from " + tableName;
+            rs = conn.createStatement().executeQuery(dql);
             assertTrue(rs.next());
             assertEquals(3, rs.getInt(1));
         }
     }
 
+    @Test
+    public void testMaskingGapAnalysis() throws Exception {
+        // for the purpose of this test only considering cases when 
maxlookback is 0
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String tableName = generateUniqueName();
+            createTable(tableName);
+            conn.createStatement().execute(
+                    "Alter Table " + tableName + " set 
\"phoenix.max.lookback.age.seconds\" = 0");
+            long startTime = System.currentTimeMillis() + 1000;
+            startTime = (startTime / 1000) * 1000;
+            EnvironmentEdgeManager.injectEdge(injectEdge);
+            injectEdge.setValue(startTime);
+            updateRow(conn, tableName, "a1"); // min timestamp
+            conn.commit();
+            String dql = String.format("SELECT * from %s where id='%s'", 
tableName, "a1");
+            String [] colValues = new String[MAX_COLUMN_INDEX + 1];
+            try (ResultSet rs = conn.createStatement().executeQuery(dql)) {
+                assertTrue(rs.next());
+                for (int i = 1; i <= MAX_COLUMN_INDEX; ++i) {
+                    // initialize the column values to the current row version
+                    colValues[i] = rs.getString("val" + i);
+                }
+            }
+            for (int iter = 1; iter <= 20; ++iter) {
+                injectEdge.incrementValue(1);
+                int colIndex = RAND.nextInt(MAX_COLUMN_INDEX) + 1;
+                String value = Integer.toString(RAND.nextInt(1000));
+                updateColumn(conn, tableName, "a1", colIndex, value);
+                conn.commit();
+                colValues[colIndex] = value;
+
+                injectEdge.incrementValue(ttl * 1000);
+                colIndex = RAND.nextInt(MAX_COLUMN_INDEX) + 1;
+                value = Integer.toString(RAND.nextInt(1000));
+                updateColumn(conn, tableName, "a1", colIndex, value);
+                conn.commit();
+                colValues[colIndex] = value;
+
+                if (iter % 5 == 0) {
+                    // every 5th iteration introduce a gap
+                    // first verify the current row
+                    try (ResultSet rs = 
conn.createStatement().executeQuery(dql)) {
+                        assertTrue(rs.next());
+                        for (int i = 1; i <= MAX_COLUMN_INDEX; ++i) {
+                            Assert.assertEquals(colValues[i], 
rs.getString("val" + i));
+                        }
+                    }
+                    // inject the gap
+                    injectEdge.incrementValue(ttl*1000 + 
RAND.nextInt(ttl*1000));
+                    // row expires after the gap so reset the col values to 
null
+                    colValues = new String[MAX_COLUMN_INDEX + 1];
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testMultipleUpdatesToSingleColumn() throws Exception {
+        // for the purpose of this test only considering cases when 
maxlookback is 0
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String tableName = generateUniqueName();
+            createTable(tableName);
+            conn.createStatement().execute(
+                    "Alter Table " + tableName + " set 
\"phoenix.max.lookback.age.seconds\" = 0");
+            long startTime = System.currentTimeMillis() + 1000;
+            startTime = (startTime / 1000) * 1000;
+            EnvironmentEdgeManager.injectEdge(injectEdge);
+            injectEdge.setValue(startTime);
+            updateRow(conn, tableName, "a1");
+            injectEdge.incrementValue(1);
+            for (int i = 0; i < 15; ++i) {
+                updateColumn(conn, tableName, "a1", 2, "col2_" + i);
+                conn.commit();
+                injectEdge.incrementValue((ttl / 10) * 1000);
+            }
+            conn.commit();
+            String dql = "select * from " + tableName + " where id='a1'";
+            try (ResultSet rs = conn.createStatement().executeQuery(dql)) {
+                while (rs.next()) {
+                    for (int col = 1; col <= MAX_COLUMN_INDEX + 1; col++) {
+                        System.out.println(rs.getString(col));
+                    }
+                }
+            }
+        }
+    }
+
     private void flush(TableName table) throws IOException {
         Admin admin = getUtility().getAdmin();
         admin.flush(table);

Reply via email to