This is an automated email from the ASF dual-hosted git repository.
tkhurana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new 973778f9be PHOENIX-7501 GC issues in TTLRegionScanner when gap is more
than TTL (#2073)
973778f9be is described below
commit 973778f9be425a41b430631097372f8ba8dc9c39
Author: tkhurana <[email protected]>
AuthorDate: Wed Feb 12 11:00:23 2025 -0800
PHOENIX-7501 GC issues in TTLRegionScanner when gap is more than TTL (#2073)
PHOENIX-7501 GC issues in TTLRegionScanner when gap is more than TTL
---
.../phoenix/coprocessor/CompactionScanner.java | 18 ++--
.../phoenix/coprocessor/TTLRegionScanner.java | 85 +++++++++------
.../org/apache/phoenix/end2end/TableTTLIT.java | 120 ++++++++++++++++++++-
3 files changed, 183 insertions(+), 40 deletions(-)
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
index 241ea00f36..f80125c6b3 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
@@ -356,7 +356,7 @@ public class CompactionScanner implements InternalScanner {
return false;
}
}
- /*
+
private void printRow(List<Cell> result, String title, boolean sort) {
List<Cell> row;
if (sort) {
@@ -370,31 +370,33 @@ public class CompactionScanner implements InternalScanner {
+ "compaction time: " + compactionTime);
System.out.println("Max lookback window start time: " +
maxLookbackWindowStart);
System.out.println("Max lookback in ms: " + maxLookbackInMillis);
- System.out.println("TTL in ms: " + ttlInMillis);
+ RowContext rowContext =
phoenixLevelRowCompactor.rowTracker.getRowContext();
+ System.out.println("TTL in ms: " + rowContext.ttl);
boolean maxLookbackLine = false;
boolean ttlLine = false;
for (Cell cell : row) {
if (!maxLookbackLine && cell.getTimestamp() <
maxLookbackWindowStart) {
- System.out.println("-----> Max lookback window start time: " +
maxLookbackWindowStart);
+ //System.out.println("-----> Max lookback window start time: " + maxLookbackWindowStart);
maxLookbackLine = true;
- } else if (!ttlLine && cell.getTimestamp() < ttlWindowStart) {
- System.out.println("-----> TTL window start time: " +
ttlWindowStart);
+ } else if (!ttlLine && cell.getTimestamp() <
rowContext.ttlWindowStart) {
+ //System.out.println("-----> TTL window start time: " +
rowContext.ttlWindowStart);
ttlLine = true;
}
System.out.println(cell);
}
}
- */
@Override
public boolean next(List<Cell> result) throws IOException {
boolean hasMore = storeScanner.next(result);
inputCellCount += result.size();
if (!result.isEmpty()) {
- // printRow(result, "Input for " + tableName + " " +
columnFamilyName, true); // This is for debugging
+ // This is for debugging
+ //printRow(result, "Input for " + tableName + " " +
columnFamilyName, true);
phoenixLevelRowCompactor.compact(result, false);
outputCellCount += result.size();
- // printRow(result, "Output for " + tableName + " " +
columnFamilyName, true); // This is for debugging
+ // This is for debugging
+ //printRow(result, "Output for " + tableName + " " +
columnFamilyName, true);
}
return hasMore;
}
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
index 2543128ece..6ce96c12b5 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
@@ -19,7 +19,6 @@ package org.apache.phoenix.coprocessor;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
@@ -33,8 +32,6 @@ import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.ScanUtil;
import org.slf4j.Logger;
@@ -128,38 +125,66 @@ public class TTLRegionScanner extends BaseRegionScanner {
if (maxTimestamp - minTimestamp <= ttl) {
return false;
}
- // We need check if the gap between two consecutive cell timestamps is
more than ttl
- // and if so trim the cells beyond the gap
- Scan singleRowScan = new Scan();
- singleRowScan.setRaw(true);
- singleRowScan.readAllVersions();
- singleRowScan.setTimeRange(scan.getTimeRange().getMin(),
scan.getTimeRange().getMax());
- byte[] rowKey = CellUtil.cloneRow(result.get(0));
- singleRowScan.withStartRow(rowKey, true);
- singleRowScan.withStopRow(rowKey, true);
- RegionScanner scanner =
((DelegateRegionScanner)delegate).getNewRegionScanner(singleRowScan);
+
+ // We need to check if the gap between two consecutive cell timestamps is more than ttl
+ // and if so trim the cells beyond the gap. The gap analysis works by doing a scan in a
+ // sliding time range window of ttl width. This scan reads the latest version of the row in
+ // that time range. If we find a version, then in that time range there is no gap. We find
+ // the timestamp at which the update happened and then slide the window past that
+ // timestamp. If no version is returned, then we have found a gap.
+ // On a gap, all the cells below the current sliding window's end time
+ // can be trimmed from the result. We slide the window past the current end time to find
+ // any more gaps so that we can find the largest timestamp in the
+ // [minTimestamp, maxTimestamp] window below which all the cells can be trimmed.
+ // This algorithm doesn't read all the row versions into the memory since the
+ // number of row versions can be unbounded and reading all of them at once can cause GC
+ // issues. In practice, ttl windows are in days or months so the entire
+ // [minTimestamp, maxTimestamp] range shouldn't span more than 2-3 ttl windows.
+ // We know that an update happened at minTimestamp so initialize the sliding window
+ // to [minTimestamp + 1, minTimestamp + ttl] which means the scan range should be
+ // [minTimestamp + 1, minTimestamp + ttl + 1).
+ long wndStartTS = minTimestamp + 1;
+ long wndEndTS = wndStartTS + ttl;
+ // any cell in the scan result list having a timestamp below
trimTimestamp will be
+ // removed from the list and not returned back to the client.
Initially, it is equal to
+ // the minTimestamp.
+ long trimTimestamp = minTimestamp;
List<Cell> row = new ArrayList<>();
- scanner.next(row);
- scanner.close();
- if (row.isEmpty()) {
- return true;
- }
- int size = row.size();
- long tsArray[] = new long[size];
- int i = 0;
- for (Cell cell : row) {
- tsArray[i++] = cell.getTimestamp();
- }
- Arrays.sort(tsArray);
- for (i = size - 1; i > 0; i--) {
- if (tsArray[i] - tsArray[i - 1] > ttl) {
- minTimestamp = tsArray[i];
- break;
+ LOG.debug("Doing gap analysis for {} min = {}, max = {}",
+ env.getRegionInfo().getRegionNameAsString(), minTimestamp,
maxTimestamp);
+ while (wndEndTS <= maxTimestamp) {
+ LOG.debug("WndStart = {}, WndEnd = {}, trim = {}", wndStartTS,
wndEndTS, trimTimestamp);
+ row.clear(); // reset the row on every iteration
+ Scan singleRowScan = new Scan();
+ singleRowScan.setTimeRange(wndStartTS, wndEndTS);
+ byte[] rowKey = CellUtil.cloneRow(result.get(0));
+ singleRowScan.withStartRow(rowKey, true);
+ singleRowScan.withStopRow(rowKey, true);
+ RegionScanner scanner =
+ ((DelegateRegionScanner)
delegate).getNewRegionScanner(singleRowScan);
+ scanner.next(row);
+ scanner.close();
+ if (row.isEmpty()) {
+ // no update in this window, we found a gap and the row expired
+ trimTimestamp = wndEndTS - 1;
+ LOG.debug("Found gap at {}", trimTimestamp);
+ // next window will start at wndEndTS. Scan timeranges are
half-open [min, max)
+ wndStartTS = wndEndTS;
+ } else {
+ // we found an update within the ttl
+ long lastUpdateTS = 0;
+ for (Cell cell : row) {
+ lastUpdateTS = Math.max(lastUpdateTS, cell.getTimestamp());
+ }
+ // slide the window 1 past the lastUpdateTS
+ LOG.debug("lastUpdateTS = {}", lastUpdateTS);
+ wndStartTS = lastUpdateTS + 1;
}
+ wndEndTS = wndStartTS + ttl;
}
Iterator<Cell> iterator = result.iterator();
while(iterator.hasNext()) {
- if (iterator.next().getTimestamp() < minTimestamp) {
+ if (iterator.next().getTimestamp() < trimTimestamp) {
iterator.remove();
}
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
index a4f77a8a6b..d23bb707aa 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
@@ -53,7 +53,11 @@ import java.util.Properties;
import java.util.Random;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
@Category(NeedsOwnMiniClusterTest.class)
@RunWith(Parameterized.class)
@@ -338,15 +342,127 @@ public class TableTTLIT extends BaseTest {
updateColumn(conn, tableName, "a2", 3, "col3");
updateColumn(conn, tableName, "a3", 5, "col5");
conn.commit();
+ String dql = "SELECT * from " + tableName;
+ ResultSet rs = conn.createStatement().executeQuery(dql);
+ // check that all older columns are masked
+ while (rs.next()) {
+ String id = rs.getString(1);
+ int updatedColIndex = 0;
+ if (id.equals("a1")) {
+ updatedColIndex = 2;
+ } else if (id.equals("a2")) {
+ updatedColIndex = 3;
+ } else if (id.equals("a3")) {
+ updatedColIndex = 5;
+ } else {
+ fail(String.format("Got unexpected row key %s", id));
+ }
+ for (int colIndex = 1; colIndex <= MAX_COLUMN_INDEX;
++colIndex) {
+ if (colIndex != updatedColIndex) {
+ assertNull(rs.getString(colIndex + 1));
+ } else {
+ assertNotNull(rs.getString(colIndex + 1));
+ }
+ }
+ }
flush(TableName.valueOf(tableName));
majorCompact(TableName.valueOf(tableName));
- String dql = "SELECT count(*) from " + tableName;
- ResultSet rs = conn.createStatement().executeQuery(dql);
+ dql = "SELECT count(*) from " + tableName;
+ rs = conn.createStatement().executeQuery(dql);
assertTrue(rs.next());
assertEquals(3, rs.getInt(1));
}
}
+ @Test
+ public void testMaskingGapAnalysis() throws Exception {
+ // for the purpose of this test only considering cases when
maxlookback is 0
+ if (tableLevelMaxLooback == null || tableLevelMaxLooback != 0) {
+ return;
+ }
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ String tableName = generateUniqueName();
+ createTable(tableName);
+ long startTime = System.currentTimeMillis() + 1000;
+ startTime = (startTime / 1000) * 1000;
+ EnvironmentEdgeManager.injectEdge(injectEdge);
+ injectEdge.setValue(startTime);
+ updateRow(conn, tableName, "a1"); // min timestamp
+ conn.commit();
+ String dql = String.format("SELECT * from %s where id='%s'",
tableName, "a1");
+ String [] colValues = new String[MAX_COLUMN_INDEX + 1];
+ try (ResultSet rs = conn.createStatement().executeQuery(dql)) {
+ assertTrue(rs.next());
+ for (int i = 1; i <= MAX_COLUMN_INDEX; ++i) {
+ // initialize the column values to the current row version
+ colValues[i] = rs.getString("val" + i);
+ }
+ }
+ for (int iter = 1; iter <= 20; ++iter) {
+ injectEdge.incrementValue(1);
+ int colIndex = RAND.nextInt(MAX_COLUMN_INDEX) + 1;
+ String value = Integer.toString(RAND.nextInt(1000));
+ updateColumn(conn, tableName, "a1", colIndex, value);
+ conn.commit();
+ colValues[colIndex] = value;
+
+ injectEdge.incrementValue(ttl * 1000);
+ colIndex = RAND.nextInt(MAX_COLUMN_INDEX) + 1;
+ value = Integer.toString(RAND.nextInt(1000));
+ updateColumn(conn, tableName, "a1", colIndex, value);
+ conn.commit();
+ colValues[colIndex] = value;
+
+ if (iter % 5 == 0) {
+ // every 5th iteration introduce a gap
+ // first verify the current row
+ try (ResultSet rs =
conn.createStatement().executeQuery(dql)) {
+ assertTrue(rs.next());
+ for (int i = 1; i <= MAX_COLUMN_INDEX; ++i) {
+ Assert.assertEquals(colValues[i],
rs.getString("val" + i));
+ }
+ }
+ // inject the gap
+ injectEdge.incrementValue(ttl*1000 +
RAND.nextInt(ttl*1000));
+ // row expires after the gap so reset the col values to
null
+ colValues = new String[MAX_COLUMN_INDEX + 1];
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testMultipleUpdatesToSingleColumn() throws Exception {
+ // for the purpose of this test only considering cases when
maxlookback is 0
+ if (tableLevelMaxLooback == null || tableLevelMaxLooback != 0) {
+ return;
+ }
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ String tableName = generateUniqueName();
+ createTable(tableName);
+ long startTime = System.currentTimeMillis() + 1000;
+ startTime = (startTime / 1000) * 1000;
+ EnvironmentEdgeManager.injectEdge(injectEdge);
+ injectEdge.setValue(startTime);
+ updateRow(conn, tableName, "a1");
+ injectEdge.incrementValue(1);
+ for (int i = 0; i < 15; ++i) {
+ updateColumn(conn, tableName, "a1", 2, "col2_" + i);
+ conn.commit();
+ injectEdge.incrementValue((ttl / 10) * 1000);
+ }
+ conn.commit();
+ String dql = "select * from " + tableName + " where id='a1'";
+ try (ResultSet rs = conn.createStatement().executeQuery(dql)) {
+ while (rs.next()) {
+ for (int col = 1; col <= MAX_COLUMN_INDEX + 1; col++) {
+ System.out.println(rs.getString(col));
+ }
+ }
+ }
+ }
+ }
+
private void flush(TableName table) throws IOException {
Admin admin = getUtility().getAdmin();
admin.flush(table);