This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 8aa728e5be PHOENIX-7402: Fix row getting expired partially even if updated within TTL (#1973)
8aa728e5be is described below

commit 8aa728e5be99f9a89c6000219888ff947dfb9f53
Author: sanjeet006py <[email protected]>
AuthorDate: Wed Sep 25 00:45:38 2024 +0530

    PHOENIX-7402: Fix row getting expired partially even if updated within TTL (#1973)
    
    Co-authored-by: Sanjeet Malhotra <[email protected]>
---
 .../phoenix/coprocessor/CompactionScanner.java     | 145 +++++++++++++++------
 .../UngroupedAggregateRegionObserver.java          |  52 +-------
 .../phoenix/end2end/MaxLookbackExtendedIT.java     |  54 +++++++-
 .../org/apache/phoenix/end2end/TableTTLIT.java     |  33 ++---
 .../java/org/apache/phoenix/util/TestUtil.java     |  21 +++
 5 files changed, 195 insertions(+), 110 deletions(-)

diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
index 70b57fb213..e96406f142 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
@@ -67,6 +67,7 @@ import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.schema.types.PSmallint;
+import 
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
@@ -155,6 +156,9 @@ public class CompactionScanner implements InternalScanner {
     private long outputCellCount = 0;
     private boolean phoenixLevelOnly = false;
 
+    // Only for forcing minor compaction while testing
+    private static boolean forceMinorCompaction = false;
+
     public CompactionScanner(RegionCoprocessorEnvironment env,
             Store store,
             InternalScanner storeScanner,
@@ -166,13 +170,16 @@ public class CompactionScanner implements InternalScanner 
{
         this.region = env.getRegion();
         this.store = store;
         this.env = env;
-        this.emptyCF = major && table != null ? 
SchemaUtil.getEmptyColumnFamily(table) : EMPTY_BYTE_ARRAY;
-        this.emptyCQ = major && table != null ? 
SchemaUtil.getEmptyColumnQualifier(table) : EMPTY_BYTE_ARRAY;
+        // Empty column family and qualifier are always needed to compute which empty cells to retain,
+        // even during minor compactions. If the required empty cells are not retained during
+        // minor compactions then we run the risk of partial row expiry on the next major compaction.
+        this.emptyCF = SchemaUtil.getEmptyColumnFamily(table);
+        this.emptyCQ = SchemaUtil.getEmptyColumnQualifier(table);
         compactionTime = EnvironmentEdgeManager.currentTimeMillis();
         columnFamilyName = store.getColumnFamilyName();
         storeColumnFamily = columnFamilyName.getBytes();
         tableName = region.getRegionInfo().getTable().getNameAsString();
-        String dataTableName = major && table != null ? 
table.getName().toString() : "";
+        String dataTableName = table.getName().toString();
         Long overriddenMaxLookback = maxLookbackMap.get(tableName + SEPARATOR 
+ columnFamilyName);
         this.maxLookbackInMillis = overriddenMaxLookback == null ?
                 maxLookbackAgeInMillis : Math.max(maxLookbackAgeInMillis, 
overriddenMaxLookback);
@@ -181,16 +188,14 @@ public class CompactionScanner implements InternalScanner 
{
         // maxLookbackInMillis + 1 so that the oldest scn does not return 
empty row
         this.maxLookbackWindowStart = this.maxLookbackInMillis == 0 ? 
compactionTime : compactionTime - (this.maxLookbackInMillis + 1);
         ColumnFamilyDescriptor cfd = store.getColumnFamilyDescriptor();
-        this.major = major;
+        this.major = major && ! forceMinorCompaction;
         this.minVersion = cfd.getMinVersions();
         this.maxVersion = cfd.getMaxVersions();
         this.keepDeletedCells = keepDeleted ? KeepDeletedCells.TTL : 
cfd.getKeepDeletedCells();
         familyCount = region.getTableDescriptor().getColumnFamilies().length;
         localIndex = 
columnFamilyName.startsWith(LOCAL_INDEX_COLUMN_FAMILY_PREFIX);
-        emptyCFStore = major
-                ? familyCount == 1 || 
columnFamilyName.equals(Bytes.toString(emptyCF))
-                        || localIndex
-                : true; // we do not need to identify emptyCFStore for minor 
compaction or flushes
+        emptyCFStore = familyCount == 1 || 
columnFamilyName.equals(Bytes.toString(emptyCF))
+                        || localIndex;
 
         // Initialize the tracker that computes the TTL for the compacting 
table.
         // The TTL tracker can be
@@ -198,15 +203,15 @@ public class CompactionScanner implements InternalScanner 
{
         // complex when the TTL can vary per row when the compacting table is 
Partitioned.
         TTLTracker
                 ttlTracker =
-                major ?
-                        createTTLTrackerFor(env, store, table) :
-                        new TableTTLTrackerForFlushesAndMinor(tableName) ;
+                this.major ?
+                        createTTLTrackerFor(env, store, table):
+                        new TableTTLTrackerForFlushesAndMinor(tableName);
 
         phoenixLevelRowCompactor = new PhoenixLevelRowCompactor(ttlTracker);
         hBaseLevelRowCompactor = new HBaseLevelRowCompactor(ttlTracker);
 
         LOGGER.info("Starting CompactionScanner for table " + tableName + " 
store "
-                + columnFamilyName + (major ? " major " : " not major ") + 
"compaction ttl "
+                + columnFamilyName + (this.major ? " major " : " not major ") 
+ "compaction ttl "
                 + ttlTracker.getRowContext().getTTL() + "ms " + "max lookback 
" + this.maxLookbackInMillis + "ms");
         LOGGER.info(String.format("CompactionScanner params:- (" +
                         "physical-data-tablename = %s, compaction-tablename = 
%s, region = %s, " +
@@ -214,15 +219,25 @@ public class CompactionScanner implements InternalScanner 
{
                         "emptyCF = %s, emptyCQ = %s, " +
                         "minVersion = %d, maxVersion = %d, keepDeletedCells = 
%s, " +
                         "familyCount = %d, localIndex = %s, emptyCFStore = %s, 
" +
-                        "compactionTime = %d, maxLookbackWindowStart = %d, 
maxLookbackInMillis = %d)",
+                        "compactionTime = %d, maxLookbackWindowStart = %d, 
maxLookbackInMillis = %d, major = %s)",
                 dataTableName, tableName, 
region.getRegionInfo().getEncodedName(),
                 Bytes.toStringBinary(region.getRegionInfo().getStartKey()),
                 Bytes.toStringBinary(region.getRegionInfo().getEndKey()),
                 Bytes.toString(this.emptyCF), Bytes.toString(emptyCQ),
                 this.minVersion, this.maxVersion, this.keepDeletedCells.name(),
                 this.familyCount, this.localIndex, this.emptyCFStore,
-                compactionTime, maxLookbackWindowStart, maxLookbackInMillis));
+                compactionTime, maxLookbackWindowStart, maxLookbackInMillis, 
this.major));
+
+    }
 
+    @VisibleForTesting
+    public static void setForceMinorCompaction(boolean doMinorCompaction) {
+        forceMinorCompaction = doMinorCompaction;
+    }
+
+    @VisibleForTesting
+    public static boolean getForceMinorCompaction() {
+        return forceMinorCompaction;
     }
 
     /**
@@ -392,6 +407,9 @@ public class CompactionScanner implements InternalScanner {
                 + columnFamilyName + (major ? " major " : " not major ") + 
"compaction retained "
                 + outputCellCount + " of " + inputCellCount + " cells"
                 + (phoenixLevelOnly ? " phoenix level only" : ""));
+        if (forceMinorCompaction) {
+            forceMinorCompaction = false;
+        }
         storeScanner.close();
     }
 
@@ -2179,13 +2197,10 @@ public class CompactionScanner implements 
InternalScanner {
                 List<Cell> lastRowVersion, List<Cell> retainedCells, 
List<Cell> emptyColumn) {
             Cell currentColumnCell = null;
             boolean isEmptyColumn = false;
-            Cell cellAtMaxLookbackWindowStart = null;
             for (Cell cell : result) {
-                if (cell.getTimestamp() > 
rowTracker.getRowContext().getMaxLookbackWindowStart()) {
+                long maxLookbackWindowStart = 
rowTracker.getRowContext().getMaxLookbackWindowStart();
+                if (cell.getTimestamp() > maxLookbackWindowStart) {
                     retainedCells.add(cell);
-                    if (cell.getTimestamp() == maxLookbackWindowStart) {
-                        cellAtMaxLookbackWindowStart = cell;
-                    }
                     continue;
                 }
                 if (!major && cell.getType() != Cell.Type.Put) {
@@ -2194,26 +2209,20 @@ public class CompactionScanner implements 
InternalScanner {
                 if (currentColumnCell == null ||
                         !CellUtil.matchingColumn(cell, currentColumnCell)) {
                     currentColumnCell = cell;
-                    if (cell.getType() != Cell.Type.Delete
-                            && cell.getType() != Cell.Type.DeleteColumn) {
+                    isEmptyColumn = ScanUtil.isEmptyColumn(cell, emptyCF, 
emptyCQ);
+                    if ((cell.getType() != Cell.Type.Delete
+                            && cell.getType() != Cell.Type.DeleteColumn)
+                            || cell.getTimestamp() == maxLookbackWindowStart) {
                         // Include only delete family markers and put cells
-                        // It is possible that this cell is not visible from 
the max lookback
-                        // window. This happens when there is a mutation with 
the mutation timestamp
-                        // equal to the max lookback window start timestamp. 
The following is to
-                        // check for this case
-                        if (cellAtMaxLookbackWindowStart == null
-                                || !CellUtil.matchingColumn(cell, 
cellAtMaxLookbackWindowStart)) {
-                            lastRowVersion.add(cell);
-                        }
-                    }
-                    if (major && ScanUtil.isEmptyColumn(cell, emptyCF, 
emptyCQ)) {
-                        isEmptyColumn = true;
-                    } else {
-                        isEmptyColumn = false;
+                        // The last row version can also be the cells with 
timestamp
+                        // same as timestamp of start of max lookback window
+                        lastRowVersion.add(cell);
                     }
-                } else if (major && isEmptyColumn) {
+                } else if (isEmptyColumn) {
                     // We only need to keep one cell for every column for the 
last row version.
-                    // So here we just form the empty column beyond the last 
row version
+                    // So here we just form the empty column beyond the last 
row version.
+                    // Empty column needs to be collected during minor 
compactions also
+                    // else we will see partial row expiry.
                     emptyColumn.add(cell);
                 }
             }
@@ -2223,7 +2232,7 @@ public class CompactionScanner implements InternalScanner 
{
          * Close the gap between the two timestamps, max and min, with the 
minimum number of cells
          * from the input list such that the timestamp difference between two 
cells should
          * not more than ttl. The cells that are used to close the gap are 
added to the output
-         * list.
+         * list. The input list is a list of empty cells in decreasing order 
of timestamp.
          */
         private void closeGap(long max, long min, long ttl, List<Cell> input, 
List<Cell> output) {
             int  previous = -1;
@@ -2234,7 +2243,9 @@ public class CompactionScanner implements InternalScanner 
{
                     previous++;
                     continue;
                 }
-                if (previous == -1) {
+                if (previous == -1 && max - ts > ttl) {
+                    // Even the first empty cell in the input list, the one closest to the
+                    // max timestamp, can't close the gap, so empty cells can't close it at all.
                     break;
                 }
                 if (max - ts > ttl) {
@@ -2247,6 +2258,42 @@ public class CompactionScanner implements 
InternalScanner {
                 }
                 previous++;
             }
+            if (previous > -1 && max - min > ttl) {
+                // This covers the case where we need to retain the last empty cell in the input list. The
+                // close-gap algorithm only learns that the i-th empty cell must be retained while iterating
+                // over the (i+1)-th empty cell. So, to retain the last empty cell in the input list we
+                // need to check the min timestamp.
+                output.add(input.remove(previous));
+            }
+        }
+
+        /**
+         * Retains the minimum empty cells needed during minor compaction so that the next major
+         * compaction does not lose data or partially expire the row.
+         * @param emptyColumn Empty column cells in decreasing order of timestamp.
+         * @param retainedCells Cells to be retained.
+         */
+        private void retainEmptyCellsInMinorCompaction(List<Cell> emptyColumn, 
List<Cell> retainedCells) {
+            if (emptyColumn.isEmpty()) {
+                return;
+            }
+            else if (familyCount == 1 || localIndex) {
+                // We are compacting empty column family store and its single 
column family so
+                // just need to retain empty cells till min timestamp of last 
row version. Can't
+                // minimize the retained empty cells further as we don't know 
actual TTL during
+                // minor compactions.
+                long minRowTimestamp = rowContext.minTimestamp;
+                for (Cell emptyCell: emptyColumn) {
+                    if (emptyCell.getTimestamp() > minRowTimestamp) {
+                        retainedCells.add(emptyCell);
+                    }
+                }
+                return;
+            }
+            // For multi-column family, w/o doing region level scan we can't 
put a bound on timestamp
+            // till which we should retain the empty cells. The empty cells 
can be needed to close the gap
+            // b/w empty column family cell and non-empty column family cell.
+            retainedCells.addAll(emptyColumn);
         }
 
         /**
@@ -2271,7 +2318,8 @@ public class CompactionScanner implements InternalScanner 
{
                 }
             }
 
-            if (compactionTime - rowContext.maxTimestamp > maxLookbackInMillis 
+ ttl) {
+            if (major && compactionTime - rowContext.maxTimestamp > 
maxLookbackInMillis + ttl) {
+                // Only do this check for major compaction as for minor 
compactions we don't expire cells.
                 // The row version should not be visible via the max lookback 
window. Nothing to do
                 return;
             }
@@ -2280,7 +2328,9 @@ public class CompactionScanner implements InternalScanner 
{
             // mutation will be considered expired and masked. If the length 
of the time range of
             // a row version is not more than ttl, then we know the cells 
covered by the row
             // version are not apart from each other more than ttl and will 
not be masked.
-            if (rowContext.maxTimestamp - rowContext.minTimestamp <= ttl) {
+            if (major && rowContext.maxTimestamp - rowContext.minTimestamp <= 
ttl) {
+                // Skip this check for minor compactions as we don't compute 
actual TTL for
+                // minor compactions and don't expire cells.
                 return;
             }
             // The quick time range check did not pass. We need get at least 
one empty cell to cover
@@ -2288,6 +2338,10 @@ public class CompactionScanner implements 
InternalScanner {
             if (emptyColumn.isEmpty()) {
                 return;
             }
+            else if (! major) {
+                retainEmptyCellsInMinorCompaction(emptyColumn, retainedCells);
+                return;
+            }
             int size = lastRow.size();
             long tsArray[] = new long[size];
             int i = 0;
@@ -2392,7 +2446,7 @@ public class CompactionScanner implements InternalScanner 
{
                     }
                     lastRowVersion = trimmedRow;
                     trimmedEmptyColumn.clear();;
-                    for (Cell cell : lastRowVersion) {
+                    for (Cell cell : emptyColumn) {
                         if (cell.getTimestamp() >= minTimestamp) {
                             trimmedEmptyColumn.add(cell);
                         }
@@ -2425,7 +2479,14 @@ public class CompactionScanner implements 
InternalScanner {
             }
             phoenixResult.clear();
             rowTracker.setTTL(result.get(0));
-            if (!retainCellsForMaxLookback(result, regionLevel, 
phoenixResult)) {
+            // For the multi-CF case, always do a region level scan for the empty CF store during major
+            // compaction, else we could end up removing some empty cells which are needed to close the gap
+            // between the empty CF cell and a non-empty CF cell to prevent partial row expiry. This can happen when
+            // the last row version of a non-empty CF cell outside the max lookback window is older than that of the empty CF cell.
+            if (major && familyCount > 1 && ! localIndex && emptyCFStore && ! 
regionLevel) {
+                compactRegionLevel(result, phoenixResult);
+            }
+            else if (!retainCellsForMaxLookback(result, regionLevel, 
phoenixResult)) {
                 if (familyCount == 1 || regionLevel) {
                     throw new RuntimeException("UNEXPECTED");
                 }
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 6ae12454df..e0253b7290 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -179,25 +179,6 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
     private Configuration indexWriteConfig;
     private ReadOnlyProps indexWriteProps;
 
-    private static Map<String, Long> maxLookbackMap = new 
ConcurrentHashMap<>();
-
-    public static void setMaxLookbackInMillis(String tableName, long 
maxLookbackInMillis) {
-        if (tableName == null) {
-            return;
-        }
-        maxLookbackMap.put(tableName,
-                maxLookbackInMillis);
-    }
-
-    public static long getMaxLookbackInMillis(String tableName, long 
maxLookbackInMillis) {
-        if (tableName == null) {
-            return maxLookbackInMillis;
-        }
-        Long value = maxLookbackMap.get(tableName);
-        return value == null
-                ? maxLookbackInMillis
-                : maxLookbackMap.get(tableName);
-    }
     @Override
     public Optional<RegionObserver> getRegionObserver() {
         return Optional.of(this);
@@ -600,30 +581,6 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
                 region.getTableDescriptor().getTableName().getName()) == 0);
     }
 
-    @Override
-    public InternalScanner 
preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
-            InternalScanner scanner, FlushLifeCycleTracker tracker) throws 
IOException {
-        if (!isPhoenixTableTTLEnabled(c.getEnvironment().getConfiguration())) {
-            return scanner;
-        } else {
-            return User.runAsLoginUser(new 
PrivilegedExceptionAction<InternalScanner>() {
-                @Override public InternalScanner run() throws Exception {
-                    String tableName = 
c.getEnvironment().getRegion().getRegionInfo().getTable()
-                            .getNameAsString();
-                    long maxLookbackInMillis =
-                            
UngroupedAggregateRegionObserver.getMaxLookbackInMillis(
-                                    tableName,
-                                    
BaseScannerRegionObserverConstants.getMaxLookbackInMillis(
-                                            
c.getEnvironment().getConfiguration()));
-                    maxLookbackInMillis = 
CompactionScanner.getMaxLookbackInMillis(tableName,
-                            store.getColumnFamilyName(), maxLookbackInMillis);
-                    return new CompactionScanner(c.getEnvironment(), store, 
scanner,
-                            maxLookbackInMillis, false, true, null);
-                }
-            });
-        }
-    }
-
     @Override
     public InternalScanner 
preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
                                       InternalScanner scanner, ScanType 
scanType, CompactionLifeCycleTracker tracker,
@@ -653,11 +610,6 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
                         compactionConfig).unwrap(PhoenixConnection.class)) {
                     table = conn.getTableNoCache(fullTableName);
                     maxLookbackAge = table.getMaxLookbackAge();
-                    UngroupedAggregateRegionObserver.setMaxLookbackInMillis(
-                            tableName.getNameAsString(),
-                            MetaDataUtil.getMaxLookbackAge(
-                                    c.getEnvironment().getConfiguration(),
-                                    maxLookbackAge));
                 } catch (Exception e) {
                     if (e instanceof TableNotFoundException) {
                         LOGGER.debug("Ignoring HBase table that is not a 
Phoenix table: "
@@ -705,6 +657,10 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
                                     keepDeleted, table
                             );
                 }
+                else if 
(isPhoenixTableTTLEnabled(c.getEnvironment().getConfiguration())) {
+                    LOGGER.warn("Skipping compaction for table: {} " +
+                            "as failed to retrieve PTable object", 
fullTableName);
+                }
                 if (scanType.equals(ScanType.COMPACT_DROP_DELETES)) {
                     try {
                         long clientTimeStamp = 
EnvironmentEdgeManager.currentTimeMillis();
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java
index 1e2fa2752c..843f780e34 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java
@@ -65,6 +65,7 @@ import static 
org.apache.phoenix.util.TestUtil.assertTableHasTtl;
 import static org.apache.phoenix.util.TestUtil.assertTableHasVersions;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 @Category(NeedsOwnMiniClusterTest.class)
@@ -91,6 +92,7 @@ public class MaxLookbackExtendedIT extends BaseTest {
         
props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, 
Long.toString(0));
         
props.put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, 
Integer.toString(MAX_LOOKBACK_AGE));
         props.put("hbase.procedure.remote.dispatcher.delay.msec", "0");
+        props.put(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, "0");
         setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
     }
 
@@ -615,6 +617,57 @@ public class MaxLookbackExtendedIT extends BaseTest {
             }
         }
     }
+
+    @Test(timeout=60000)
+    public void testRetainingLastRowVersion() throws Exception {
+        if(hasTableLevelMaxLookback) {
+            optionBuilder.append(", MAX_LOOKBACK_AGE=" + 
TABLE_LEVEL_MAX_LOOKBACK_AGE * 1000);
+            tableDDLOptions = optionBuilder.toString();
+        }
+        try(Connection conn = DriverManager.getConnection(getUrl())) {
+            String tableName = generateUniqueName();
+            createTable(tableName);
+            long timeIntervalBetweenTwoUpserts = (ttl / 2) + 1;
+            injectEdge.setValue(System.currentTimeMillis());
+            EnvironmentEdgeManager.injectEdge(injectEdge);
+            TableName dataTableName = TableName.valueOf(tableName);
+            injectEdge.incrementValue(1);
+            Statement stmt = conn.createStatement();
+            stmt.execute("upsert into " + tableName + " values ('a', 'ab', 
'abc', 'abcd')");
+            conn.commit();
+            injectEdge.incrementValue(timeIntervalBetweenTwoUpserts * 1000);
+            flush(dataTableName);
+            injectEdge.incrementValue(1);
+            stmt.execute("upsert into " + tableName + " values ('a', 'ab1')");
+            conn.commit();
+            injectEdge.incrementValue(timeIntervalBetweenTwoUpserts * 1000);
+            flush(dataTableName);
+            injectEdge.incrementValue(1);
+            stmt.execute("upsert into " + tableName + " values ('a', 'ab2')");
+            conn.commit();
+            injectEdge.incrementValue(timeIntervalBetweenTwoUpserts * 1000);
+            flush(dataTableName);
+            injectEdge.incrementValue(1);
+            stmt.execute("upsert into " + tableName + " values ('a', 'ab3')");
+            conn.commit();
+            injectEdge.incrementValue(TABLE_LEVEL_MAX_LOOKBACK_AGE * 1000);
+            flush(dataTableName);
+            injectEdge.incrementValue(1);
+            TestUtil.dumpTable(conn, dataTableName);
+            TestUtil.minorCompact(utility, dataTableName);
+            injectEdge.incrementValue(1);
+            TestUtil.dumpTable(conn, dataTableName);
+            majorCompact(dataTableName);
+            injectEdge.incrementValue(1);
+            TestUtil.dumpTable(conn, dataTableName);
+            ResultSet rs = stmt.executeQuery("select * from " + dataTableName 
+ " where id = 'a'");
+            while(rs.next()) {
+                assertNotNull(rs.getString(3));
+                assertNotNull(rs.getString(4));
+            }
+        }
+    }
+
     private void flush(TableName table) throws IOException {
         Admin admin = getUtility().getAdmin();
         admin.flush(table);
@@ -684,5 +737,4 @@ public class MaxLookbackExtendedIT extends BaseTest {
         String actualExplainPlan = QueryUtil.getExplainPlan(rs);
         IndexToolIT.assertExplainPlan(false, actualExplainPlan, 
dataTableFullName, indexTableFullName);
     }
-
 }
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
index e2aa329f3d..f7188e8006 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
@@ -52,6 +52,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 @Category(NeedsOwnMiniClusterTest.class)
@@ -172,6 +173,7 @@ public class TableTTLIT extends BaseTest {
             conn.commit();
             String noCompactTableName = generateUniqueName();
             createTable(noCompactTableName);
+            conn.createStatement().execute("ALTER TABLE " + noCompactTableName 
+ " set MAX_LOOKBACK_AGE = " + maxLookbackAge * 1000);
             conn.commit();
             long startTime = System.currentTimeMillis() + 1000;
             startTime = (startTime / 1000) * 1000;
@@ -245,7 +247,7 @@ public class TableTTLIT extends BaseTest {
     }
 
     @Test
-    public void 
testFlushesAndMinorCompactionShouldNotRetainCellsWhenMaxLookbackIsDisabled()
+    public void 
testMinorCompactionShouldNotRetainCellsWhenMaxLookbackIsDisabled()
             throws Exception {
         final int maxLookbackAge = tableLevelMaxLooback != null
                 ? tableLevelMaxLooback : MAX_LOOKBACK_AGE;
@@ -255,36 +257,29 @@ public class TableTTLIT extends BaseTest {
         try (Connection conn = DriverManager.getConnection(getUrl())) {
             String tableName = generateUniqueName();
             createTable(tableName);
-            conn.createStatement().execute("Alter Table " + tableName + " set 
\"phoenix.max.lookback.age.seconds\" = 0");
             conn.commit();
             final int flushCount = 10;
             byte[] row = Bytes.toBytes("a");
+            int rowUpdateCounter = 0;
             for (int i = 0; i < flushCount; i++) {
                 // Generate more row versions than the maximum cell versions 
for the table
                 int updateCount = RAND.nextInt(10) + versions;
+                rowUpdateCounter += updateCount;
                 for (int j = 0; j < updateCount; j++) {
                     updateRow(conn, tableName, "a");
                 }
                 flush(TableName.valueOf(tableName));
-                // At every flush, extra cell versions should be removed.
-                // MAX_COLUMN_INDEX table columns and one empty column will be 
retained for
-                // each row version.
-                assertTrue(TestUtil.getRawCellCount(conn, 
TableName.valueOf(tableName), row)
-                        <= (i + 1) * (MAX_COLUMN_INDEX + 1) * versions);
+                // Flushes dump and retain all the cells to HFile.
+                // Doing MAX_COLUMN_INDEX + 1 to account for empty cells
+                assertEquals(TestUtil.getRawCellCount(conn, 
TableName.valueOf(tableName), row),
+                        rowUpdateCounter * (MAX_COLUMN_INDEX + 1));
             }
+            TestUtil.dumpTable(conn, TableName.valueOf(tableName));
             // Run one minor compaction (in case no minor compaction has 
happened yet)
-            Admin admin = utility.getAdmin();
-            admin.compact(TableName.valueOf(tableName));
-            int waitCount = 0;
-            while (TestUtil.getRawCellCount(conn, TableName.valueOf(tableName),
-                    Bytes.toBytes("a")) >= flushCount * (MAX_COLUMN_INDEX + 1) 
* versions) {
-                // Wait for minor compactions to happen
-                Thread.sleep(1000);
-                waitCount++;
-                if (waitCount > 30) {
-                    Assert.fail();
-                }
-            }
+            TestUtil.minorCompact(utility, TableName.valueOf(tableName));
+            TestUtil.dumpTable(conn, TableName.valueOf(tableName));
+            assertEquals(TestUtil.getRawCellCount(conn, 
TableName.valueOf(tableName), Bytes.toBytes("a")),
+                    (MAX_COLUMN_INDEX + 1) * versions);
         }
     }
 
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java 
b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index c071129dcb..1845f01a62 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -98,6 +98,7 @@ import org.apache.phoenix.compile.StatementNormalizer;
 import org.apache.phoenix.compile.SubqueryRewriter;
 import org.apache.phoenix.compile.SubselectRewriter;
 import org.apache.phoenix.compile.JoinCompiler.JoinTable;
+import org.apache.phoenix.coprocessor.CompactionScanner;
 import 
org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheRequest;
 import 
org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheResponse;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService;
@@ -815,6 +816,26 @@ public class TestUtil {
         admin.flush(table);
     }
 
+    public static void minorCompact(HBaseTestingUtility utility, TableName 
table)
+            throws IOException, InterruptedException {
+        try {
+            CompactionScanner.setForceMinorCompaction(true);
+            Admin admin = utility.getAdmin();
+            admin.compact(table);
+            int waitForCompactionToCompleteCounter = 0;
+            while (CompactionScanner.getForceMinorCompaction()) {
+                waitForCompactionToCompleteCounter++;
+                if (waitForCompactionToCompleteCounter > 20) {
+                    Assert.fail();
+                }
+                Thread.sleep(1000);
+            }
+        }
+        finally {
+            CompactionScanner.setForceMinorCompaction(false);
+        }
+    }
+
     public static void majorCompact(HBaseTestingUtility utility, TableName 
table)
         throws IOException, InterruptedException {
         long compactionRequestedSCN = 
EnvironmentEdgeManager.currentTimeMillis();

Reply via email to