This is an automated email from the ASF dual-hosted git repository.
kadir pushed a commit to branch 5.2
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.2 by this push:
new 59fdbfd511 PHOENIX-7313 All cell versions should not be retained
during flushes … (#1888)
59fdbfd511 is described below
commit 59fdbfd51175568645a4389294fd3610c4e56822
Author: Kadir Ozdemir <[email protected]>
AuthorDate: Thu May 16 15:29:49 2024 -0700
PHOENIX-7313 All cell versions should not be retained during flushes …
(#1888)
---
.../coprocessor/BaseScannerRegionObserver.java | 32 ++++++++---
.../phoenix/coprocessor/CompactionScanner.java | 18 ++++--
.../phoenix/coprocessor/TTLRegionScanner.java | 10 ++--
.../UngroupedAggregateRegionObserver.java | 10 +++-
.../phoenix/end2end/MaxLookbackExtendedIT.java | 66 ++++++++++++++++++++++
.../org/apache/phoenix/end2end/TableTTLIT.java | 52 +++++++++++++++++
.../java/org/apache/phoenix/util/TestUtil.java | 12 ++--
7 files changed, 177 insertions(+), 23 deletions(-)
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index ab52dd69d4..0214f8bf32 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -350,20 +350,33 @@ abstract public class BaseScannerRegionObserver
implements RegionObserver {
dataRegion, indexMaintainer, null, viewConstants, null, null,
projector, ptr, useQualiferAsListIndex);
}
- public void setScanOptionsForFlushesAndCompactions(ScanOptions options) {
+ public void setScanOptionsForFlushesAndCompactions(Store store, ScanOptions options,
+ boolean retainAllVersions) {
// We want the store to give us all the deleted cells to
StoreCompactionScanner
options.setKeepDeletedCells(KeepDeletedCells.TTL);
options.setTTL(HConstants.FOREVER);
- options.setMaxVersions(Integer.MAX_VALUE);
- options.setMinVersions(Integer.MAX_VALUE);
+ if (retainAllVersions) {
+ options.setMaxVersions(Integer.MAX_VALUE);
+ options.setMinVersions(Integer.MAX_VALUE);
+ } else {
+ options.setMaxVersions(Math.max(Math.max(options.getMaxVersions(),
+ store.getColumnFamilyDescriptor().getMaxVersions()), 1));
+ options.setMinVersions(Math.max(Math.max(options.getMinVersions(),
+ store.getColumnFamilyDescriptor().getMaxVersions()), 1));
+ }
+
}
+
@Override
public void
preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store
store,
ScanType scanType, ScanOptions options, CompactionLifeCycleTracker
tracker,
CompactionRequest request) throws IOException {
Configuration conf = c.getEnvironment().getConfiguration();
if (isPhoenixTableTTLEnabled(conf)) {
- setScanOptionsForFlushesAndCompactions(options);
+ boolean retainAllVersions = isMaxLookbackTimeEnabled(
+ BaseScannerRegionObserverConstants.getMaxLookbackInMillis(conf))
+ || request.isMajor();
+ setScanOptionsForFlushesAndCompactions(store, options, retainAllVersions);
return;
}
if (isMaxLookbackTimeEnabled(conf)) {
@@ -376,8 +389,11 @@ abstract public class BaseScannerRegionObserver implements
RegionObserver {
public void
preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store
store,
ScanOptions options, FlushLifeCycleTracker tracker) throws
IOException {
Configuration conf = c.getEnvironment().getConfiguration();
+
if (isPhoenixTableTTLEnabled(conf)) {
- setScanOptionsForFlushesAndCompactions(options);
+ boolean retainAllVersions = isMaxLookbackTimeEnabled(
+ BaseScannerRegionObserverConstants.getMaxLookbackInMillis(conf));
+ setScanOptionsForFlushesAndCompactions(store, options, retainAllVersions);
return;
}
if (isMaxLookbackTimeEnabled(conf)) {
@@ -392,7 +408,9 @@ abstract public class BaseScannerRegionObserver implements
RegionObserver {
throws IOException {
Configuration conf = c.getEnvironment().getConfiguration();
if (isPhoenixTableTTLEnabled(conf)) {
- setScanOptionsForFlushesAndCompactions(options);
+ boolean retainAllVersions = isMaxLookbackTimeEnabled(
+ BaseScannerRegionObserverConstants.getMaxLookbackInMillis(conf));
+ setScanOptionsForFlushesAndCompactions(store, options, retainAllVersions);
return;
}
if (isMaxLookbackTimeEnabled(conf)) {
@@ -418,7 +436,7 @@ abstract public class BaseScannerRegionObserver implements
RegionObserver {
Configuration conf = c.getEnvironment().getConfiguration();
if (isPhoenixTableTTLEnabled(conf)) {
- setScanOptionsForFlushesAndCompactions(options);
+ setScanOptionsForFlushesAndCompactions(store, options, true);
return;
}
if (!storeFileScanDoesntNeedAlteration(options)) {
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
index 3bcc2cefa8..ebe92b8741 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
@@ -76,6 +76,8 @@ public class CompactionScanner implements InternalScanner {
private final byte[] emptyCF;
private final byte[] emptyCQ;
private final byte[] storeColumnFamily;
+ private final String tableName;
+ private final String columnFamilyName;
private static Map<String, Long> maxLookbackMap = new
ConcurrentHashMap<>();
private PhoenixLevelRowCompactor phoenixLevelRowCompactor;
private HBaseLevelRowCompactor hBaseLevelRowCompactor;
@@ -94,19 +96,18 @@ public class CompactionScanner implements InternalScanner {
this.emptyCQ = emptyCQ;
this.config = env.getConfiguration();
compactionTime = EnvironmentEdgeManager.currentTimeMillis();
- this.maxLookbackInMillis = maxLookbackInMillis;
- String columnFamilyName = store.getColumnFamilyName();
+ columnFamilyName = store.getColumnFamilyName();
storeColumnFamily = columnFamilyName.getBytes();
- String tableName = region.getRegionInfo().getTable().getNameAsString();
+ tableName = region.getRegionInfo().getTable().getNameAsString();
Long overriddenMaxLookback =
maxLookbackMap.remove(tableName + SEPARATOR +
columnFamilyName);
- maxLookbackInMillis = overriddenMaxLookback == null ?
+ this.maxLookbackInMillis = overriddenMaxLookback == null ?
maxLookbackInMillis : Math.max(maxLookbackInMillis,
overriddenMaxLookback);
// The oldest scn is current time - maxLookbackInMillis. Phoenix sets
the scan time range
// for scn queries [0, scn). This means that the maxlookback size
should be
// maxLookbackInMillis + 1 so that the oldest scn does not return
empty row
- this.maxLookbackWindowStart = maxLookbackInMillis == 0 ?
- compactionTime : compactionTime - (maxLookbackInMillis + 1);
+ this.maxLookbackWindowStart = this.maxLookbackInMillis == 0 ?
+ compactionTime : compactionTime - (this.maxLookbackInMillis +
1);
ColumnFamilyDescriptor cfd = store.getColumnFamilyDescriptor();
ttl = cfd.getTimeToLive();
this.ttlWindowStart = ttl == HConstants.FOREVER ? 1 : compactionTime -
ttl * 1000;
@@ -121,6 +122,9 @@ public class CompactionScanner implements InternalScanner {
|| localIndex;
phoenixLevelRowCompactor = new PhoenixLevelRowCompactor();
hBaseLevelRowCompactor = new HBaseLevelRowCompactor();
+ LOGGER.info("Starting Phoenix CompactionScanner for table " + tableName + " store "
+ + columnFamilyName + " ttl " + ttl + "ms " + "max lookback "
+ + maxLookbackInMillis + "ms");
}
/**
@@ -155,6 +159,8 @@ public class CompactionScanner implements InternalScanner {
@Override
public void close() throws IOException {
+ LOGGER.info("Closing Phoenix CompactionScanner for table " + tableName + " store "
+ + columnFamilyName);
storeScanner.close();
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
index 261ef94fe8..aa1196130f 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
@@ -70,14 +70,14 @@ public class TTLRegionScanner extends BaseRegionScanner {
long currentTime = scan.getTimeRange().getMax() ==
HConstants.LATEST_TIMESTAMP ?
EnvironmentEdgeManager.currentTimeMillis() :
scan.getTimeRange().getMax();
ttl =
env.getRegion().getTableDescriptor().getColumnFamilies()[0].getTimeToLive();
- ttlWindowStart = ttl == HConstants.FOREVER ? 1 : currentTime - ttl *
1000;
- ttl *= 1000;
// Regardless if the Phoenix Table TTL feature is disabled cluster
wide or the client is
// an older client and does not supply the empty column parameters,
the masking should not
- // be done here.
- isMaskingEnabled = emptyCF != null && emptyCQ != null &&
-
env.getConfiguration().getBoolean(QueryServices.PHOENIX_TABLE_TTL_ENABLED,
+ // be done here. We also disable masking when TTL is
HConstants.FOREVER.
+ isMaskingEnabled = emptyCF != null && emptyCQ != null && ttl !=
HConstants.FOREVER
+ &&
env.getConfiguration().getBoolean(QueryServices.PHOENIX_TABLE_TTL_ENABLED,
QueryServicesOptions.DEFAULT_PHOENIX_TABLE_TTL_ENABLED);
+ ttlWindowStart = ttl == HConstants.FOREVER ? 1 : currentTime - ttl * 1000;
+ ttl *= 1000;
}
private void init() throws IOException {
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 1eff655e41..39394a753b 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -110,6 +110,7 @@ import org.apache.phoenix.util.ClientUtil;
import org.apache.phoenix.util.EncodedColumnsUtil;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.PhoenixKeyValueUtil;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;
@@ -596,7 +597,14 @@ public class UngroupedAggregateRegionObserver extends
BaseScannerRegionObserver
InternalScanner internalScanner = scanner;
if (request.isMajor()) {
boolean isDisabled = false;
- final String fullTableName =
tableName.getNameAsString();
+ boolean isMultiTenantIndexTable = false;
+ if
(tableName.getNameAsString().startsWith(MetaDataUtil.VIEW_INDEX_TABLE_PREFIX)) {
+ isMultiTenantIndexTable = true;
+ }
+ final String fullTableName = isMultiTenantIndexTable ?
+
SchemaUtil.getParentTableNameFromIndexTable(tableName.getNameAsString(),
+ MetaDataUtil.VIEW_INDEX_TABLE_PREFIX) :
+ tableName.getNameAsString();
PTable table = null;
try (PhoenixConnection conn =
QueryUtil.getConnectionOnServer(
compactionConfig).unwrap(PhoenixConnection.class)) {
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java
index 7c31b9c95a..97fbc43755 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java
@@ -22,17 +22,22 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.coprocessor.CompactionScanner;
import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.query.BaseTest;
import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.ManualEnvironmentEdge;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TestUtil;
import org.junit.After;
import org.junit.Assert;
@@ -45,7 +50,9 @@ import org.junit.runners.Parameterized;
import java.io.IOException;
import java.sql.Connection;
+import java.sql.Date;
import java.sql.DriverManager;
+import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
@@ -60,6 +67,9 @@ import static
org.apache.phoenix.util.TestUtil.assertRowExistsAtSCN;
import static org.apache.phoenix.util.TestUtil.assertRowHasExpectedValueAtSCN;
import static org.apache.phoenix.util.TestUtil.assertTableHasTtl;
import static org.apache.phoenix.util.TestUtil.assertTableHasVersions;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
@Category(NeedsOwnMiniClusterTest.class)
@RunWith(Parameterized.class)
@@ -281,6 +291,62 @@ public class MaxLookbackExtendedIT extends BaseTest {
}
}
+ @Test(timeout=60000L)
+ public void testViewIndexIsCompacted() throws Exception {
+ String baseTable = SchemaUtil.getTableName("SCHEMA1",
generateUniqueName());
+ String globalViewName = generateUniqueName();
+ String fullGlobalViewName = SchemaUtil.getTableName("SCHEMA2",
globalViewName);
+ String globalViewIdx = generateUniqueName();
+ TableName dataTable = TableName.valueOf(baseTable);
+ TableName indexTable = TableName.valueOf("_IDX_" + baseTable);
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ conn.createStatement().execute("CREATE TABLE " + baseTable
+ + " (TENANT_ID CHAR(15) NOT NULL, PK2 INTEGER NOT NULL,
PK3 INTEGER NOT NULL, "
+ + "COL1 VARCHAR, COL2 VARCHAR, COL3 CHAR(15) CONSTRAINT PK
PRIMARY KEY"
+ + "(TENANT_ID, PK2, PK3)) MULTI_TENANT=true");
+ conn.createStatement().execute("CREATE VIEW " + fullGlobalViewName
+ + " AS SELECT * FROM " + baseTable);
+ conn.createStatement().execute("CREATE INDEX " + globalViewIdx + "
ON "
+ + fullGlobalViewName + " (COL1) INCLUDE (COL2)");
+
+ conn.createStatement().executeUpdate("UPSERT INTO " +
fullGlobalViewName
+ + " (TENANT_ID, PK2, PK3, COL1, COL2) VALUES
('TenantId1',1, 2, 'a', 'b')");
+ conn.commit();
+
+ String query = "SELECT COL2 FROM " + fullGlobalViewName + " WHERE
COL1 = 'a'";
+ // Verify that query uses the global view index
+ ResultSet rs = conn.createStatement().executeQuery(query);
+ PTable table =
((PhoenixResultSet)rs).getContext().getCurrentTable().getTable();
+ assertTrue(table.getSchemaName().getString().equals("SCHEMA2") &&
+ table.getTableName().getString().equals(globalViewIdx));
+ assertTrue(rs.next());
+ assertEquals("b", rs.getString(1));
+ assertFalse(rs.next());
+ // Force a flush
+ flush(dataTable);
+ flush(indexTable);
+ assertRawRowCount(conn, dataTable, 1);
+ assertRawRowCount(conn, indexTable, 1);
+ // Delete the row from both tables
+ conn.createStatement().execute("DELETE FROM " + fullGlobalViewName
+ + " WHERE TENANT_ID = 'TenantId1'");
+ conn.commit();
+ // Force a flush
+ flush(dataTable);
+ flush(indexTable);
+ assertRawRowCount(conn, dataTable, 1);
+ assertRawRowCount(conn, indexTable, 1);
+ // Move change beyond the max lookback window
+ injectEdge.setValue(System.currentTimeMillis() + MAX_LOOKBACK_AGE
* 1000 + 1);
+ EnvironmentEdgeManager.injectEdge(injectEdge);
+ // Major compact both tables
+ majorCompact(dataTable);
+ majorCompact(indexTable);
+ // Everything should have been purged by major compaction
+ assertRawRowCount(conn, dataTable, 0);
+ assertRawRowCount(conn, indexTable, 0);
+ }
+ }
@Test(timeout=60000L)
public void testTTLAndMaxLookbackAge() throws Exception {
Configuration conf = getUtility().getConfiguration();
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
index e65bee738c..00a8027f54 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.apache.phoenix.util.EnvironmentEdgeManager;
@@ -52,6 +53,8 @@ import java.util.Map;
import java.util.Properties;
import java.util.Random;
+import static org.junit.Assert.assertTrue;
+
@Category(NeedsOwnMiniClusterTest.class)
@RunWith(Parameterized.class)
public class TableTTLIT extends BaseTest {
@@ -222,6 +225,45 @@ public class TableTTLIT extends BaseTest {
}
}
+ @Test
+ public void
testFlushesAndMinorCompactionShouldNotRetainCellsWhenMaxLookbackIsDisabled()
+ throws Exception {
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ String tableName = generateUniqueName();
+ createTable(tableName);
+ conn.createStatement().execute("Alter Table " + tableName + " set
\"phoenix.max.lookback.age.seconds\" = 0");
+ conn.commit();
+ final int flushCount = 10;
+ byte[] row = Bytes.toBytes("a");
+ for (int i = 0; i < flushCount; i++) {
+ // Generate more row versions than the maximum cell versions
for the table
+ int updateCount = RAND.nextInt(10) + versions;
+ for (int j = 0; j < updateCount; j++) {
+ updateRow(conn, tableName, "a");
+ }
+ flush(TableName.valueOf(tableName));
+ // At every flush, extra cell versions should be removed.
+ // MAX_COLUMN_INDEX table columns and one empty column will be
retained for
+ // each row version.
+ assertTrue(TestUtil.getRawCellCount(conn,
TableName.valueOf(tableName), row)
+ <= (i + 1) * (MAX_COLUMN_INDEX + 1) * versions);
+ }
+ // Run one minor compaction (in case no minor compaction has
happened yet)
+ Admin admin = utility.getAdmin();
+ admin.compact(TableName.valueOf(tableName));
+ int waitCount = 0;
+ while (TestUtil.getRawCellCount(conn, TableName.valueOf(tableName),
+ Bytes.toBytes("a")) >= flushCount * (MAX_COLUMN_INDEX + 1)
* versions) {
+ // Wait for minor compactions to happen
+ Thread.sleep(1000);
+ waitCount++;
+ if (waitCount > 120) {
+ Assert.fail();
+ }
+ }
+ }
+ }
+
@Test
public void testRowSpansMultipleTTLWindows() throws Exception {
try (Connection conn = DriverManager.getConnection(getUrl())) {
@@ -291,6 +333,16 @@ public class TableTTLIT extends BaseTest {
conn.commit();
}
+ private void updateRow(Connection conn, String tableName, String id)
+ throws SQLException {
+
+ for (int i = 1; i <= MAX_COLUMN_INDEX; i++) {
+ String value = Integer.toString(RAND.nextInt(1000));
+ updateColumn(conn, tableName, id, i, value);
+ }
+ conn.commit();
+ }
+
private void compareRow(Connection conn, String tableName1, String
tableName2, String id,
int maxColumnIndex) throws SQLException, IOException {
StringBuilder queryBuilder = new StringBuilder("SELECT ");
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index cf076f08ef..30e2c4991f 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -1364,13 +1364,17 @@ public class TestUtil {
assertEquals(expectedRowCount, count);
}
- public static void assertRawCellCount(Connection conn, TableName tableName,
- byte[] row, int expectedCellCount)
- throws SQLException, IOException {
+ public static int getRawCellCount(Connection conn, TableName tableName,
byte[] row)
+ throws SQLException, IOException {
ConnectionQueryServices cqs =
conn.unwrap(PhoenixConnection.class).getQueryServices();
Table table = cqs.getTable(tableName.getName());
CellCount cellCount = getCellCount(table, true);
- int count = cellCount.getCellCount(Bytes.toString(row));
+ return cellCount.getCellCount(Bytes.toString(row));
+ }
+ public static void assertRawCellCount(Connection conn, TableName tableName,
+ byte[] row, int expectedCellCount)
+ throws SQLException, IOException {
+ int count = getRawCellCount(conn, tableName, row);
assertEquals(expectedCellCount, count);
}