This is an automated email from the ASF dual-hosted git repository.
kadir pushed a commit to branch 5.2
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.2 by this push:
new 52d58f9891 PHOENIX-7402: Fix row getting expired partially even if
updated within TTL (#1986)
52d58f9891 is described below
commit 52d58f9891c319347571fa1e5f834692d68cf7e2
Author: sanjeet006py <[email protected]>
AuthorDate: Thu Sep 26 03:21:46 2024 +0530
PHOENIX-7402: Fix row getting expired partially even if updated within TTL
(#1986)
* PHOENIX-7402: Fix row getting expired partially even if updated within TTL
* Increase thread sleep while waiting for minor compaction to complete
---------
Co-authored-by: Sanjeet Malhotra <[email protected]>
---
.../phoenix/coprocessor/CompactionScanner.java | 131 +++++++++++++++------
.../UngroupedAggregateRegionObserver.java | 25 +---
.../phoenix/end2end/MaxLookbackExtendedIT.java | 56 +++++++++
.../org/apache/phoenix/end2end/TableTTLIT.java | 38 +++---
.../java/org/apache/phoenix/util/TestUtil.java | 21 ++++
5 files changed, 196 insertions(+), 75 deletions(-)
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
index 2fcf91dc64..64bebca166 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
@@ -47,6 +47,7 @@ import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.ScanUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
import static
org.apache.phoenix.query.QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX;
@@ -103,6 +104,9 @@ public class CompactionScanner implements InternalScanner {
private long outputCellCount = 0;
private boolean phoenixLevelOnly = false;
+ // Only for forcing minor compaction while testing
+ private static boolean forceMinorCompaction = false;
+
public CompactionScanner(RegionCoprocessorEnvironment env,
Store store,
InternalScanner storeScanner,
@@ -115,6 +119,9 @@ public class CompactionScanner implements InternalScanner {
this.region = env.getRegion();
this.store = store;
this.env = env;
+ // Empty column family and qualifier are always needed to compute
which all empty cells to retain
+ // even during minor compactions. If required empty cells are not
retained during
+ // minor compactions then we can run into the risk of partial row
expiry on next major compaction.
this.emptyCF = emptyCF;
this.emptyCQ = emptyCQ;
this.config = env.getConfiguration();
@@ -131,8 +138,8 @@ public class CompactionScanner implements InternalScanner {
this.maxLookbackWindowStart = this.maxLookbackInMillis == 0 ?
compactionTime : compactionTime - (this.maxLookbackInMillis +
1);
ColumnFamilyDescriptor cfd = store.getColumnFamilyDescriptor();
- this.major = major;
- int ttl = major ? cfd.getTimeToLive() : HConstants.FOREVER;
+ this.major = major && ! forceMinorCompaction;
+ int ttl = this.major ? cfd.getTimeToLive() : HConstants.FOREVER;
ttlInMillis = ((long) ttl) * 1000;
this.ttlWindowStart = ttl == HConstants.FOREVER ? 1 : compactionTime -
ttlInMillis;
this.maxLookbackWindowStart = Math.max(ttlWindowStart,
maxLookbackWindowStart);
@@ -141,17 +148,25 @@ public class CompactionScanner implements InternalScanner
{
this.keepDeletedCells = keepDeleted ? KeepDeletedCells.TTL :
cfd.getKeepDeletedCells();
familyCount = region.getTableDescriptor().getColumnFamilies().length;
localIndex =
columnFamilyName.startsWith(LOCAL_INDEX_COLUMN_FAMILY_PREFIX);
- emptyCFStore = major
- ? familyCount == 1 ||
columnFamilyName.equals(Bytes.toString(emptyCF))
- || localIndex
- : true; // we do not need to identify emptyCFStore for minor
compaction or flushes
+ emptyCFStore = familyCount == 1 ||
columnFamilyName.equals(Bytes.toString(emptyCF))
+ || localIndex;
phoenixLevelRowCompactor = new PhoenixLevelRowCompactor();
hBaseLevelRowCompactor = new HBaseLevelRowCompactor();
LOGGER.info("Starting CompactionScanner for table " + tableName + "
store "
- + columnFamilyName + (major ? " major " : " not major ") +
"compaction ttl "
+ + columnFamilyName + (this.major ? " major " : " not major ")
+ "compaction ttl "
+ ttlInMillis + "ms " + "max lookback " +
this.maxLookbackInMillis + "ms");
}
+ @VisibleForTesting
+ public static void setForceMinorCompaction(boolean doMinorCompaction) {
+ forceMinorCompaction = doMinorCompaction;
+ }
+
+ @VisibleForTesting
+ public static boolean getForceMinorCompaction() {
+ return forceMinorCompaction;
+ }
+
/**
* Any coprocessors within a JVM can extend the max lookback window for a
column family
* by calling this static method.
@@ -243,6 +258,9 @@ public class CompactionScanner implements InternalScanner {
+ columnFamilyName + (major ? " major " : " not major ") +
"compaction retained "
+ outputCellCount + " of " + inputCellCount + " cells"
+ (phoenixLevelOnly ? " phoenix level only" : ""));
+ if (forceMinorCompaction) {
+ forceMinorCompaction = false;
+ }
storeScanner.close();
}
@@ -674,13 +692,9 @@ public class CompactionScanner implements InternalScanner {
List<Cell> lastRowVersion, List<Cell> retainedCells,
List<Cell> emptyColumn) {
Cell currentColumnCell = null;
boolean isEmptyColumn = false;
- Cell cellAtMaxLookbackWindowStart = null;
for (Cell cell : result) {
- if (cell.getTimestamp() >= maxLookbackWindowStart) {
+ if (cell.getTimestamp() > maxLookbackWindowStart) {
retainedCells.add(cell);
- if (cell.getTimestamp() == maxLookbackWindowStart) {
- cellAtMaxLookbackWindowStart = cell;
- }
continue;
}
if (!major && cell.getType() != Cell.Type.Put) {
@@ -689,26 +703,19 @@ public class CompactionScanner implements InternalScanner
{
if (currentColumnCell == null ||
!CellUtil.matchingColumn(cell, currentColumnCell)) {
currentColumnCell = cell;
- if (cell.getType() != Cell.Type.Delete
- && cell.getType() != Cell.Type.DeleteColumn) {
- // Include only delete family markers and put cells
- // It is possible that this cell is not visible from
the max lookback
- // window. This happens when there is a mutation with
the mutation timestamp
- // equal to the max lookback window start timestamp.
The following is to
- // check for this case
- if (cellAtMaxLookbackWindowStart == null
- || !CellUtil.matchingColumn(cell,
cellAtMaxLookbackWindowStart)) {
- lastRowVersion.add(cell);
- }
- }
- if (major && ScanUtil.isEmptyColumn(cell, emptyCF,
emptyCQ)) {
- isEmptyColumn = true;
- } else {
- isEmptyColumn = false;
+ isEmptyColumn = ScanUtil.isEmptyColumn(cell, emptyCF,
emptyCQ);
+ if ((cell.getType() != Cell.Type.Delete
+ && cell.getType() != Cell.Type.DeleteColumn)
+ || cell.getTimestamp() == maxLookbackWindowStart) {
+ // Include only delete family markers and put cells or
the
+ // cells at start edge of max lookback window
+ lastRowVersion.add(cell);
}
- } else if (major && isEmptyColumn) {
+ } else if (isEmptyColumn) {
// We only need to keep one cell for every column for the
last row version.
- // So here we just form the empty column beyond the last
row version
+ // So here we just form the empty column beyond the last
row version.
+ // Empty column needs to be collected during minor
compactions also
+ // else we will see partial row expiry.
emptyColumn.add(cell);
}
}
@@ -718,7 +725,7 @@ public class CompactionScanner implements InternalScanner {
* Close the gap between the two timestamps, max and min, with the
minimum number of cells
* from the input list such that the timestamp difference between two
cells should
* not more than ttl. The cells that are used to close the gap are
added to the output
- * list.
+ * list. The input list is a list of empty cells in decreasing order
of timestamp.
*/
private void closeGap(long max, long min, List<Cell> input, List<Cell>
output) {
int previous = -1;
@@ -729,7 +736,9 @@ public class CompactionScanner implements InternalScanner {
previous++;
continue;
}
- if (previous == -1) {
+ if (previous == -1 && max - ts > ttlInMillis) {
+ // Means even the first empty cells in the input list
which is closest to
+ // max timestamp can't close the gap. So, gap can't be
closed by empty cells at all.
break;
}
if (max - ts > ttlInMillis) {
@@ -742,6 +751,42 @@ public class CompactionScanner implements InternalScanner {
}
previous++;
}
+ if (previous > -1 && max - min > ttlInMillis) {
+ // This covers the case we need to retain the last empty cell
in the input list. The close gap
+ // algorithm is such that if we need to retain the i th empty
cell in the input list then we
+ // will get to know that once we are iterating on i+1 th empty
cell. So, to retain last empty cell
+ // in input list we need to check the min timestamp.
+ output.add(input.remove(previous));
+ }
+ }
+
+ /**
+ * Retains minimum empty cells needed during minor compaction to not
loose data/partial row expiry
+ * on next major compaction.
+ * @param emptyColumn Empty column cells in decreasing order of
timestamp.
+ * @param retainedCells Cells to be retained.
+ */
+ private void retainEmptyCellsInMinorCompaction(List<Cell> emptyColumn,
List<Cell> retainedCells) {
+ if (emptyColumn.isEmpty()) {
+ return;
+ }
+ else if (familyCount == 1 || localIndex) {
+ // We are compacting empty column family store and its single
column family so
+ // just need to retain empty cells till min timestamp of last
row version. Can't
+ // minimize the retained empty cells further as we don't know
actual TTL during
+ // minor compactions.
+ long minRowTimestamp = rowContext.minTimestamp;
+ for (Cell emptyCell: emptyColumn) {
+ if (emptyCell.getTimestamp() > minRowTimestamp) {
+ retainedCells.add(emptyCell);
+ }
+ }
+ return;
+ }
+ // For multi-column family, w/o doing region level scan we can't
put a bound on timestamp
+ // till which we should retain the empty cells. The empty cells
can be needed to close the gap
+ // b/w empty column family cell and non-empty column family cell.
+ retainedCells.addAll(emptyColumn);
}
/**
@@ -764,7 +809,8 @@ public class CompactionScanner implements InternalScanner {
}
}
- if (compactionTime - rowContext.maxTimestamp > maxLookbackInMillis
+ ttlInMillis) {
+ if (major && compactionTime - rowContext.maxTimestamp >
maxLookbackInMillis + ttlInMillis) {
+ // Only do this check for major compaction as for minor
compactions we don't expire cells.
// The row version should not be visible via the max lookback
window. Nothing to do
return;
}
@@ -773,7 +819,9 @@ public class CompactionScanner implements InternalScanner {
// mutation will be considered expired and masked. If the length
of the time range of
// a row version is not more than ttl, then we know the cells
covered by the row
// version are not apart from each other more than ttl and will
not be masked.
- if (rowContext.maxTimestamp - rowContext.minTimestamp <=
ttlInMillis) {
+ if (major && rowContext.maxTimestamp - rowContext.minTimestamp <=
ttlInMillis) {
+ // Skip this check for minor compactions as we don't compute
actual TTL for
+ // minor compactions and don't expire cells.
return;
}
// The quick time range check did not pass. We need get at least
one empty cell to cover
@@ -781,6 +829,10 @@ public class CompactionScanner implements InternalScanner {
if (emptyColumn.isEmpty()) {
return;
}
+ else if (! major) {
+ retainEmptyCellsInMinorCompaction(emptyColumn, retainedCells);
+ return;
+ }
int size = lastRow.size();
long tsArray[] = new long[size];
int i = 0;
@@ -884,7 +936,7 @@ public class CompactionScanner implements InternalScanner {
}
lastRowVersion = trimmedRow;
trimmedEmptyColumn.clear();;
- for (Cell cell : lastRowVersion) {
+ for (Cell cell : emptyColumn) {
if (cell.getTimestamp() >= minTimestamp) {
trimmedEmptyColumn.add(cell);
}
@@ -916,7 +968,14 @@ public class CompactionScanner implements InternalScanner {
return;
}
phoenixResult.clear();
- if (!retainCellsForMaxLookback(result, regionLevel,
phoenixResult)) {
+ // For multi-CF case, always do region level scan for empty CF
store during major compaction else
+ // we could end-up removing some empty cells which are needed to
close the gap b/w empty CF cell and
+ // non-empty CF cell to prevent partial row expiry. This can
happen when last row version of non-empty
+ // CF cell outside max lookback window is older than last row
version of empty CF cell.
+ if (major && familyCount > 1 && ! localIndex && emptyCFStore && !
regionLevel) {
+ compactRegionLevel(result, phoenixResult);
+ }
+ else if (!retainCellsForMaxLookback(result, regionLevel,
phoenixResult)) {
if (familyCount == 1 || regionLevel) {
throw new RuntimeException("UNEXPECTED");
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index eda44eb00a..b0b1d4196d 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -581,27 +581,6 @@ public class UngroupedAggregateRegionObserver extends
BaseScannerRegionObserver
region.getTableDescriptor().getTableName().getName()) == 0);
}
- @Override
- public InternalScanner
preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
- InternalScanner scanner, FlushLifeCycleTracker tracker) throws
IOException {
- if (!isPhoenixTableTTLEnabled(c.getEnvironment().getConfiguration())) {
- return scanner;
- } else {
- return User.runAsLoginUser(new
PrivilegedExceptionAction<InternalScanner>() {
- @Override public InternalScanner run() throws Exception {
- String tableName =
c.getEnvironment().getRegion().getRegionInfo().getTable()
- .getNameAsString();
- long maxLookbackInMillis =
BaseScannerRegionObserverConstants
-
.getMaxLookbackInMillis(c.getEnvironment().getConfiguration());
- maxLookbackInMillis =
CompactionScanner.getMaxLookbackInMillis(tableName,
- store.getColumnFamilyName(), maxLookbackInMillis);
- return new CompactionScanner(c.getEnvironment(), store,
scanner,
- maxLookbackInMillis, null, null, false, true);
- }
- });
- }
- }
-
@Override
public InternalScanner
preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
InternalScanner scanner, ScanType
scanType, CompactionLifeCycleTracker tracker,
@@ -679,6 +658,10 @@ public class UngroupedAggregateRegionObserver extends
BaseScannerRegionObserver
request.isMajor() || request.isAllFiles(),
keepDeleted
);
}
+ else if
(isPhoenixTableTTLEnabled(c.getEnvironment().getConfiguration())) {
+ LOGGER.warn("Skipping compaction for table: {} " +
+ "as failed to retrieve PTable object",
fullTableName);
+ }
if (scanType.equals(ScanType.COMPACT_DROP_DELETES)) {
try {
long clientTimeStamp =
EnvironmentEdgeManager.currentTimeMillis();
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java
index 8ade4e560e..b9984bc970 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java
@@ -66,6 +66,7 @@ import static
org.apache.phoenix.util.TestUtil.assertTableHasVersions;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotNull;
@Category(NeedsOwnMiniClusterTest.class)
@RunWith(Parameterized.class)
@@ -88,6 +89,7 @@ public class MaxLookbackExtendedIT extends BaseTest {
props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB,
Long.toString(0));
props.put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
Integer.toString(MAX_LOOKBACK_AGE));
props.put("hbase.procedure.remote.dispatcher.delay.msec", "0");
+ props.put(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, "0");
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
}
@@ -560,6 +562,60 @@ public class MaxLookbackExtendedIT extends BaseTest {
}
}
}
+
+ @Test(timeout=60000)
+ public void testRetainingLastRowVersion() throws Exception {
+ try(Connection conn = DriverManager.getConnection(getUrl())) {
+ String tableName = generateUniqueName();
+ createTable(tableName);
+ long timeIntervalBetweenTwoUpserts = (ttl / 4) + 1;
+ injectEdge.setValue(System.currentTimeMillis());
+ EnvironmentEdgeManager.injectEdge(injectEdge);
+ TableName dataTableName = TableName.valueOf(tableName);
+ injectEdge.incrementValue(1);
+ Statement stmt = conn.createStatement();
+ stmt.execute("upsert into " + tableName + " values ('a', 'ab',
'abc', 'abcd')");
+ conn.commit();
+ injectEdge.incrementValue(timeIntervalBetweenTwoUpserts * 1000);
+ flush(dataTableName);
+ injectEdge.incrementValue(1);
+ stmt.execute("upsert into " + tableName + " values ('a', 'ab1')");
+ conn.commit();
+ injectEdge.incrementValue(timeIntervalBetweenTwoUpserts * 1000);
+ flush(dataTableName);
+ injectEdge.incrementValue(1);
+ stmt.execute("upsert into " + tableName + " values ('a', 'ab2')");
+ conn.commit();
+ injectEdge.incrementValue(timeIntervalBetweenTwoUpserts * 1000);
+ flush(dataTableName);
+ injectEdge.incrementValue(1);
+ TestUtil.minorCompact(utility, dataTableName);
+ injectEdge.incrementValue(1);
+ stmt.execute("upsert into " + tableName + " values ('a', 'ab3')");
+ conn.commit();
+ injectEdge.incrementValue(timeIntervalBetweenTwoUpserts * 1000);
+ flush(dataTableName);
+ injectEdge.incrementValue(1);
+ stmt.execute("upsert into " + tableName + " values ('a', 'ab4')");
+ conn.commit();
+ injectEdge.incrementValue(1);
+ flush(dataTableName);
+ TestUtil.dumpTable(conn, dataTableName);
+ injectEdge.incrementValue(1);
+ TestUtil.minorCompact(utility, dataTableName);
+ injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000 - 1);
+ TestUtil.dumpTable(conn, dataTableName);
+ majorCompact(dataTableName);
+ injectEdge.incrementValue(1);
+ TestUtil.dumpTable(conn, dataTableName);
+ ResultSet rs = stmt.executeQuery("select * from " + dataTableName
+ " where id = 'a'");
+ while(rs.next()) {
+ assertNotNull(rs.getString(3));
+ assertNotNull(rs.getString(4));
+ }
+ }
+ }
+
private void flush(TableName table) throws IOException {
Admin admin = getUtility().getAdmin();
admin.flush(table);
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
index 43a8ab1a60..11819772fe 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
@@ -53,6 +53,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.Random;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@Category(NeedsOwnMiniClusterTest.class)
@@ -236,7 +237,7 @@ public class TableTTLIT extends BaseTest {
}
@Test
- public void
testFlushesAndMinorCompactionShouldNotRetainCellsWhenMaxLookbackIsDisabled()
+ public void
testMinorCompactionShouldNotRetainCellsWhenMaxLookbackIsDisabled()
throws Exception {
try (Connection conn = DriverManager.getConnection(getUrl())) {
String tableName = generateUniqueName();
@@ -245,32 +246,33 @@ public class TableTTLIT extends BaseTest {
conn.commit();
final int flushCount = 10;
byte[] row = Bytes.toBytes("a");
+ int rowUpdateCounter = 0;
+ injectEdge.setValue(System.currentTimeMillis());
+ EnvironmentEdgeManager.injectEdge(injectEdge);
+ injectEdge.incrementValue(1);
for (int i = 0; i < flushCount; i++) {
// Generate more row versions than the maximum cell versions
for the table
int updateCount = RAND.nextInt(10) + versions;
+ rowUpdateCounter += updateCount;
for (int j = 0; j < updateCount; j++) {
updateRow(conn, tableName, "a");
+ injectEdge.incrementValue(1);
}
flush(TableName.valueOf(tableName));
- // At every flush, extra cell versions should be removed.
- // MAX_COLUMN_INDEX table columns and one empty column will be
retained for
- // each row version.
- assertTrue(TestUtil.getRawCellCount(conn,
TableName.valueOf(tableName), row)
- <= (i + 1) * (MAX_COLUMN_INDEX + 1) * versions);
+ injectEdge.incrementValue(1);
+ // Flushes dump and retain all the cells to HFile.
+ // Doing MAX_COLUMN_INDEX + 1 to account for empty cells
+ assertEquals(TestUtil.getRawCellCount(conn,
TableName.valueOf(tableName), row),
+ rowUpdateCounter * (MAX_COLUMN_INDEX + 1));
}
+ TestUtil.dumpTable(conn, TableName.valueOf(tableName));
// Run one minor compaction (in case no minor compaction has
happened yet)
- Admin admin = utility.getAdmin();
- admin.compact(TableName.valueOf(tableName));
- int waitCount = 0;
- while (TestUtil.getRawCellCount(conn, TableName.valueOf(tableName),
- Bytes.toBytes("a")) >= flushCount * (MAX_COLUMN_INDEX + 1)
* versions) {
- // Wait for minor compactions to happen
- Thread.sleep(1000);
- waitCount++;
- if (waitCount > 120) {
- Assert.fail();
- }
- }
+ TestUtil.minorCompact(utility, TableName.valueOf(tableName));
+ injectEdge.incrementValue(1);
+ TestUtil.dumpTable(conn, TableName.valueOf(tableName));
+ // For multi-CF table we retain all empty cells during minor
compaction
+ int retainedCellCount = (MAX_COLUMN_INDEX + 1) * versions +
(multiCF ? rowUpdateCounter - versions : 0);
+ assertEquals(retainedCellCount, TestUtil.getRawCellCount(conn,
TableName.valueOf(tableName), Bytes.toBytes("a")));
}
}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index 2e5cc1faf3..8bea1194eb 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -98,6 +98,7 @@ import org.apache.phoenix.compile.StatementNormalizer;
import org.apache.phoenix.compile.SubqueryRewriter;
import org.apache.phoenix.compile.SubselectRewriter;
import org.apache.phoenix.compile.JoinCompiler.JoinTable;
+import org.apache.phoenix.coprocessor.CompactionScanner;
import
org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheRequest;
import
org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheResponse;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService;
@@ -815,6 +816,26 @@ public class TestUtil {
admin.flush(table);
}
+ public static void minorCompact(HBaseTestingUtility utility, TableName
table)
+ throws IOException, InterruptedException {
+ try {
+ CompactionScanner.setForceMinorCompaction(true);
+ Admin admin = utility.getAdmin();
+ admin.compact(table);
+ int waitForCompactionToCompleteCounter = 0;
+ while (CompactionScanner.getForceMinorCompaction()) {
+ waitForCompactionToCompleteCounter++;
+ if (waitForCompactionToCompleteCounter > 50) {
+ Assert.fail();
+ }
+ Thread.sleep(3000);
+ }
+ }
+ finally {
+ CompactionScanner.setForceMinorCompaction(false);
+ }
+ }
+
public static void majorCompact(HBaseTestingUtility utility, TableName
table)
throws IOException, InterruptedException {
long compactionRequestedSCN =
EnvironmentEdgeManager.currentTimeMillis();