This is an automated email from the ASF dual-hosted git repository.
kadir pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new d77f473100 PHOENIX-7314 Enable CompactionScanner for flushes and minor compaction (#1896)
d77f473100 is described below
commit d77f4731005c5a8ffb593c7c0affcad9903bb4ce
Author: Kadir Ozdemir <[email protected]>
AuthorDate: Fri Jun 7 23:00:24 2024 +0300
PHOENIX-7314 Enable CompactionScanner for flushes and minor compaction (#1896)
---
.../coprocessor/BaseScannerRegionObserver.java | 48 +--
.../phoenix/coprocessor/CompactionScanner.java | 462 ++++++++++++++-------
.../UngroupedAggregateRegionObserver.java | 219 ++++++----
.../phoenix/end2end/MaxLookbackExtendedIT.java | 44 +-
.../org/apache/phoenix/end2end/TableTTLIT.java | 41 +-
.../java/org/apache/phoenix/util/TestUtil.java | 1 -
6 files changed, 516 insertions(+), 299 deletions(-)
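For orientation before the diff: the behavioral core of this patch is that the HBase-level scan options for flushes and all compactions are opened up completely, so that CompactionScanner alone decides retention. A minimal sketch of that idea follows; it is illustrative only (class and method names are not from the patch), though the ScanOptions calls are the ones used in the diff below.

```java
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeepDeletedCells;
import org.apache.hadoop.hbase.regionserver.ScanOptions;

public final class ScanOptionsSketch {
    // Mirrors the simplified setScanOptionsForFlushesAndCompactions(ScanOptions) added in this patch.
    static void retainEverything(ScanOptions options) {
        options.setKeepDeletedCells(KeepDeletedCells.TTL); // hand deleted cells to CompactionScanner
        options.setTTL(HConstants.FOREVER);                // no HBase-level expiration
        options.setMaxVersions(Integer.MAX_VALUE);         // keep all versions at the HBase level
        options.setMinVersions(Integer.MAX_VALUE);
    }
}
```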
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index d27a187dd1..b25acfb72c 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -357,21 +357,12 @@ abstract public class BaseScannerRegionObserver implements RegionObserver {
dataRegion, indexMaintainer, null, viewConstants, null, null,
projector, ptr, useQualiferAsListIndex);
}
- public void setScanOptionsForFlushesAndCompactions(Store store, ScanOptions options,
- boolean retainAllVersions) {
+ public void setScanOptionsForFlushesAndCompactions(ScanOptions options) {
// We want the store to give us all the deleted cells to StoreCompactionScanner
options.setKeepDeletedCells(KeepDeletedCells.TTL);
options.setTTL(HConstants.FOREVER);
- if (retainAllVersions) {
- options.setMaxVersions(Integer.MAX_VALUE);
- options.setMinVersions(Integer.MAX_VALUE);
- } else {
- options.setMinVersions(Math.max(Math.max(options.getMaxVersions(),
- store.getColumnFamilyDescriptor().getMaxVersions()), 1));
- options.setMinVersions(Math.max(Math.max(options.getMinVersions(),
- store.getColumnFamilyDescriptor().getMaxVersions()), 1));
- }
-
+ options.setMaxVersions(Integer.MAX_VALUE);
+ options.setMinVersions(Integer.MAX_VALUE);
}
@Override
@@ -380,16 +371,13 @@ abstract public class BaseScannerRegionObserver implements RegionObserver {
CompactionRequest request) throws IOException {
Configuration conf = c.getEnvironment().getConfiguration();
if (isPhoenixTableTTLEnabled(conf)) {
- boolean retainAllVersions = isMaxLookbackTimeEnabled(
- BaseScannerRegionObserverConstants.getMaxLookbackInMillis(conf))
- || request.isMajor();
- setScanOptionsForFlushesAndCompactions(store, options, retainAllVersions);
+ setScanOptionsForFlushesAndCompactions(options);
return;
}
- long maxLookbackAge = getMaxLookbackAge(c);
- if (isMaxLookbackTimeEnabled(maxLookbackAge)) {
+ long maxLookbackAgeInMillis = getMaxLookbackAge(c);
+ if (isMaxLookbackTimeEnabled(maxLookbackAgeInMillis)) {
setScanOptionsForFlushesAndCompactionsWhenPhoenixTTLIsDisabled(conf, options, store,
- scanType, maxLookbackAge);
+ scanType, maxLookbackAgeInMillis);
}
}
@@ -399,16 +387,14 @@ abstract public class BaseScannerRegionObserver implements RegionObserver {
Configuration conf = c.getEnvironment().getConfiguration();
if (isPhoenixTableTTLEnabled(conf)) {
- boolean retainAllVersions = isMaxLookbackTimeEnabled(
- BaseScannerRegionObserverConstants.getMaxLookbackInMillis(conf));
- setScanOptionsForFlushesAndCompactions(store, options, retainAllVersions);
+ setScanOptionsForFlushesAndCompactions(options);
return;
}
- long maxLookbackAge = getMaxLookbackAge(c);
- if (isMaxLookbackTimeEnabled(maxLookbackAge)) {
+ long maxLookbackAgeInMillis = getMaxLookbackAge(c);
+ if (isMaxLookbackTimeEnabled(maxLookbackAgeInMillis)) {
setScanOptionsForFlushesAndCompactionsWhenPhoenixTTLIsDisabled(conf, options, store,
- ScanType.COMPACT_RETAIN_DELETES, maxLookbackAge);
+ ScanType.COMPACT_RETAIN_DELETES, maxLookbackAgeInMillis);
}
}
@@ -418,13 +404,11 @@ abstract public class BaseScannerRegionObserver implements RegionObserver {
throws IOException {
Configuration conf = c.getEnvironment().getConfiguration();
if (isPhoenixTableTTLEnabled(conf)) {
- boolean retainAllVersions = isMaxLookbackTimeEnabled(
- BaseScannerRegionObserverConstants.getMaxLookbackInMillis(conf));
- setScanOptionsForFlushesAndCompactions(store, options, retainAllVersions);
+ setScanOptionsForFlushesAndCompactions(options);
return;
}
- long maxLookbackAge = getMaxLookbackAge(c);
- if (isMaxLookbackTimeEnabled(maxLookbackAge)) {
+ long maxLookbackAgeInMillis = getMaxLookbackAge(c);
+ if (isMaxLookbackTimeEnabled(maxLookbackAgeInMillis)) {
MemoryCompactionPolicy inMemPolicy =
store.getColumnFamilyDescriptor().getInMemoryCompaction();
ScanType scanType;
@@ -437,7 +421,7 @@ abstract public class BaseScannerRegionObserver implements RegionObserver {
scanType = ScanType.COMPACT_RETAIN_DELETES;
}
setScanOptionsForFlushesAndCompactionsWhenPhoenixTTLIsDisabled(conf, options, store,
- scanType, maxLookbackAge);
+ scanType, maxLookbackAgeInMillis);
}
}
@@ -447,7 +431,7 @@ abstract public class BaseScannerRegionObserver implements RegionObserver {
Configuration conf = c.getEnvironment().getConfiguration();
if (isPhoenixTableTTLEnabled(conf)) {
- setScanOptionsForFlushesAndCompactions(store, options, true);
+ setScanOptionsForFlushesAndCompactions(options);
return;
}
if (!storeFileScanDoesntNeedAlteration(options)) {
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
index ebe92b8741..2fcf91dc64 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@@ -50,9 +51,25 @@ import org.slf4j.LoggerFactory;
import static org.apache.phoenix.query.QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX;
/**
- * The store scanner that implements Phoenix TTL and Max Lookback. Phoenix overrides the
- * HBase implementation of data retention policies which is built at the cell level, and implements
- * its row level data retention within this store scanner.
+ * The store scanner that implements compaction for Phoenix. Phoenix coproc overrides the scan
+ * options so that HBase store scanner retains all cells during compaction and flushes. Then this
+ * store scanner decides which cells to retain. This is required to ensure rows do not expire
+ * partially and to preserve all cells within Phoenix max lookback window.
+ *
+ * The compaction process is optimized for Phoenix. This optimization assumes that
+ * . A given delete family or delete family version marker is inserted to all column families
+ * . A given delete family version marker always deletes a full version of a row. Please note
+ * delete family version markers are used only on index tables where mutations are always
+ * full row mutations.
+ *
+ * During major compaction, minor compaction and memstore flush, all cells (and delete markers)
+ * that are visible through the max lookback window are retained. Outside the max lookback window,
+ * (1) extra put cell versions, (2) delete markers and deleted cells that are not supposed to be
+ * kept (by the KeepDeletedCell option), and (3) expired cells are removed during major compaction.
+ * During flushes and minor compaction, expired cells and delete markers are not removed; however,
+ * deleted cells that are not supposed to be kept (by the KeepDeletedCell option) and extra put
+ * cell versions are removed.
+ *
*/
public class CompactionScanner implements InternalScanner {
private static final Logger LOGGER = LoggerFactory.getLogger(CompactionScanner.class);
@@ -64,7 +81,7 @@ public class CompactionScanner implements InternalScanner {
private final RegionCoprocessorEnvironment env;
private long maxLookbackWindowStart;
private long ttlWindowStart;
- private long ttl;
+ private long ttlInMillis;
private final long maxLookbackInMillis;
private int minVersion;
private int maxVersion;
@@ -81,13 +98,19 @@ public class CompactionScanner implements InternalScanner {
private static Map<String, Long> maxLookbackMap = new ConcurrentHashMap<>();
private PhoenixLevelRowCompactor phoenixLevelRowCompactor;
private HBaseLevelRowCompactor hBaseLevelRowCompactor;
+ private boolean major;
+ private long inputCellCount = 0;
+ private long outputCellCount = 0;
+ private boolean phoenixLevelOnly = false;
public CompactionScanner(RegionCoprocessorEnvironment env,
Store store,
InternalScanner storeScanner,
long maxLookbackInMillis,
byte[] emptyCF,
- byte[] emptyCQ) {
+ byte[] emptyCQ,
+ boolean major,
+ boolean keepDeleted) {
this.storeScanner = storeScanner;
this.region = env.getRegion();
this.store = store;
@@ -99,8 +122,7 @@ public class CompactionScanner implements InternalScanner {
columnFamilyName = store.getColumnFamilyName();
storeColumnFamily = columnFamilyName.getBytes();
tableName = region.getRegionInfo().getTable().getNameAsString();
- Long overriddenMaxLookback =
- maxLookbackMap.remove(tableName + SEPARATOR + columnFamilyName);
+ Long overriddenMaxLookback = maxLookbackMap.get(tableName + SEPARATOR + columnFamilyName);
this.maxLookbackInMillis = overriddenMaxLookback == null ?
maxLookbackInMillis : Math.max(maxLookbackInMillis, overriddenMaxLookback);
// The oldest scn is current time - maxLookbackInMillis. Phoenix sets the scan time range
@@ -109,22 +131,25 @@ public class CompactionScanner implements InternalScanner {
this.maxLookbackWindowStart = this.maxLookbackInMillis == 0 ?
compactionTime : compactionTime - (this.maxLookbackInMillis + 1);
ColumnFamilyDescriptor cfd = store.getColumnFamilyDescriptor();
- ttl = cfd.getTimeToLive();
- this.ttlWindowStart = ttl == HConstants.FOREVER ? 1 : compactionTime - ttl * 1000;
- ttl *= 1000;
+ this.major = major;
+ int ttl = major ? cfd.getTimeToLive() : HConstants.FOREVER;
+ ttlInMillis = ((long) ttl) * 1000;
+ this.ttlWindowStart = ttl == HConstants.FOREVER ? 1 : compactionTime - ttlInMillis;
this.maxLookbackWindowStart = Math.max(ttlWindowStart, maxLookbackWindowStart);
this.minVersion = cfd.getMinVersions();
this.maxVersion = cfd.getMaxVersions();
- this.keepDeletedCells = cfd.getKeepDeletedCells();
+ this.keepDeletedCells = keepDeleted ? KeepDeletedCells.TTL : cfd.getKeepDeletedCells();
familyCount = region.getTableDescriptor().getColumnFamilies().length;
localIndex = columnFamilyName.startsWith(LOCAL_INDEX_COLUMN_FAMILY_PREFIX);
- emptyCFStore = familyCount == 1 || columnFamilyName.equals(Bytes.toString(emptyCF))
- || localIndex;
+ emptyCFStore = major
+ ? familyCount == 1 || columnFamilyName.equals(Bytes.toString(emptyCF))
+ || localIndex
+ : true; // we do not need to identify emptyCFStore for minor compaction or flushes
phoenixLevelRowCompactor = new PhoenixLevelRowCompactor();
hBaseLevelRowCompactor = new HBaseLevelRowCompactor();
- LOGGER.info("Starting Phoenix CompactionScanner for table " +
tableName + " store "
- + columnFamilyName + " ttl " + ttl + "ms " + "max lookback "
- + maxLookbackInMillis + "ms");
+ LOGGER.info("Starting CompactionScanner for table " + tableName + "
store "
+ + columnFamilyName + (major ? " major " : " not major ") +
"compaction ttl "
+ + ttlInMillis + "ms " + "max lookback " +
this.maxLookbackInMillis + "ms");
}
/**
@@ -138,16 +163,71 @@ public class CompactionScanner implements InternalScanner {
}
Long old = maxLookbackMap.putIfAbsent(tableName + SEPARATOR + columnFamilyName,
maxLookbackInMillis);
- if (old != null && old < maxLookbackInMillis) {
+ if (old != null) {
maxLookbackMap.put(tableName + SEPARATOR + columnFamilyName,
maxLookbackInMillis);
}
}
+ public static long getMaxLookbackInMillis(String tableName, String columnFamilyName,
+ long maxLookbackInMillis) {
+ if (tableName == null || columnFamilyName == null) {
+ return maxLookbackInMillis;
+ }
+ Long value = maxLookbackMap.get(tableName + CompactionScanner.SEPARATOR + columnFamilyName);
+ return value == null
+ ? maxLookbackInMillis
+ : maxLookbackMap.get(tableName + CompactionScanner.SEPARATOR + columnFamilyName);
+ }
+ static class CellTimeComparator implements Comparator<Cell> {
+ public static final CellTimeComparator COMPARATOR = new CellTimeComparator();
+ @Override public int compare(Cell o1, Cell o2) {
+ long ts1 = o1.getTimestamp();
+ long ts2 = o2.getTimestamp();
+ if (ts1 == ts2) return 0;
+ if (ts1 > ts2) return -1;
+ return 1;
+ }
+
+ @Override public boolean equals(Object obj) {
+ return false;
+ }
+ }
+ private void printRow(List<Cell> result, String title, boolean sort) {
+ List<Cell> row;
+ if (sort) {
+ row = new ArrayList<>(result);
+ Collections.sort(row, CellTimeComparator.COMPARATOR);
+ } else {
+ row = result;
+ }
+ System.out.println("---- " + title + " ----");
+ System.out.println((major ? "Major " : "Not major ")
+ + "compaction time: " + compactionTime);
+ System.out.println("Max lookback window start time: " +
maxLookbackWindowStart);
+ System.out.println("Max lookback in ms: " + maxLookbackInMillis);
+ System.out.println("TTL in ms: " + ttlInMillis);
+ boolean maxLookbackLine = false;
+ boolean ttlLine = false;
+ for (Cell cell : row) {
+ if (!maxLookbackLine && cell.getTimestamp() < maxLookbackWindowStart) {
+ System.out.println("-----> Max lookback window start time: " + maxLookbackWindowStart);
+ maxLookbackLine = true;
+ } else if (!ttlLine && cell.getTimestamp() < ttlWindowStart) {
+ System.out.println("-----> TTL window start time: " +
ttlWindowStart);
+ ttlLine = true;
+ }
+ System.out.println(cell);
+ }
+ }
@Override
public boolean next(List<Cell> result) throws IOException {
boolean hasMore = storeScanner.next(result);
+ inputCellCount += result.size();
if (!result.isEmpty()) {
+ // printRow(result, "Input for " + tableName + " " +
columnFamilyName, true); // This is for debugging
phoenixLevelRowCompactor.compact(result, false);
+ outputCellCount += result.size();
+ // printRow(result, "Output for " + tableName + " " +
columnFamilyName, true); // This is for debugging
}
return hasMore;
}
@@ -159,8 +239,10 @@ public class CompactionScanner implements InternalScanner {
@Override
public void close() throws IOException {
- LOGGER.info("Closing Phoenix CompactionScanner for table " + tableName
+ " store "
- + columnFamilyName);
+ LOGGER.info("Closing CompactionScanner for table " + tableName + "
store "
+ + columnFamilyName + (major ? " major " : " not major ") +
"compaction retained "
+ + outputCellCount + " of " + inputCellCount + " cells"
+ + (phoenixLevelOnly ? " phoenix level only" : ""));
storeScanner.close();
}
@@ -171,13 +253,19 @@ public class CompactionScanner implements InternalScanner {
static class RowContext {
Cell familyDeleteMarker = null;
Cell familyVersionDeleteMarker = null;
- List<Cell> columnDeleteMarkers = null;
+ List<Cell> columnDeleteMarkers = new ArrayList<>();
int version = 0;
long maxTimestamp;
long minTimestamp;
+
+ private void init() {
+ familyDeleteMarker = null;
+ familyVersionDeleteMarker = null;
+ columnDeleteMarkers.clear();
+ version = 0;
+ }
private void addColumnDeleteMarker(Cell deleteMarker) {
- if (columnDeleteMarkers == null) {
- columnDeleteMarkers = new ArrayList<>();
+ if (columnDeleteMarkers.isEmpty()) {
columnDeleteMarkers.add(deleteMarker);
return;
}
@@ -200,6 +288,8 @@ public class CompactionScanner implements InternalScanner {
// Set it to null so it will be used once
familyVersionDeleteMarker = null;
} else {
+ // The same delete family marker may be retained multiple times. Duplicates will be
+ // removed later
retainedCells.add(familyDeleteMarker);
}
}
@@ -217,9 +307,13 @@ public class CompactionScanner implements InternalScanner {
if ((CellUtil.matchingFamily(cell, dm)) &&
CellUtil.matchingQualifier(cell, dm)) {
if (dm.getType() == Cell.Type.Delete) {
- // Delete is for deleting a specific cell version. Thus, it can be used
- // to delete only one cell.
- columnDeleteMarkers.remove(i);
+ if (cell.getTimestamp() == dm.getTimestamp()) {
+ // Delete is for deleting a specific cell version. Thus, it can be used
+ // to delete only one cell.
+ columnDeleteMarkers.remove(i);
+ } else {
+ continue;
+ }
}
if (maxTimestamp >= ttlWindowStart) {
// Inside the TTL window
@@ -253,6 +347,9 @@ public class CompactionScanner implements InternalScanner {
Cell firstCell;
LinkedList<Cell> deleteColumn = null;
long ts;
+ // The next row version is formed by the first cell of each column. Similarly, the min
+ // max timestamp of the cells of a row version is determined by looking at just first
+ // cell of the columns
for (LinkedList<Cell> column : columns) {
firstCell = column.getFirst();
ts = firstCell.getTimestamp();
@@ -269,7 +366,7 @@ public class CompactionScanner implements InternalScanner {
}
}
if (deleteColumn != null) {
- // A row version do not cross a family delete marker. This means
+ // A row version cannot cross a family delete marker by definition. This means
// min timestamp cannot be lower than the delete markers timestamp
for (Cell cell : deleteColumn) {
ts = cell.getTimestamp();
@@ -280,6 +377,41 @@ public class CompactionScanner implements InternalScanner {
}
}
}
+
+ /**
+ * This is used for Phoenix level compaction
+ */
+ private void getNextRowVersionTimestamps(List<Cell> row, byte[] columnFamily) {
+ maxTimestamp = 0;
+ minTimestamp = Long.MAX_VALUE;
+ Cell deleteFamily = null;
+ long ts;
+ // The next row version is formed by the first cell of each column. Similarly, the min
+ // max timestamp of the cells of a row version is determined by looking at just first
+ // cell of the columns
+ for (Cell cell : row) {
+ ts = cell.getTimestamp();
+ if ((cell.getType() == Cell.Type.DeleteFamily ||
+ cell.getType() == Cell.Type.DeleteFamilyVersion) &&
+ CellUtil.matchingFamily(cell, columnFamily)) {
+ deleteFamily = cell;
+ }
+ if (maxTimestamp < ts) {
+ maxTimestamp = ts;
+ }
+ if (minTimestamp > ts) {
+ minTimestamp = ts;
+ }
+ }
+ if (deleteFamily != null) {
+ // A row version cannot cross a family delete marker by definition. This means
+ // min timestamp cannot be lower than the delete markers timestamp
+ ts = deleteFamily.getTimestamp();
+ if (ts < maxTimestamp) {
+ minTimestamp = ts + 1;
+ }
+ }
+ }
}
/**
@@ -288,6 +420,8 @@ public class CompactionScanner implements InternalScanner {
*
*/
class HBaseLevelRowCompactor {
+ private RowContext rowContext = new RowContext();
+ private CompactionRowVersion rowVersion = new CompactionRowVersion();
/**
* A compaction row version includes the latest put cell versions from each column such that
* the cell versions do not cross delete family markers. In other words, the compaction row
@@ -315,6 +449,10 @@ public class CompactionScanner implements InternalScanner {
// The version of a row version. It is the minimum of the versions of the cells included
// in the row version
int version = 0;
+
+ private void init() {
+ cells.clear();
+ }
@Override
public String toString() {
StringBuilder output = new StringBuilder();
@@ -331,14 +469,13 @@ public class CompactionScanner implements InternalScanner {
* Decide if compaction row versions inside the TTL window should be retained. The
* versions are retained if one of the following conditions holds
* 1. The compaction row version is alive and its version is less than VERSIONS
- * 2. The compaction row version is deleted and KeepDeletedCells is TTL
- * 3. The compaction row version is deleted, its version is less than MIN_VERSIONS and
- * KeepDeletedCells is TRUE
+ * 2. The compaction row version is deleted and KeepDeletedCells is not FALSE
*
*/
private void retainInsideTTLWindow(CompactionRowVersion rowVersion, RowContext rowContext,
List<Cell> retainedCells) {
- if (rowContext.familyDeleteMarker == null &&
rowContext.familyVersionDeleteMarker == null) {
+ if (rowContext.familyDeleteMarker == null
+ && rowContext.familyVersionDeleteMarker == null) {
// The compaction row version is alive
if (rowVersion.version < maxVersion) {
// Rule 1
@@ -346,9 +483,8 @@ public class CompactionScanner implements InternalScanner {
}
} else {
// Deleted
- if ((rowVersion.version < maxVersion && keepDeletedCells == KeepDeletedCells.TRUE) ||
- keepDeletedCells == KeepDeletedCells.TTL) {
- // Retain based on rule 2 or 3
+ if (rowVersion.version < maxVersion && keepDeletedCells != KeepDeletedCells.FALSE) {
+ // Retain based on rule 2
retainCells(rowVersion, rowContext, retainedCells);
rowContext.retainFamilyDeleteMarker(retainedCells);
}
@@ -374,12 +510,9 @@ public class CompactionScanner implements InternalScanner {
}
} else {
// Deleted compaction row version
- if (keepDeletedCells == KeepDeletedCells.TTL && (
- (rowContext.familyVersionDeleteMarker != null &&
- rowContext.familyVersionDeleteMarker.getTimestamp() > ttlWindowStart) ||
- (rowContext.familyDeleteMarker != null &&
- rowContext.familyDeleteMarker.getTimestamp() > ttlWindowStart)
- )) {
+ if (keepDeletedCells == KeepDeletedCells.TTL
+ && rowContext.familyDeleteMarker != null
+ && rowContext.familyDeleteMarker.getTimestamp() >
ttlWindowStart) {
// Rule 2
retainCells(rowVersion, rowContext, retainedCells);
rowContext.retainFamilyDeleteMarker(retainedCells);
@@ -406,10 +539,9 @@ public class CompactionScanner implements InternalScanner {
*/
private void formNextCompactionRowVersion(LinkedList<LinkedList<Cell>> columns,
RowContext rowContext, List<Cell> retainedCells) {
- CompactionRowVersion rowVersion = new CompactionRowVersion();
+ rowVersion.init();
rowContext.getNextRowVersionTimestamps(columns, storeColumnFamily);
rowVersion.ts = rowContext.maxTimestamp;
- rowVersion.version = rowContext.version++;
for (LinkedList<Cell> column : columns) {
Cell cell = column.getFirst();
if (column.getFirst().getTimestamp() < rowContext.minTimestamp) {
@@ -419,13 +551,15 @@ public class CompactionScanner implements InternalScanner {
if (cell.getTimestamp() >= rowContext.maxTimestamp) {
rowContext.familyDeleteMarker = cell;
column.removeFirst();
+ break;
}
continue;
}
else if (cell.getType() == Cell.Type.DeleteFamilyVersion) {
- if (cell.getTimestamp() >= rowContext.maxTimestamp) {
+ if (cell.getTimestamp() == rowVersion.ts) {
rowContext.familyVersionDeleteMarker = cell;
column.removeFirst();
+ break;
}
continue;
}
@@ -440,6 +574,7 @@ public class CompactionScanner implements InternalScanner {
if (rowVersion.cells.isEmpty()) {
return;
}
+ rowVersion.version = rowContext.version++;
if (rowVersion.ts >= ttlWindowStart) {
retainInsideTTLWindow(rowVersion, rowContext, retainedCells);
} else {
@@ -449,7 +584,7 @@ public class CompactionScanner implements InternalScanner {
private void formCompactionRowVersions(LinkedList<LinkedList<Cell>> columns,
List<Cell> result) {
- RowContext rowContext = new RowContext();
+ rowContext.init();
while (!columns.isEmpty()) {
formNextCompactionRowVersion(columns, rowContext, result);
// Remove the columns that are empty
@@ -468,14 +603,10 @@ public class CompactionScanner implements InternalScanner {
* the pair of family name and column qualifier. While doing that also add the delete
* markers to a separate list.
*/
- private void formColumns(List<Cell> result, LinkedList<LinkedList<Cell>> columns,
- List<Cell> deleteMarkers) {
+ private void formColumns(List<Cell> result, LinkedList<LinkedList<Cell>> columns) {
Cell currentColumnCell = null;
LinkedList<Cell> currentColumn = null;
for (Cell cell : result) {
- if (cell.getType() != Cell.Type.Put) {
- deleteMarkers.add(cell);
- }
if (currentColumnCell == null) {
currentColumn = new LinkedList<>();
currentColumnCell = cell;
@@ -503,8 +634,7 @@ public class CompactionScanner implements InternalScanner {
return;
}
LinkedList<LinkedList<Cell>> columns = new LinkedList<>();
- List<Cell> deleteMarkers = new ArrayList<>();
- formColumns(result, columns, deleteMarkers);
+ formColumns(result, columns);
result.clear();
formCompactionRowVersions(columns, result);
}
@@ -520,6 +650,12 @@ public class CompactionScanner implements InternalScanner {
*
*/
class PhoenixLevelRowCompactor {
+ private RowContext rowContext = new RowContext();
+ List<Cell> lastRowVersion = new ArrayList<>();
+ List<Cell> emptyColumn = new ArrayList<>();
+ List<Cell> phoenixResult = new ArrayList<>();
+ List<Cell> trimmedRow = new ArrayList<>();
+ List<Cell> trimmedEmptyColumn = new ArrayList<>();
/**
* The cells of the row (i.e., result) read from HBase store are lexicographically ordered
@@ -529,33 +665,53 @@ public class CompactionScanner implements InternalScanner {
* based on the pair of family name and column qualifier.
*
* The cells within the max lookback window except the once at the lower edge of the
- * max lookback window are retained immediately and not included in the constructed columns.
+ * max lookback window (the last row of the max lookback window) are retained immediately.
+ *
+ * This method also returns the remaining cells (outside the max lookback window) of
+ * the empty column
*/
- private void getColumns(List<Cell> result, LinkedList<LinkedList<Cell>> columns,
- List<Cell> retainedCells) {
+ private void getLastRowVersionInMaxLookbackWindow(List<Cell> result,
+ List<Cell> lastRowVersion, List<Cell> retainedCells, List<Cell> emptyColumn) {
Cell currentColumnCell = null;
- LinkedList<Cell> currentColumn = null;
+ boolean isEmptyColumn = false;
+ Cell cellAtMaxLookbackWindowStart = null;
for (Cell cell : result) {
- if (cell.getTimestamp() > maxLookbackWindowStart) {
+ if (cell.getTimestamp() >= maxLookbackWindowStart) {
retainedCells.add(cell);
+ if (cell.getTimestamp() == maxLookbackWindowStart) {
+ cellAtMaxLookbackWindowStart = cell;
+ }
continue;
}
- if (currentColumnCell == null) {
- currentColumn = new LinkedList<>();
- currentColumnCell = cell;
- currentColumn.add(cell);
- } else if (!CellUtil.matchingColumn(cell, currentColumnCell)) {
- columns.add(currentColumn);
- currentColumn = new LinkedList<>();
+ if (!major && cell.getType() != Cell.Type.Put) {
+ retainedCells.add(cell);
+ }
+ if (currentColumnCell == null ||
+ !CellUtil.matchingColumn(cell, currentColumnCell)) {
currentColumnCell = cell;
- currentColumn.add(cell);
- } else {
- currentColumn.add(cell);
+ if (cell.getType() != Cell.Type.Delete
+ && cell.getType() != Cell.Type.DeleteColumn) {
+ // Include only delete family markers and put cells
+ // It is possible that this cell is not visible from the max lookback
+ // window. This happens when there is a mutation with the mutation timestamp
+ // equal to the max lookback window start timestamp. The following is to
+ // check for this case
+ if (cellAtMaxLookbackWindowStart == null
+ || !CellUtil.matchingColumn(cell, cellAtMaxLookbackWindowStart)) {
+ lastRowVersion.add(cell);
+ }
+ }
+ if (major && ScanUtil.isEmptyColumn(cell, emptyCF,
emptyCQ)) {
+ isEmptyColumn = true;
+ } else {
+ isEmptyColumn = false;
+ }
+ } else if (major && isEmptyColumn) {
+ // We only need to keep one cell for every column for the last row version.
+ // So here we just form the empty column beyond the last row version
+ emptyColumn.add(cell);
}
}
- if (currentColumn != null) {
- columns.add(currentColumn);
- }
}
/**
@@ -576,10 +732,10 @@ public class CompactionScanner implements InternalScanner {
if (previous == -1) {
break;
}
- if (max - ts > ttl) {
+ if (max - ts > ttlInMillis) {
max = input.get(previous).getTimestamp();
output.add(input.remove(previous));
- if (max - min > ttl) {
+ if (max - min > ttlInMillis) {
closeGap(max, min, input, output);
}
return;
@@ -591,93 +747,88 @@ public class CompactionScanner implements InternalScanner {
/**
* Retain the last row version visible through the max lookback window
*/
- private void retainCellsOfLastRowVersion(LinkedList<LinkedList<Cell>> columns,
- List<Cell> retainedCells) {
- if (columns.isEmpty()) {
+ private void retainCellsOfLastRowVersion(List<Cell> lastRow,
+ List<Cell> emptyColumn, List<Cell> retainedCells) {
+ if (lastRow.isEmpty()) {
return;
}
- RowContext rowContext = new RowContext();
- rowContext.getNextRowVersionTimestamps(columns, storeColumnFamily);
- List<Cell> retainedPutCells = new ArrayList<>();
- for (LinkedList<Cell> column : columns) {
- Cell cell = column.getFirst();
- if (cell.getTimestamp() < rowContext.minTimestamp) {
- continue;
- }
- if (cell.getType() == Cell.Type.Put) {
- retainedPutCells.add(cell);
- } else if (cell.getType() == Cell.Type.DeleteFamily ||
- cell.getType() == Cell.Type.DeleteFamilyVersion) {
- if (cell.getTimestamp() >= rowContext.maxTimestamp) {
- // This means that the row version outside the max lookback window is
- // deleted and thus should not be visible to the scn queries
- if (cell.getTimestamp() == maxLookbackWindowStart) {
- // Include delete markers at maxLookbackWindowStart
- retainedCells.add(cell);
- }
- return;
- }
- } else if (cell.getTimestamp() == maxLookbackWindowStart) {
- // Include delete markers at maxLookbackWindowStart
- retainedCells.add(cell);
+ rowContext.init();
+ rowContext.getNextRowVersionTimestamps(lastRow, storeColumnFamily);
+ Cell firstCell = lastRow.get(0);
+ if (firstCell.getType() == Cell.Type.DeleteFamily ||
+ firstCell.getType() == Cell.Type.DeleteFamilyVersion) {
+ if (firstCell.getTimestamp() >= rowContext.maxTimestamp) {
+ // This means that the row version outside the max lookback window is
+ // deleted and thus should not be visible to the scn queries
+ return;
}
}
- if (compactionTime - rowContext.maxTimestamp > maxLookbackInMillis + ttl) {
+
+ if (compactionTime - rowContext.maxTimestamp > maxLookbackInMillis + ttlInMillis) {
// The row version should not be visible via the max lookback window. Nothing to do
return;
}
- retainedCells.addAll(retainedPutCells);
+ retainedCells.addAll(lastRow);
// If the gap between two back to back mutations is more than ttl then the older
// mutation will be considered expired and masked. If the length of the time range of
// a row version is not more than ttl, then we know the cells covered by the row
// version are not apart from each other more than ttl and will not be masked.
- if (rowContext.maxTimestamp - rowContext.minTimestamp <= ttl) {
+ if (rowContext.maxTimestamp - rowContext.minTimestamp <= ttlInMillis) {
return;
}
// The quick time range check did not pass. We need get at least one empty cell to cover
// the gap so that the row version will not be masked by PhoenixTTLRegionScanner.
- List<Cell> emptyCellColumn = null;
- for (LinkedList<Cell> column : columns) {
- if (ScanUtil.isEmptyColumn(column.getFirst(), emptyCF, emptyCQ)) {
- emptyCellColumn = column;
- break;
- }
- }
- if (emptyCellColumn == null) {
+ if (emptyColumn.isEmpty()) {
return;
}
- int size = retainedPutCells.size();
+ int size = lastRow.size();
long tsArray[] = new long[size];
int i = 0;
- for (Cell cell : retainedPutCells) {
+ for (Cell cell : lastRow) {
tsArray[i++] = cell.getTimestamp();
}
Arrays.sort(tsArray);
for (i = size - 1; i > 0; i--) {
- if (tsArray[i] - tsArray[i - 1] > ttl) {
- closeGap(tsArray[i], tsArray[i - 1], emptyCellColumn, retainedCells);
+ if (tsArray[i] - tsArray[i - 1] > ttlInMillis) {
+ closeGap(tsArray[i], tsArray[i - 1], emptyColumn, retainedCells);
}
}
}
+ /**
+ * The retained cells include the cells that are visible through the max lookback
+ * window and the additional empty column cells that are needed to reduce large time
+ * between the cells of the last row version.
+ */
private boolean retainCellsForMaxLookback(List<Cell> result, boolean regionLevel,
List<Cell> retainedCells) {
- LinkedList<LinkedList<Cell>> columns = new LinkedList<>();
- getColumns(result, columns, retainedCells);
+
+ lastRowVersion.clear();
+ emptyColumn.clear();
+ getLastRowVersionInMaxLookbackWindow(result, lastRowVersion, retainedCells,
+ emptyColumn);
+ if (lastRowVersion.isEmpty()) {
+ return true;
+ }
+ if (!major) {
+ // We do not expire cells for minor compaction and memstore flushes
+ retainCellsOfLastRowVersion(lastRowVersion, emptyColumn, retainedCells);
+ return true;
+ }
long maxTimestamp = 0;
long minTimestamp = Long.MAX_VALUE;
long ts;
- for (LinkedList<Cell> column : columns) {
- ts = column.getFirst().getTimestamp();
+ for (Cell cell : lastRowVersion) {
+ ts = cell.getTimestamp();
if (ts > maxTimestamp) {
maxTimestamp = ts;
}
- ts = column.getLast().getTimestamp();
+ ts = cell.getTimestamp();
if (ts < minTimestamp) {
minTimestamp = ts;
}
}
- if (compactionTime - maxTimestamp > maxLookbackInMillis + ttl) {
+ if (compactionTime - maxTimestamp > maxLookbackInMillis + ttlInMillis) {
if (!emptyCFStore && !regionLevel) {
// The row version is more than maxLookbackInMillis + ttl old. We cannot decide
// if we should retain it with the store level compaction when the current
@@ -688,7 +839,7 @@ public class CompactionScanner implements InternalScanner {
}
// If the time gap between two back to back mutations is more than ttl then we know
// that the row is expired within the time gap.
- if (maxTimestamp - minTimestamp > ttl) {
+ if (maxTimestamp - minTimestamp > ttlInMillis) {
if ((familyCount > 1 && !regionLevel && !localIndex)) {
// When there are more than one column family for a given table and a row
// version constructed at the store level covers a time span larger than ttl,
@@ -699,38 +850,63 @@ public class CompactionScanner implements InternalScanner {
return false;
}
// We either have one column family or are doing region level compaction. In both
- // case, we can safely trim the cells beyond the first time gap larger ttl
- int size = result.size();
+ // case, we can safely trim the cells beyond the first time gap larger ttl.
+ // Here we are interested in the gaps between the cells of the last row version
+ // and thus we need to examine the gaps between these cells and the empty column.
+ // Please note that empty column is always updated for every mutation and so we
+ // just need empty column cells for the gap analysis.
+ int size = lastRowVersion.size();
+ size += emptyColumn.size();
long tsArray[] = new long[size];
int i = 0;
- for (Cell cell : result) {
+ for (Cell cell : lastRowVersion) {
+ tsArray[i++] = cell.getTimestamp();
+ }
+ for (Cell cell : emptyColumn) {
tsArray[i++] = cell.getTimestamp();
}
Arrays.sort(tsArray);
boolean gapFound = false;
// Since timestamps are sorted in ascending order, traverse them in reverse order
for (i = size - 1; i > 0; i--) {
- if (tsArray[i] - tsArray[i - 1] > ttl) {
+ if (tsArray[i] - tsArray[i - 1] > ttlInMillis) {
minTimestamp = tsArray[i];
gapFound = true;
break;
}
}
if (gapFound) {
- List<Cell> trimmedResult = new ArrayList<>(size - i);
- for (Cell cell : result) {
+ trimmedRow.clear();
+ for (Cell cell : lastRowVersion) {
if (cell.getTimestamp() >= minTimestamp) {
- trimmedResult.add(cell);
+ trimmedRow.add(cell);
}
}
- columns.clear();
- retainedCells.clear();
- getColumns(trimmedResult, columns, retainedCells);
+ lastRowVersion = trimmedRow;
+ trimmedEmptyColumn.clear();
+ for (Cell cell : lastRowVersion) {
+ if (cell.getTimestamp() >= minTimestamp) {
+ trimmedEmptyColumn.add(cell);
+ }
+ }
+ emptyColumn = trimmedEmptyColumn;
}
}
- retainCellsOfLastRowVersion(columns, retainedCells);
+ retainCellsOfLastRowVersion(lastRowVersion, emptyColumn, retainedCells);
return true;
}
+ private void removeDuplicates(List<Cell> input, List<Cell> output) {
+ Cell previousCell = null;
+ for (Cell cell : input) {
+ if (previousCell == null ||
+ cell.getTimestamp() != previousCell.getTimestamp() ||
+ cell.getType() != previousCell.getType() ||
+ !CellUtil.matchingColumn(cell, previousCell)) {
+ output.add(cell);
+ }
+ previousCell = cell;
+ }
+ }
/**
* Compacts a single row at the Phoenix level. The result parameter is the input row and
* modified to be the output of the compaction process.
@@ -739,7 +915,7 @@ public class CompactionScanner implements InternalScanner {
if (result.isEmpty()) {
return;
}
- List<Cell> phoenixResult = new ArrayList<>(result.size());
+ phoenixResult.clear();
if (!retainCellsForMaxLookback(result, regionLevel, phoenixResult)) {
if (familyCount == 1 || regionLevel) {
throw new RuntimeException("UNEXPECTED");
@@ -747,14 +923,17 @@ public class CompactionScanner implements InternalScanner {
phoenixResult.clear();
compactRegionLevel(result, phoenixResult);
}
- if (maxVersion == 1 && minVersion == 0 && keepDeletedCells == KeepDeletedCells.FALSE) {
- // We need to Phoenix level compaction only
+ if (maxVersion == 1
+ && (!major
+ || (minVersion == 0 && keepDeletedCells == KeepDeletedCells.FALSE))) {
+ // We need Phoenix level compaction only
Collections.sort(phoenixResult, CellComparator.getInstance());
result.clear();
- result.addAll(phoenixResult);
+ removeDuplicates(phoenixResult, result);
+ phoenixLevelOnly = true;
return;
}
- // We may need to do retain more cells, and so we need to run HBase level compaction
+ // We may need to retain more cells, and so we need to run HBase level compaction
// too. The result of two compactions will be merged and duplicate cells are removed.
int phoenixResultSize = phoenixResult.size();
List<Cell> hbaseResult = new ArrayList<>(result);
@@ -762,18 +941,7 @@ public class CompactionScanner implements InternalScanner {
phoenixResult.addAll(hbaseResult);
Collections.sort(phoenixResult, CellComparator.getInstance());
result.clear();
- Cell previousCell = null;
- // Eliminate duplicates
- for (Cell cell : phoenixResult) {
- if (previousCell == null ||
- cell.getTimestamp() != previousCell.getTimestamp() ||
- cell.getType() != previousCell.getType() ||
- !CellUtil.matchingColumn(cell, previousCell)) {
- result.add(cell);
- }
- previousCell = cell;
- }
-
+ removeDuplicates(phoenixResult, result);
if (result.size() > phoenixResultSize) {
LOGGER.debug("HBase level compaction retained " +
(result.size() - phoenixResultSize) + " more cells");
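To restate the retention rules documented in the CompactionScanner javadoc above as a hedged sketch: cells visible through the max lookback window are always kept; outside that window, major compaction drops expired cells, extra put versions and unneeded delete markers or deleted cells, while flushes and minor compactions drop only extra put versions and unneeded deleted cells. The boolean flags below are hypothetical summaries of those conditions, not fields or methods of the class.

```java
// Illustrative sketch only; not code from this patch.
final class RetentionRuleSketch {
    static boolean retain(long cellTs, long maxLookbackWindowStart, boolean major,
            boolean expired, boolean extraPutVersion, boolean unneededDeleteOrDeletedCell) {
        if (cellTs >= maxLookbackWindowStart) {
            return true; // everything visible through the max lookback window is kept
        }
        if (major) {
            // major compaction additionally removes expired cells
            return !(expired || extraPutVersion || unneededDeleteOrDeletedCell);
        }
        // flushes and minor compaction: expired cells and delete markers survive
        return !(extraPutVersion || unneededDeleteOrDeletedCell);
    }
}
```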
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index d1b9f17112..d0e8361472 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -32,9 +32,11 @@ import java.security.PrivilegedExceptionAction;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.concurrent.GuardedBy;
@@ -59,6 +61,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.ipc.controller.InterRegionServerIndexRpcControllerFactory;
+import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
@@ -174,8 +177,26 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
private Configuration compactionConfig;
private Configuration indexWriteConfig;
private ReadOnlyProps indexWriteProps;
- private boolean isPhoenixTableTTLEnabled;
+ private static Map<String, Long> maxLookbackMap = new ConcurrentHashMap<>();
+
+ public static void setMaxLookbackInMillis(String tableName, long maxLookbackInMillis) {
+ if (tableName == null) {
+ return;
+ }
+ maxLookbackMap.put(tableName,
+ maxLookbackInMillis);
+ }
+
+ public static long getMaxLookbackInMillis(String tableName, long maxLookbackInMillis) {
+ if (tableName == null) {
+ return maxLookbackInMillis;
+ }
+ Long value = maxLookbackMap.get(tableName);
+ return value == null
+ ? maxLookbackInMillis
+ : maxLookbackMap.get(tableName);
+ }
@Override
public Optional<RegionObserver> getRegionObserver() {
return Optional.of(this);
@@ -205,9 +226,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
e.getConfiguration().getInt(QueryServices.INDEX_REBUILD_RPC_RETRIES_COUNTER,
QueryServicesOptions.DEFAULT_INDEX_REBUILD_RPC_RETRIES_COUNTER));
indexWriteProps = new ReadOnlyProps(indexWriteConfig.iterator());
- isPhoenixTableTTLEnabled =
- e.getConfiguration().getBoolean(QueryServices.PHOENIX_TABLE_TTL_ENABLED,
- QueryServicesOptions.DEFAULT_PHOENIX_TABLE_TTL_ENABLED);
}
Configuration getUpsertSelectConfig() {
@@ -581,92 +599,128 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
region.getTableDescriptor().getTableName().getName()) == 0);
}
+ @Override
+ public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+ InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException {
+ if (!isPhoenixTableTTLEnabled(c.getEnvironment().getConfiguration())) {
+ return scanner;
+ } else {
+ return User.runAsLoginUser(new PrivilegedExceptionAction<InternalScanner>() {
+ @Override public InternalScanner run() throws Exception {
+ String tableName = c.getEnvironment().getRegion().getRegionInfo().getTable()
+ .getNameAsString();
+ long maxLookbackInMillis =
+ UngroupedAggregateRegionObserver.getMaxLookbackInMillis(
+ tableName,
+ BaseScannerRegionObserverConstants.getMaxLookbackInMillis(
+ c.getEnvironment().getConfiguration()));
+ maxLookbackInMillis = CompactionScanner.getMaxLookbackInMillis(tableName,
+ store.getColumnFamilyName(), maxLookbackInMillis);
+ return new CompactionScanner(c.getEnvironment(), store, scanner,
+ maxLookbackInMillis, null, null, false, true);
+ }
+ });
+ }
+ }
+
@Override
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
CompactionRequest request) throws IOException {
- if (scanType.equals(ScanType.COMPACT_DROP_DELETES)) {
- final TableName tableName = c.getEnvironment().getRegion().getRegionInfo().getTable();
- // Compaction and split upcalls run with the effective user context of the requesting user.
- // This will lead to failure of cross cluster RPC if the effective user is not
- // the login user. Switch to the login user context to ensure we have the expected
- // security context.
- return User.runAsLoginUser(new PrivilegedExceptionAction<InternalScanner>() {
- @Override
- public InternalScanner run() throws Exception {
- InternalScanner internalScanner = scanner;
- if (request.isMajor()) {
- boolean isDisabled = false;
- boolean isMultiTenantIndexTable = false;
- if (tableName.getNameAsString().startsWith(MetaDataUtil.VIEW_INDEX_TABLE_PREFIX)) {
- isMultiTenantIndexTable = true;
- }
- final String fullTableName = isMultiTenantIndexTable ?
- SchemaUtil.getParentTableNameFromIndexTable(tableName.getNameAsString(),
- MetaDataUtil.VIEW_INDEX_TABLE_PREFIX) :
- tableName.getNameAsString();
- PTable table = null;
- try (PhoenixConnection conn = QueryUtil.getConnectionOnServer(
- compactionConfig).unwrap(PhoenixConnection.class)) {
- table = conn.getTableNoCache(fullTableName);
- } catch (Exception e) {
- if (e instanceof TableNotFoundException) {
- LOGGER.debug("Ignoring HBase table that is not
a Phoenix table: "
- + fullTableName);
- // non-Phoenix HBase tables won't be found, do nothing
- } else {
- LOGGER.error(
- "Unable to modify compaction scanner
to retain deleted "
- + "cells for a table with
disabled Index; "
- + fullTableName, e);
- }
- }
- // The previous indexing design needs to retain delete markers and deleted
- // cells to rebuild disabled indexes. Thus, we skip major compaction for
- // them. GlobalIndexChecker is the coprocessor introduced by the current
- // indexing design.
- if (table != null &&
- !PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME.equals(fullTableName) &&
- !ServerUtil.hasCoprocessor(c.getEnvironment(),
- GlobalIndexChecker.class.getName())) {
- List<PTable>
- indexes =
- PTableType.INDEX.equals(table.getType()) ?
- Lists.newArrayList(table) :
- table.getIndexes();
- // FIXME need to handle views and indexes on views as well
- for (PTable index : indexes) {
- if (index.getIndexDisableTimestamp() != 0) {
- LOGGER.info("Modifying major compaction
scanner to retain "
- + "deleted cells for a table with
disabled index: "
- + fullTableName);
- isDisabled = true;
- break;
- }
- }
- }
- if (table != null && !isDisabled &&
isPhoenixTableTTLEnabled) {
- internalScanner =
- new CompactionScanner(c.getEnvironment(),
store, scanner,
- MetaDataUtil.getMaxLookbackAge(
-
c.getEnvironment().getConfiguration(), table.getMaxLookbackAge()),
-
SchemaUtil.getEmptyColumnFamily(table),
- table.getEncodingScheme()
- ==
PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS ?
-
QueryConstants.EMPTY_COLUMN_BYTES :
-
table.getEncodingScheme().encode(QueryConstants.ENCODED_EMPTY_COLUMN_NAME)
- );
+
+ final TableName tableName = c.getEnvironment().getRegion().getRegionInfo().getTable();
+ // Compaction and split upcalls run with the effective user context of the requesting user.
+ // This will lead to failure of cross cluster RPC if the effective user is not
+ // the login user. Switch to the login user context to ensure we have the expected
+ // security context.
+ return User.runAsLoginUser(new PrivilegedExceptionAction<InternalScanner>() {
+ @Override
+ public InternalScanner run() throws Exception {
+ InternalScanner internalScanner = scanner;
+ boolean keepDeleted = false;
+ boolean isMultiTenantIndexTable = false;
+ if (tableName.getNameAsString().startsWith(MetaDataUtil.VIEW_INDEX_TABLE_PREFIX)) {
+ isMultiTenantIndexTable = true;
+ }
+ final String fullTableName = isMultiTenantIndexTable ?
+ SchemaUtil.getParentTableNameFromIndexTable(tableName.getNameAsString(),
+ MetaDataUtil.VIEW_INDEX_TABLE_PREFIX) :
+ tableName.getNameAsString();
+ PTable table = null;
+ Long maxLookbackAge = null;
+ try (PhoenixConnection conn = QueryUtil.getConnectionOnServer(
+ compactionConfig).unwrap(PhoenixConnection.class)) {
+ table = conn.getTableNoCache(fullTableName);
+ maxLookbackAge = table.getMaxLookbackAge();
+ UngroupedAggregateRegionObserver.setMaxLookbackInMillis(
+ tableName.getNameAsString(),
+ MetaDataUtil.getMaxLookbackAge(
+ c.getEnvironment().getConfiguration(),
+ maxLookbackAge));
+ } catch (Exception e) {
+ if (e instanceof TableNotFoundException) {
+ LOGGER.debug("Ignoring HBase table that is not a
Phoenix table: "
+ + fullTableName);
+ // non-Phoenix HBase tables won't be found, do nothing
+ } else {
+ LOGGER.error(
+ "Unable to modify compaction scanner to retain
deleted "
+ + "cells for a table with disabled
Index; "
+ + fullTableName, e);
+ }
+ }
+ // The previous indexing design needs to retain delete markers and deleted
+ // cells to rebuild disabled indexes. Thus, we skip major compaction for
+ // them. GlobalIndexChecker is the coprocessor introduced by the current
+ // indexing design.
+ if (table != null &&
+ !PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME.equals(fullTableName) &&
+ !ServerUtil.hasCoprocessor(c.getEnvironment(),
+ GlobalIndexChecker.class.getName())) {
+ List<PTable>
+ indexes =
+ PTableType.INDEX.equals(table.getType()) ?
+ Lists.newArrayList(table) :
+ table.getIndexes();
+ // FIXME need to handle views and indexes on views as well
+ for (PTable index : indexes) {
+ if (index.getIndexDisableTimestamp() != 0) {
+ LOGGER.info("Modifying major compaction scanner to
retain "
+ + "deleted cells for a table with disabled
index: "
+ + fullTableName);
+ keepDeleted = true;
+ break;
}
}
+ }
+ if (table != null
+ &&
isPhoenixTableTTLEnabled(c.getEnvironment().getConfiguration())) {
+ internalScanner =
+ new CompactionScanner(c.getEnvironment(), store, scanner,
+ MetaDataUtil.getMaxLookbackAge(
+ c.getEnvironment().getConfiguration(),
+ maxLookbackAge),
+ SchemaUtil.getEmptyColumnFamily(table),
+ table.getEncodingScheme()
+ == PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS ?
+ QueryConstants.EMPTY_COLUMN_BYTES :
+ table.getEncodingScheme().encode(QueryConstants.ENCODED_EMPTY_COLUMN_NAME),
+ request.isMajor() || request.isAllFiles(), keepDeleted
+ );
+ }
+ if (scanType.equals(ScanType.COMPACT_DROP_DELETES)) {
try {
long clientTimeStamp = EnvironmentEdgeManager.currentTimeMillis();
- DelegateRegionCoprocessorEnvironment compactionConfEnv =
+ DelegateRegionCoprocessorEnvironment
+ compactionConfEnv =
new DelegateRegionCoprocessorEnvironment(c.getEnvironment(),
ConnectionType.COMPACTION_CONNECTION);
- StatisticsCollector statisticsCollector =
+ StatisticsCollector
+ statisticsCollector =
StatisticsCollectorFactory.createStatisticsCollector(
- compactionConfEnv, tableName.getNameAsString(), clientTimeStamp,
- store.getColumnFamilyDescriptor().getName());
+ compactionConfEnv, tableName.getNameAsString(),
+ clientTimeStamp,
+ store.getColumnFamilyDescriptor().getName());
statisticsCollector.init();
internalScanner =
statisticsCollector.createCompactionScanner(compactionConfEnv,
@@ -678,11 +732,10 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
LOGGER.warn("Unable to collect stats for " +
tableName, e);
}
}
- return internalScanner;
}
- });
- }
- return scanner;
+ return internalScanner;
+ }
+ });
}
static PTable deserializeTable(byte[] b) {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java
index f9900fdb7b..e7157aee95 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java
@@ -22,12 +22,10 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.coprocessor.CompactionScanner;
import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.jdbc.PhoenixResultSet;
-import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.query.BaseTest;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.PTable;
@@ -50,9 +48,7 @@ import org.junit.runners.Parameterized;
import java.io.IOException;
import java.sql.Connection;
-import java.sql.Date;
import java.sql.DriverManager;
-import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
@@ -288,11 +284,12 @@ public class MaxLookbackExtendedIT extends BaseTest {
long beforeFirstCompactSCN = EnvironmentEdgeManager.currentTimeMillis();
injectEdge.incrementValue(1); //new ts for major compaction
majorCompact(dataTable);
- majorCompact(indexTable);
assertRawRowCount(conn, dataTable, rowsPlusDeleteMarker);
+ majorCompact(indexTable);
assertRawRowCount(conn, indexTable, rowsPlusDeleteMarker);
//wait for the lookback time. After this compactions should purge the deleted row
- long timeToAdvance = hasTableLevelMaxLookback ? TABLE_LEVEL_MAX_LOOKBACK_AGE : MAX_LOOKBACK_AGE;
+ long timeToAdvance = hasTableLevelMaxLookback
+ ? TABLE_LEVEL_MAX_LOOKBACK_AGE : MAX_LOOKBACK_AGE;
injectEdge.incrementValue(timeToAdvance * 1000);
long beforeSecondCompactSCN = EnvironmentEdgeManager.currentTimeMillis();
String notDeletedRowSql =
@@ -311,10 +308,14 @@ public class MaxLookbackExtendedIT extends BaseTest {
Assert.assertTrue(EnvironmentEdgeManager.currentTimeMillis() <
beforeDeleteSCN + MAX_LOOKBACK_AGE * 1000);
}
+ flush(dataTable);
majorCompact(dataTable);
- majorCompact(indexTable);
// Deleted row versions should be removed.
assertRawRowCount(conn, dataTable, ROWS_POPULATED);
+
+ flush(indexTable);
+ majorCompact(indexTable);
+ // Deleted row versions should be removed.
assertRawRowCount(conn, indexTable, ROWS_POPULATED);
//deleted row should be gone, but not deleted row should still be there.
@@ -425,7 +426,7 @@ public class MaxLookbackExtendedIT extends BaseTest {
assertRawRowCount(conn, dataTable, originalRowCount);
assertRawRowCount(conn, indexTable, originalRowCount);
assertExplainPlan(conn, indexSql, dataTableName, indexName);
- long timeToAdvance = (MAX_LOOKBACK_AGE * 1000) -
+ long timeToAdvance = (MAX_LOOKBACK_AGE * 1000L) -
(EnvironmentEdgeManager.currentTimeMillis() - afterFirstInsertSCN);
if (timeToAdvance > 0) {
injectEdge.incrementValue(timeToAdvance);
@@ -440,7 +441,7 @@ public class MaxLookbackExtendedIT extends BaseTest {
assertRawRowCount(conn, dataTable, originalRowCount);
assertRawRowCount(conn, indexTable, originalRowCount);
//now wait the TTL
- timeToAdvance = (ttl * 1000) -
+ timeToAdvance = (ttl * 1000L) -
(EnvironmentEdgeManager.currentTimeMillis() - afterFirstInsertSCN);
if (timeToAdvance > 0) {
injectEdge.incrementValue(timeToAdvance);
@@ -458,7 +459,7 @@ public class MaxLookbackExtendedIT extends BaseTest {
Assert.assertEquals(0, rs.getInt(1));
// Increment the time by max lookback age and make sure that we can compact away
// the now-expired rows
- injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000);
+ injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000L);
majorCompact(dataTable);
majorCompact(indexTable);
assertRawRowCount(conn, dataTable, 0);
@@ -579,34 +580,35 @@ public class MaxLookbackExtendedIT extends BaseTest {
CompactionScanner.overrideMaxLookback(tableNameTwo, "A",
maxLookbackTwo * 1000);
CompactionScanner.overrideMaxLookback(tableNameTwo, "B",
maxLookbackTwo * 1000);
}
- injectEdge.incrementValue(1);
+ injectEdge.incrementValue(1000);
populateTable(tableNameOne);
populateTable(tableNameTwo);
- injectEdge.incrementValue(1);
+ injectEdge.incrementValue(1000);
populateTable(tableNameOne);
populateTable(tableNameTwo);
- injectEdge.incrementValue(1);
+ injectEdge.incrementValue(1000);
conn.createStatement().executeUpdate("DELETE FROM " + tableNameOne);
conn.createStatement().executeUpdate("DELETE FROM " + tableNameTwo);
conn.commit();
- injectEdge.incrementValue(1);
// Move the time so that deleted row versions will be outside the maxlookback window
- // of tableNameOne but the delete markers will be inside
+ // of tableNameOne but the delete markers will be inside (in this case on the lower
+ // edge of the window)
injectEdge.incrementValue(maxLookbackOne * 1000);
- // Compact both tables. Deleted row versions will be removed from tableNameOne as they
- // are now outside its max lookback window.
+ // Compact both tables. Deleted row versions will be retained since the delete marker
+ // is retained. This is necessary to include the preimage of the row in the CDC
+ // stream.
flush(TableName.valueOf(tableNameOne));
flush(TableName.valueOf(tableNameTwo));
majorCompact(TableName.valueOf(tableNameOne));
majorCompact(TableName.valueOf(tableNameTwo));
if (multiCF) {
- assertRawCellCount(conn, TableName.valueOf(tableNameOne), Bytes.toBytes("a"), 3);
- assertRawCellCount(conn, TableName.valueOf(tableNameOne), Bytes.toBytes("b"), 3);
+ assertRawCellCount(conn, TableName.valueOf(tableNameOne), Bytes.toBytes("a"), 7);
+ assertRawCellCount(conn, TableName.valueOf(tableNameOne), Bytes.toBytes("b"), 7);
assertRawCellCount(conn, TableName.valueOf(tableNameTwo), Bytes.toBytes("a"), 11);
assertRawCellCount(conn, TableName.valueOf(tableNameTwo), Bytes.toBytes("b"), 11);
} else {
- assertRawCellCount(conn, TableName.valueOf(tableNameOne), Bytes.toBytes("a"), 1);
- assertRawCellCount(conn, TableName.valueOf(tableNameOne), Bytes.toBytes("b"), 1);
+ assertRawCellCount(conn, TableName.valueOf(tableNameOne), Bytes.toBytes("a"), 5);
+ assertRawCellCount(conn, TableName.valueOf(tableNameOne), Bytes.toBytes("b"), 5);
assertRawCellCount(conn, TableName.valueOf(tableNameTwo), Bytes.toBytes("a"), 9);
assertRawCellCount(conn, TableName.valueOf(tableNameTwo), Bytes.toBytes("b"), 9);
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
index 3b6d1277e8..591016b37d 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
@@ -53,6 +53,8 @@ import java.util.Map;
import java.util.Properties;
import java.util.Random;
+import static org.junit.Assert.assertTrue;
+
@Category(NeedsOwnMiniClusterTest.class)
@RunWith(Parameterized.class)
public class TableTTLIT extends BaseTest {
@@ -158,34 +160,46 @@ public class TableTTLIT extends BaseTest {
final int maxLookbackAge = tableLevelMaxLooback != null ?
tableLevelMaxLooback : MAX_LOOKBACK_AGE;
final int maxDeleteCounter = maxLookbackAge == 0 ? 1 : maxLookbackAge;
final int maxCompactionCounter = ttl / 2;
+ final int maxFlushCounter = ttl;
final int maxMaskingCounter = 2 * ttl;
+ final int maxVerificationCounter = 2 * ttl;
final byte[] rowKey = Bytes.toBytes("a");
try (Connection conn = DriverManager.getConnection(getUrl())) {
String tableName = generateUniqueName();
createTable(tableName);
+ conn.createStatement().execute("Alter Table " + tableName + " set
\"phoenix.max.lookback.age.seconds\" = " + maxLookbackAge);
+ conn.commit();
String noCompactTableName = generateUniqueName();
createTable(noCompactTableName);
+ conn.createStatement().execute("Alter Table " + noCompactTableName
+ " set \"phoenix.table.ttl.enabled\" = false");
+ conn.commit();
long startTime = System.currentTimeMillis() + 1000;
startTime = (startTime / 1000) * 1000;
EnvironmentEdgeManager.injectEdge(injectEdge);
injectEdge.setValue(startTime);
int deleteCounter = RAND.nextInt(maxDeleteCounter) + 1;
int compactionCounter = RAND.nextInt(maxCompactionCounter) + 1;
+ int flushCounter = RAND.nextInt(maxFlushCounter) + 1;
int maskingCounter = RAND.nextInt(maxMaskingCounter) + 1;
- boolean afterCompaction = false;
+ int verificationCounter = RAND.nextInt(maxVerificationCounter) + 1;
for (int i = 0; i < 500; i++) {
+ if (flushCounter-- == 0) {
+ injectEdge.incrementValue(1000);
+ LOG.info("Flush " + i + " current time: " +
injectEdge.currentTime());
+ flush(TableName.valueOf(tableName));
+ flushCounter = RAND.nextInt(maxFlushCounter) + 1;
+ }
if (compactionCounter-- == 0) {
injectEdge.incrementValue(1000);
- LOG.debug("Compaction " + i + " current time: " +
injectEdge.currentTime());
+ LOG.info("Compaction " + i + " current time: " +
injectEdge.currentTime());
flush(TableName.valueOf(tableName));
majorCompact(TableName.valueOf(tableName));
compactionCounter = RAND.nextInt(maxCompactionCounter) + 1;
- afterCompaction = true;
}
if (maskingCounter-- == 0) {
updateRow(conn, tableName, noCompactTableName, "a");
injectEdge.incrementValue((ttl + maxLookbackAge + 1) * 1000);
- LOG.debug("Masking " + i + " current time: " +
injectEdge.currentTime());
+ LOG.info("Masking " + i + " current time: " +
injectEdge.currentTime());
ResultSet rs = conn.createStatement().executeQuery(
"SELECT count(*) FROM " + tableName);
Assert.assertTrue(rs.next());
@@ -200,20 +214,18 @@ public class TableTTLIT extends BaseTest {
maskingCounter = RAND.nextInt(maxMaskingCounter) + 1;
}
if (deleteCounter-- == 0) {
- LOG.debug("Delete " + i + " current time: " +
injectEdge.currentTime());
+ LOG.info("Delete " + i + " current time: " +
injectEdge.currentTime());
deleteRow(conn, tableName, "a");
deleteRow(conn, noCompactTableName, "a");
deleteCounter = RAND.nextInt(maxDeleteCounter) + 1;
injectEdge.incrementValue(1000);
}
+ injectEdge.incrementValue(1000);
updateRow(conn, tableName, noCompactTableName, "a");
- if (!afterCompaction) {
- injectEdge.incrementValue(1000);
- // We are interested in the correctness of compaction and masking. Thus, we
- // only need to do the latest version and scn queries to after compaction.
+ if (verificationCounter-- > 0) {
continue;
}
- afterCompaction = false;
+ verificationCounter = RAND.nextInt(maxVerificationCounter) + 1;
compareRow(conn, tableName, noCompactTableName, "a", MAX_COLUMN_INDEX);
long scn = injectEdge.currentTime() - maxLookbackAge * 1000;
long scnEnd = injectEdge.currentTime();
@@ -227,7 +239,6 @@ public class TableTTLIT extends BaseTest {
MAX_COLUMN_INDEX);
}
}
- injectEdge.incrementValue(1000);
}
}
}
@@ -257,16 +268,16 @@ public class TableTTLIT extends BaseTest {
// At every flush, extra cell versions should be removed.
// MAX_COLUMN_INDEX table columns and one empty column will be retained for
// each row version.
- TestUtil.assertRawCellCount(conn, TableName.valueOf(tableName), row,
- (i + 1) * (MAX_COLUMN_INDEX + 1) * versions);
+ assertTrue(TestUtil.getRawCellCount(conn, TableName.valueOf(tableName), row)
+ <= (i + 1) * (MAX_COLUMN_INDEX + 1) * versions);
}
// Run one minor compaction (in case no minor compaction has happened yet)
Admin admin = utility.getAdmin();
admin.compact(TableName.valueOf(tableName));
int waitCount = 0;
while (TestUtil.getRawCellCount(conn, TableName.valueOf(tableName),
- Bytes.toBytes("a")) < flushCount * (MAX_COLUMN_INDEX + 1)
* versions) {
- // Wait for major compactions to happen
+ Bytes.toBytes("a")) >= flushCount * (MAX_COLUMN_INDEX + 1)
* versions) {
+ // Wait for minor compactions to happen
Thread.sleep(1000);
waitCount++;
if (waitCount > 30) {
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index c15bd407c9..649c48844d 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -977,7 +977,6 @@ public class TestUtil {
public static CellCount getCellCount(Table table, boolean isRaw) throws IOException {
Scan s = new Scan();
s.setRaw(isRaw);
- ;
s.readAllVersions();
CellCount cellCount = new CellCount();