This is an automated email from the ASF dual-hosted git repository.
kadir pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new 8aa728e5be PHOENIX-7402: Fix row getting expired partially even if
updated within TTL (#1973)
8aa728e5be is described below
commit 8aa728e5be99f9a89c6000219888ff947dfb9f53
Author: sanjeet006py <[email protected]>
AuthorDate: Wed Sep 25 00:45:38 2024 +0530
PHOENIX-7402: Fix row getting expired partially even if updated within TTL
(#1973)
Co-authored-by: Sanjeet Malhotra <[email protected]>
---
.../phoenix/coprocessor/CompactionScanner.java | 145 +++++++++++++++------
.../UngroupedAggregateRegionObserver.java | 52 +-------
.../phoenix/end2end/MaxLookbackExtendedIT.java | 54 +++++++-
.../org/apache/phoenix/end2end/TableTTLIT.java | 33 ++---
.../java/org/apache/phoenix/util/TestUtil.java | 21 +++
5 files changed, 195 insertions(+), 110 deletions(-)
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
index 70b57fb213..e96406f142 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
@@ -67,6 +67,7 @@ import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.schema.types.PSmallint;
+import
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.PhoenixRuntime;
@@ -155,6 +156,9 @@ public class CompactionScanner implements InternalScanner {
private long outputCellCount = 0;
private boolean phoenixLevelOnly = false;
+ // Only for forcing minor compaction while testing
+ private static boolean forceMinorCompaction = false;
+
public CompactionScanner(RegionCoprocessorEnvironment env,
Store store,
InternalScanner storeScanner,
@@ -166,13 +170,16 @@ public class CompactionScanner implements InternalScanner
{
this.region = env.getRegion();
this.store = store;
this.env = env;
- this.emptyCF = major && table != null ?
SchemaUtil.getEmptyColumnFamily(table) : EMPTY_BYTE_ARRAY;
- this.emptyCQ = major && table != null ?
SchemaUtil.getEmptyColumnQualifier(table) : EMPTY_BYTE_ARRAY;
+ // Empty column family and qualifier are always needed to compute
which all empty cells to retain
+ // even during minor compactions. If required empty cells are not
retained during
+ // minor compactions then we can run into the risk of partial row
expiry on next major compaction.
+ this.emptyCF = SchemaUtil.getEmptyColumnFamily(table);
+ this.emptyCQ = SchemaUtil.getEmptyColumnQualifier(table);
compactionTime = EnvironmentEdgeManager.currentTimeMillis();
columnFamilyName = store.getColumnFamilyName();
storeColumnFamily = columnFamilyName.getBytes();
tableName = region.getRegionInfo().getTable().getNameAsString();
- String dataTableName = major && table != null ?
table.getName().toString() : "";
+ String dataTableName = table.getName().toString();
Long overriddenMaxLookback = maxLookbackMap.get(tableName + SEPARATOR
+ columnFamilyName);
this.maxLookbackInMillis = overriddenMaxLookback == null ?
maxLookbackAgeInMillis : Math.max(maxLookbackAgeInMillis,
overriddenMaxLookback);
@@ -181,16 +188,14 @@ public class CompactionScanner implements InternalScanner
{
// maxLookbackInMillis + 1 so that the oldest scn does not return
empty row
this.maxLookbackWindowStart = this.maxLookbackInMillis == 0 ?
compactionTime : compactionTime - (this.maxLookbackInMillis + 1);
ColumnFamilyDescriptor cfd = store.getColumnFamilyDescriptor();
- this.major = major;
+ this.major = major && ! forceMinorCompaction;
this.minVersion = cfd.getMinVersions();
this.maxVersion = cfd.getMaxVersions();
this.keepDeletedCells = keepDeleted ? KeepDeletedCells.TTL :
cfd.getKeepDeletedCells();
familyCount = region.getTableDescriptor().getColumnFamilies().length;
localIndex =
columnFamilyName.startsWith(LOCAL_INDEX_COLUMN_FAMILY_PREFIX);
- emptyCFStore = major
- ? familyCount == 1 ||
columnFamilyName.equals(Bytes.toString(emptyCF))
- || localIndex
- : true; // we do not need to identify emptyCFStore for minor
compaction or flushes
+ emptyCFStore = familyCount == 1 ||
columnFamilyName.equals(Bytes.toString(emptyCF))
+ || localIndex;
// Initialize the tracker that computes the TTL for the compacting
table.
// The TTL tracker can be
@@ -198,15 +203,15 @@ public class CompactionScanner implements InternalScanner
{
// complex when the TTL can vary per row when the compacting table is
Partitioned.
TTLTracker
ttlTracker =
- major ?
- createTTLTrackerFor(env, store, table) :
- new TableTTLTrackerForFlushesAndMinor(tableName) ;
+ this.major ?
+ createTTLTrackerFor(env, store, table):
+ new TableTTLTrackerForFlushesAndMinor(tableName);
phoenixLevelRowCompactor = new PhoenixLevelRowCompactor(ttlTracker);
hBaseLevelRowCompactor = new HBaseLevelRowCompactor(ttlTracker);
LOGGER.info("Starting CompactionScanner for table " + tableName + "
store "
- + columnFamilyName + (major ? " major " : " not major ") +
"compaction ttl "
+ + columnFamilyName + (this.major ? " major " : " not major ")
+ "compaction ttl "
+ ttlTracker.getRowContext().getTTL() + "ms " + "max lookback
" + this.maxLookbackInMillis + "ms");
LOGGER.info(String.format("CompactionScanner params:- (" +
"physical-data-tablename = %s, compaction-tablename =
%s, region = %s, " +
@@ -214,15 +219,25 @@ public class CompactionScanner implements InternalScanner
{
"emptyCF = %s, emptyCQ = %s, " +
"minVersion = %d, maxVersion = %d, keepDeletedCells =
%s, " +
"familyCount = %d, localIndex = %s, emptyCFStore = %s,
" +
- "compactionTime = %d, maxLookbackWindowStart = %d,
maxLookbackInMillis = %d)",
+ "compactionTime = %d, maxLookbackWindowStart = %d,
maxLookbackInMillis = %d, major = %s)",
dataTableName, tableName,
region.getRegionInfo().getEncodedName(),
Bytes.toStringBinary(region.getRegionInfo().getStartKey()),
Bytes.toStringBinary(region.getRegionInfo().getEndKey()),
Bytes.toString(this.emptyCF), Bytes.toString(emptyCQ),
this.minVersion, this.maxVersion, this.keepDeletedCells.name(),
this.familyCount, this.localIndex, this.emptyCFStore,
- compactionTime, maxLookbackWindowStart, maxLookbackInMillis));
+ compactionTime, maxLookbackWindowStart, maxLookbackInMillis,
this.major));
+
+ }
+ @VisibleForTesting
+ public static void setForceMinorCompaction(boolean doMinorCompaction) {
+ forceMinorCompaction = doMinorCompaction;
+ }
+
+ @VisibleForTesting
+ public static boolean getForceMinorCompaction() {
+ return forceMinorCompaction;
}
/**
@@ -392,6 +407,9 @@ public class CompactionScanner implements InternalScanner {
+ columnFamilyName + (major ? " major " : " not major ") +
"compaction retained "
+ outputCellCount + " of " + inputCellCount + " cells"
+ (phoenixLevelOnly ? " phoenix level only" : ""));
+ if (forceMinorCompaction) {
+ forceMinorCompaction = false;
+ }
storeScanner.close();
}
@@ -2179,13 +2197,10 @@ public class CompactionScanner implements
InternalScanner {
List<Cell> lastRowVersion, List<Cell> retainedCells,
List<Cell> emptyColumn) {
Cell currentColumnCell = null;
boolean isEmptyColumn = false;
- Cell cellAtMaxLookbackWindowStart = null;
for (Cell cell : result) {
- if (cell.getTimestamp() >
rowTracker.getRowContext().getMaxLookbackWindowStart()) {
+ long maxLookbackWindowStart =
rowTracker.getRowContext().getMaxLookbackWindowStart();
+ if (cell.getTimestamp() > maxLookbackWindowStart) {
retainedCells.add(cell);
- if (cell.getTimestamp() == maxLookbackWindowStart) {
- cellAtMaxLookbackWindowStart = cell;
- }
continue;
}
if (!major && cell.getType() != Cell.Type.Put) {
@@ -2194,26 +2209,20 @@ public class CompactionScanner implements
InternalScanner {
if (currentColumnCell == null ||
!CellUtil.matchingColumn(cell, currentColumnCell)) {
currentColumnCell = cell;
- if (cell.getType() != Cell.Type.Delete
- && cell.getType() != Cell.Type.DeleteColumn) {
+ isEmptyColumn = ScanUtil.isEmptyColumn(cell, emptyCF,
emptyCQ);
+ if ((cell.getType() != Cell.Type.Delete
+ && cell.getType() != Cell.Type.DeleteColumn)
+ || cell.getTimestamp() == maxLookbackWindowStart) {
// Include only delete family markers and put cells
- // It is possible that this cell is not visible from
the max lookback
- // window. This happens when there is a mutation with
the mutation timestamp
- // equal to the max lookback window start timestamp.
The following is to
- // check for this case
- if (cellAtMaxLookbackWindowStart == null
- || !CellUtil.matchingColumn(cell,
cellAtMaxLookbackWindowStart)) {
- lastRowVersion.add(cell);
- }
- }
- if (major && ScanUtil.isEmptyColumn(cell, emptyCF,
emptyCQ)) {
- isEmptyColumn = true;
- } else {
- isEmptyColumn = false;
+ // The last row version can also be the cells with
timestamp
+ // same as timestamp of start of max lookback window
+ lastRowVersion.add(cell);
}
- } else if (major && isEmptyColumn) {
+ } else if (isEmptyColumn) {
// We only need to keep one cell for every column for the
last row version.
- // So here we just form the empty column beyond the last
row version
+ // So here we just form the empty column beyond the last
row version.
+ // Empty column needs to be collected during minor
compactions also
+ // else we will see partial row expiry.
emptyColumn.add(cell);
}
}
@@ -2223,7 +2232,7 @@ public class CompactionScanner implements InternalScanner
{
* Close the gap between the two timestamps, max and min, with the
minimum number of cells
* from the input list such that the timestamp difference between two
cells should
* not more than ttl. The cells that are used to close the gap are
added to the output
- * list.
+ * list. The input list is a list of empty cells in decreasing order
of timestamp.
*/
private void closeGap(long max, long min, long ttl, List<Cell> input,
List<Cell> output) {
int previous = -1;
@@ -2234,7 +2243,9 @@ public class CompactionScanner implements InternalScanner
{
previous++;
continue;
}
- if (previous == -1) {
+ if (previous == -1 && max - ts > ttl) {
+ // Means even the first empty cells in the input list
which is closest to
+ // max timestamp can't close the gap. So, gap can't be
closed by empty cells at all.
break;
}
if (max - ts > ttl) {
@@ -2247,6 +2258,42 @@ public class CompactionScanner implements
InternalScanner {
}
previous++;
}
+ if (previous > -1 && max - min > ttl) {
+ // This covers the case we need to retain the last empty cell
in the input list. The close gap
+ // algorithm is such that if we need to retain the i th empty
cell in the input list then we
+ // will get to know that once we are iterating on i+1 th empty
cell. So, to retain last empty cell
+ // in input list we need to check the min timestamp.
+ output.add(input.remove(previous));
+ }
+ }
+
+ /**
+ * Retains minimum empty cells needed during minor compaction to not
loose data/partial row expiry
+ * on next major compaction.
+ * @param emptyColumn Empty column cells in decreasing order of
timestamp.
+ * @param retainedCells Cells to be retained.
+ */
+ private void retainEmptyCellsInMinorCompaction(List<Cell> emptyColumn,
List<Cell> retainedCells) {
+ if (emptyColumn.isEmpty()) {
+ return;
+ }
+ else if (familyCount == 1 || localIndex) {
+ // We are compacting empty column family store and its single
column family so
+ // just need to retain empty cells till min timestamp of last
row version. Can't
+ // minimize the retained empty cells further as we don't know
actual TTL during
+ // minor compactions.
+ long minRowTimestamp = rowContext.minTimestamp;
+ for (Cell emptyCell: emptyColumn) {
+ if (emptyCell.getTimestamp() > minRowTimestamp) {
+ retainedCells.add(emptyCell);
+ }
+ }
+ return;
+ }
+ // For multi-column family, w/o doing region level scan we can't
put a bound on timestamp
+ // till which we should retain the empty cells. The empty cells
can be needed to close the gap
+ // b/w empty column family cell and non-empty column family cell.
+ retainedCells.addAll(emptyColumn);
}
/**
@@ -2271,7 +2318,8 @@ public class CompactionScanner implements InternalScanner
{
}
}
- if (compactionTime - rowContext.maxTimestamp > maxLookbackInMillis
+ ttl) {
+ if (major && compactionTime - rowContext.maxTimestamp >
maxLookbackInMillis + ttl) {
+ // Only do this check for major compaction as for minor
compactions we don't expire cells.
// The row version should not be visible via the max lookback
window. Nothing to do
return;
}
@@ -2280,7 +2328,9 @@ public class CompactionScanner implements InternalScanner
{
// mutation will be considered expired and masked. If the length
of the time range of
// a row version is not more than ttl, then we know the cells
covered by the row
// version are not apart from each other more than ttl and will
not be masked.
- if (rowContext.maxTimestamp - rowContext.minTimestamp <= ttl) {
+ if (major && rowContext.maxTimestamp - rowContext.minTimestamp <=
ttl) {
+ // Skip this check for minor compactions as we don't compute
actual TTL for
+ // minor compactions and don't expire cells.
return;
}
// The quick time range check did not pass. We need get at least
one empty cell to cover
@@ -2288,6 +2338,10 @@ public class CompactionScanner implements
InternalScanner {
if (emptyColumn.isEmpty()) {
return;
}
+ else if (! major) {
+ retainEmptyCellsInMinorCompaction(emptyColumn, retainedCells);
+ return;
+ }
int size = lastRow.size();
long tsArray[] = new long[size];
int i = 0;
@@ -2392,7 +2446,7 @@ public class CompactionScanner implements InternalScanner
{
}
lastRowVersion = trimmedRow;
trimmedEmptyColumn.clear();;
- for (Cell cell : lastRowVersion) {
+ for (Cell cell : emptyColumn) {
if (cell.getTimestamp() >= minTimestamp) {
trimmedEmptyColumn.add(cell);
}
@@ -2425,7 +2479,14 @@ public class CompactionScanner implements
InternalScanner {
}
phoenixResult.clear();
rowTracker.setTTL(result.get(0));
- if (!retainCellsForMaxLookback(result, regionLevel,
phoenixResult)) {
+ // For multi-CF case, always do region level scan for empty CF
store during major compaction else
+ // we could end-up removing some empty cells which are needed to
close the gap b/w empty CF cell and
+ // non-empty CF cell to prevent partial row expiry. This can
happen when last row version of non-empty
+ // CF cell outside max lookback window is older than last row
version of empty CF cell.
+ if (major && familyCount > 1 && ! localIndex && emptyCFStore && !
regionLevel) {
+ compactRegionLevel(result, phoenixResult);
+ }
+ else if (!retainCellsForMaxLookback(result, regionLevel,
phoenixResult)) {
if (familyCount == 1 || regionLevel) {
throw new RuntimeException("UNEXPECTED");
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 6ae12454df..e0253b7290 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -179,25 +179,6 @@ public class UngroupedAggregateRegionObserver extends
BaseScannerRegionObserver
private Configuration indexWriteConfig;
private ReadOnlyProps indexWriteProps;
- private static Map<String, Long> maxLookbackMap = new
ConcurrentHashMap<>();
-
- public static void setMaxLookbackInMillis(String tableName, long
maxLookbackInMillis) {
- if (tableName == null) {
- return;
- }
- maxLookbackMap.put(tableName,
- maxLookbackInMillis);
- }
-
- public static long getMaxLookbackInMillis(String tableName, long
maxLookbackInMillis) {
- if (tableName == null) {
- return maxLookbackInMillis;
- }
- Long value = maxLookbackMap.get(tableName);
- return value == null
- ? maxLookbackInMillis
- : maxLookbackMap.get(tableName);
- }
@Override
public Optional<RegionObserver> getRegionObserver() {
return Optional.of(this);
@@ -600,30 +581,6 @@ public class UngroupedAggregateRegionObserver extends
BaseScannerRegionObserver
region.getTableDescriptor().getTableName().getName()) == 0);
}
- @Override
- public InternalScanner
preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
- InternalScanner scanner, FlushLifeCycleTracker tracker) throws
IOException {
- if (!isPhoenixTableTTLEnabled(c.getEnvironment().getConfiguration())) {
- return scanner;
- } else {
- return User.runAsLoginUser(new
PrivilegedExceptionAction<InternalScanner>() {
- @Override public InternalScanner run() throws Exception {
- String tableName =
c.getEnvironment().getRegion().getRegionInfo().getTable()
- .getNameAsString();
- long maxLookbackInMillis =
-
UngroupedAggregateRegionObserver.getMaxLookbackInMillis(
- tableName,
-
BaseScannerRegionObserverConstants.getMaxLookbackInMillis(
-
c.getEnvironment().getConfiguration()));
- maxLookbackInMillis =
CompactionScanner.getMaxLookbackInMillis(tableName,
- store.getColumnFamilyName(), maxLookbackInMillis);
- return new CompactionScanner(c.getEnvironment(), store,
scanner,
- maxLookbackInMillis, false, true, null);
- }
- });
- }
- }
-
@Override
public InternalScanner
preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
InternalScanner scanner, ScanType
scanType, CompactionLifeCycleTracker tracker,
@@ -653,11 +610,6 @@ public class UngroupedAggregateRegionObserver extends
BaseScannerRegionObserver
compactionConfig).unwrap(PhoenixConnection.class)) {
table = conn.getTableNoCache(fullTableName);
maxLookbackAge = table.getMaxLookbackAge();
- UngroupedAggregateRegionObserver.setMaxLookbackInMillis(
- tableName.getNameAsString(),
- MetaDataUtil.getMaxLookbackAge(
- c.getEnvironment().getConfiguration(),
- maxLookbackAge));
} catch (Exception e) {
if (e instanceof TableNotFoundException) {
LOGGER.debug("Ignoring HBase table that is not a
Phoenix table: "
@@ -705,6 +657,10 @@ public class UngroupedAggregateRegionObserver extends
BaseScannerRegionObserver
keepDeleted, table
);
}
+ else if
(isPhoenixTableTTLEnabled(c.getEnvironment().getConfiguration())) {
+ LOGGER.warn("Skipping compaction for table: {} " +
+ "as failed to retrieve PTable object",
fullTableName);
+ }
if (scanType.equals(ScanType.COMPACT_DROP_DELETES)) {
try {
long clientTimeStamp =
EnvironmentEdgeManager.currentTimeMillis();
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java
index 1e2fa2752c..843f780e34 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java
@@ -65,6 +65,7 @@ import static
org.apache.phoenix.util.TestUtil.assertTableHasTtl;
import static org.apache.phoenix.util.TestUtil.assertTableHasVersions;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@Category(NeedsOwnMiniClusterTest.class)
@@ -91,6 +92,7 @@ public class MaxLookbackExtendedIT extends BaseTest {
props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB,
Long.toString(0));
props.put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
Integer.toString(MAX_LOOKBACK_AGE));
props.put("hbase.procedure.remote.dispatcher.delay.msec", "0");
+ props.put(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, "0");
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
}
@@ -615,6 +617,57 @@ public class MaxLookbackExtendedIT extends BaseTest {
}
}
}
+
+ @Test(timeout=60000)
+ public void testRetainingLastRowVersion() throws Exception {
+ if(hasTableLevelMaxLookback) {
+ optionBuilder.append(", MAX_LOOKBACK_AGE=" +
TABLE_LEVEL_MAX_LOOKBACK_AGE * 1000);
+ tableDDLOptions = optionBuilder.toString();
+ }
+ try(Connection conn = DriverManager.getConnection(getUrl())) {
+ String tableName = generateUniqueName();
+ createTable(tableName);
+ long timeIntervalBetweenTwoUpserts = (ttl / 2) + 1;
+ injectEdge.setValue(System.currentTimeMillis());
+ EnvironmentEdgeManager.injectEdge(injectEdge);
+ TableName dataTableName = TableName.valueOf(tableName);
+ injectEdge.incrementValue(1);
+ Statement stmt = conn.createStatement();
+ stmt.execute("upsert into " + tableName + " values ('a', 'ab',
'abc', 'abcd')");
+ conn.commit();
+ injectEdge.incrementValue(timeIntervalBetweenTwoUpserts * 1000);
+ flush(dataTableName);
+ injectEdge.incrementValue(1);
+ stmt.execute("upsert into " + tableName + " values ('a', 'ab1')");
+ conn.commit();
+ injectEdge.incrementValue(timeIntervalBetweenTwoUpserts * 1000);
+ flush(dataTableName);
+ injectEdge.incrementValue(1);
+ stmt.execute("upsert into " + tableName + " values ('a', 'ab2')");
+ conn.commit();
+ injectEdge.incrementValue(timeIntervalBetweenTwoUpserts * 1000);
+ flush(dataTableName);
+ injectEdge.incrementValue(1);
+ stmt.execute("upsert into " + tableName + " values ('a', 'ab3')");
+ conn.commit();
+ injectEdge.incrementValue(TABLE_LEVEL_MAX_LOOKBACK_AGE * 1000);
+ flush(dataTableName);
+ injectEdge.incrementValue(1);
+ TestUtil.dumpTable(conn, dataTableName);
+ TestUtil.minorCompact(utility, dataTableName);
+ injectEdge.incrementValue(1);
+ TestUtil.dumpTable(conn, dataTableName);
+ majorCompact(dataTableName);
+ injectEdge.incrementValue(1);
+ TestUtil.dumpTable(conn, dataTableName);
+ ResultSet rs = stmt.executeQuery("select * from " + dataTableName
+ " where id = 'a'");
+ while(rs.next()) {
+ assertNotNull(rs.getString(3));
+ assertNotNull(rs.getString(4));
+ }
+ }
+ }
+
private void flush(TableName table) throws IOException {
Admin admin = getUtility().getAdmin();
admin.flush(table);
@@ -684,5 +737,4 @@ public class MaxLookbackExtendedIT extends BaseTest {
String actualExplainPlan = QueryUtil.getExplainPlan(rs);
IndexToolIT.assertExplainPlan(false, actualExplainPlan,
dataTableFullName, indexTableFullName);
}
-
}
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
index e2aa329f3d..f7188e8006 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
@@ -52,6 +52,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.Random;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@Category(NeedsOwnMiniClusterTest.class)
@@ -172,6 +173,7 @@ public class TableTTLIT extends BaseTest {
conn.commit();
String noCompactTableName = generateUniqueName();
createTable(noCompactTableName);
+ conn.createStatement().execute("ALTER TABLE " + noCompactTableName
+ " set MAX_LOOKBACK_AGE = " + maxLookbackAge * 1000);
conn.commit();
long startTime = System.currentTimeMillis() + 1000;
startTime = (startTime / 1000) * 1000;
@@ -245,7 +247,7 @@ public class TableTTLIT extends BaseTest {
}
@Test
- public void
testFlushesAndMinorCompactionShouldNotRetainCellsWhenMaxLookbackIsDisabled()
+ public void
testMinorCompactionShouldNotRetainCellsWhenMaxLookbackIsDisabled()
throws Exception {
final int maxLookbackAge = tableLevelMaxLooback != null
? tableLevelMaxLooback : MAX_LOOKBACK_AGE;
@@ -255,36 +257,29 @@ public class TableTTLIT extends BaseTest {
try (Connection conn = DriverManager.getConnection(getUrl())) {
String tableName = generateUniqueName();
createTable(tableName);
- conn.createStatement().execute("Alter Table " + tableName + " set
\"phoenix.max.lookback.age.seconds\" = 0");
conn.commit();
final int flushCount = 10;
byte[] row = Bytes.toBytes("a");
+ int rowUpdateCounter = 0;
for (int i = 0; i < flushCount; i++) {
// Generate more row versions than the maximum cell versions
for the table
int updateCount = RAND.nextInt(10) + versions;
+ rowUpdateCounter += updateCount;
for (int j = 0; j < updateCount; j++) {
updateRow(conn, tableName, "a");
}
flush(TableName.valueOf(tableName));
- // At every flush, extra cell versions should be removed.
- // MAX_COLUMN_INDEX table columns and one empty column will be
retained for
- // each row version.
- assertTrue(TestUtil.getRawCellCount(conn,
TableName.valueOf(tableName), row)
- <= (i + 1) * (MAX_COLUMN_INDEX + 1) * versions);
+ // Flushes dump and retain all the cells to HFile.
+ // Doing MAX_COLUMN_INDEX + 1 to account for empty cells
+ assertEquals(TestUtil.getRawCellCount(conn,
TableName.valueOf(tableName), row),
+ rowUpdateCounter * (MAX_COLUMN_INDEX + 1));
}
+ TestUtil.dumpTable(conn, TableName.valueOf(tableName));
// Run one minor compaction (in case no minor compaction has
happened yet)
- Admin admin = utility.getAdmin();
- admin.compact(TableName.valueOf(tableName));
- int waitCount = 0;
- while (TestUtil.getRawCellCount(conn, TableName.valueOf(tableName),
- Bytes.toBytes("a")) >= flushCount * (MAX_COLUMN_INDEX + 1)
* versions) {
- // Wait for minor compactions to happen
- Thread.sleep(1000);
- waitCount++;
- if (waitCount > 30) {
- Assert.fail();
- }
- }
+ TestUtil.minorCompact(utility, TableName.valueOf(tableName));
+ TestUtil.dumpTable(conn, TableName.valueOf(tableName));
+ assertEquals(TestUtil.getRawCellCount(conn,
TableName.valueOf(tableName), Bytes.toBytes("a")),
+ (MAX_COLUMN_INDEX + 1) * versions);
}
}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index c071129dcb..1845f01a62 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -98,6 +98,7 @@ import org.apache.phoenix.compile.StatementNormalizer;
import org.apache.phoenix.compile.SubqueryRewriter;
import org.apache.phoenix.compile.SubselectRewriter;
import org.apache.phoenix.compile.JoinCompiler.JoinTable;
+import org.apache.phoenix.coprocessor.CompactionScanner;
import
org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheRequest;
import
org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheResponse;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService;
@@ -815,6 +816,26 @@ public class TestUtil {
admin.flush(table);
}
+ public static void minorCompact(HBaseTestingUtility utility, TableName
table)
+ throws IOException, InterruptedException {
+ try {
+ CompactionScanner.setForceMinorCompaction(true);
+ Admin admin = utility.getAdmin();
+ admin.compact(table);
+ int waitForCompactionToCompleteCounter = 0;
+ while (CompactionScanner.getForceMinorCompaction()) {
+ waitForCompactionToCompleteCounter++;
+ if (waitForCompactionToCompleteCounter > 20) {
+ Assert.fail();
+ }
+ Thread.sleep(1000);
+ }
+ }
+ finally {
+ CompactionScanner.setForceMinorCompaction(false);
+ }
+ }
+
public static void majorCompact(HBaseTestingUtility utility, TableName
table)
throws IOException, InterruptedException {
long compactionRequestedSCN =
EnvironmentEdgeManager.currentTimeMillis();