This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch 5.2
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/5.2 by this push:
     new 52d58f9891 PHOENIX-7402: Fix row getting expired partially even if updated within TTL (#1986)
52d58f9891 is described below

commit 52d58f9891c319347571fa1e5f834692d68cf7e2
Author: sanjeet006py <[email protected]>
AuthorDate: Thu Sep 26 03:21:46 2024 +0530

    PHOENIX-7402: Fix row getting expired partially even if updated within TTL (#1986)
    
    * PHOENIX-7402: Fix row getting expired partially even if updated within TTL
    
    * Increase thread sleep while waiting for minor compaction to complete
    
    ---------
    
    Co-authored-by: Sanjeet Malhotra <[email protected]>
---
 .../phoenix/coprocessor/CompactionScanner.java     | 131 +++++++++++++++------
 .../UngroupedAggregateRegionObserver.java          |  25 +---
 .../phoenix/end2end/MaxLookbackExtendedIT.java     |  56 +++++++++
 .../org/apache/phoenix/end2end/TableTTLIT.java     |  38 +++---
 .../java/org/apache/phoenix/util/TestUtil.java     |  21 ++++
 5 files changed, 196 insertions(+), 75 deletions(-)
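
A note on how the test changes below exercise the fix: the patch adds a test-only flag to CompactionScanner (setForceMinorCompaction/getForceMinorCompaction) and a TestUtil.minorCompact(utility, table) helper that sets the flag, requests a compaction, and polls until CompactionScanner resets the flag on close. A minimal usage sketch, assuming an already-running HBaseTestingUtility mini cluster; the wrapper class and table name here are illustrative, only the TestUtil and CompactionScanner names come from the diff:

    import org.apache.hadoop.hbase.HBaseTestingUtility;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.phoenix.util.TestUtil;

    // Hypothetical wrapper; only TestUtil.minorCompact() and the CompactionScanner
    // flag it drives come from the patch below.
    public class ForcedMinorCompactionExample {
        public static void forceMinorCompaction(HBaseTestingUtility utility, String tableName)
                throws Exception {
            TableName table = TableName.valueOf(tableName); // illustrative table name
            // TestUtil.minorCompact() calls CompactionScanner.setForceMinorCompaction(true),
            // requests a compaction via Admin.compact(), and then polls
            // CompactionScanner.getForceMinorCompaction() until close() resets the flag.
            TestUtil.minorCompact(utility, table);
            // From here on, scans see only what the minor compaction retained.
        }
    }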

diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
index 2fcf91dc64..64bebca166 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
@@ -47,6 +47,7 @@ import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.ScanUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
 
 import static org.apache.phoenix.query.QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX;
 
@@ -103,6 +104,9 @@ public class CompactionScanner implements InternalScanner {
     private long outputCellCount = 0;
     private boolean phoenixLevelOnly = false;
 
+    // Only for forcing minor compaction while testing
+    private static boolean forceMinorCompaction = false;
+
     public CompactionScanner(RegionCoprocessorEnvironment env,
             Store store,
             InternalScanner storeScanner,
@@ -115,6 +119,9 @@ public class CompactionScanner implements InternalScanner {
         this.region = env.getRegion();
         this.store = store;
         this.env = env;
+        // Empty column family and qualifier are always needed to compute which empty cells to retain,
+        // even during minor compactions. If the required empty cells are not retained during
+        // minor compactions, then we run the risk of partial row expiry on the next major compaction.
         this.emptyCF = emptyCF;
         this.emptyCQ = emptyCQ;
         this.config = env.getConfiguration();
@@ -131,8 +138,8 @@ public class CompactionScanner implements InternalScanner {
         this.maxLookbackWindowStart = this.maxLookbackInMillis == 0 ?
                 compactionTime : compactionTime - (this.maxLookbackInMillis + 1);
         ColumnFamilyDescriptor cfd = store.getColumnFamilyDescriptor();
-        this.major = major;
-        int ttl = major ? cfd.getTimeToLive() : HConstants.FOREVER;
+        this.major = major && ! forceMinorCompaction;
+        int ttl = this.major ? cfd.getTimeToLive() : HConstants.FOREVER;
         ttlInMillis = ((long) ttl) * 1000;
         this.ttlWindowStart = ttl == HConstants.FOREVER ? 1 : compactionTime - ttlInMillis;
         this.maxLookbackWindowStart = Math.max(ttlWindowStart, maxLookbackWindowStart);
@@ -141,17 +148,25 @@ public class CompactionScanner implements InternalScanner {
         this.keepDeletedCells = keepDeleted ? KeepDeletedCells.TTL : cfd.getKeepDeletedCells();
         familyCount = region.getTableDescriptor().getColumnFamilies().length;
         localIndex = columnFamilyName.startsWith(LOCAL_INDEX_COLUMN_FAMILY_PREFIX);
-        emptyCFStore = major
-                ? familyCount == 1 || columnFamilyName.equals(Bytes.toString(emptyCF))
-                        || localIndex
-                : true; // we do not need to identify emptyCFStore for minor compaction or flushes
+        emptyCFStore = familyCount == 1 || columnFamilyName.equals(Bytes.toString(emptyCF))
+                        || localIndex;
         phoenixLevelRowCompactor = new PhoenixLevelRowCompactor();
         hBaseLevelRowCompactor = new HBaseLevelRowCompactor();
         LOGGER.info("Starting CompactionScanner for table " + tableName + " store "
-                + columnFamilyName + (major ? " major " : " not major ") + "compaction ttl "
+                + columnFamilyName + (this.major ? " major " : " not major ") + "compaction ttl "
                 + ttlInMillis + "ms " + "max lookback " + this.maxLookbackInMillis + "ms");
     }
 
+    @VisibleForTesting
+    public static void setForceMinorCompaction(boolean doMinorCompaction) {
+        forceMinorCompaction = doMinorCompaction;
+    }
+
+    @VisibleForTesting
+    public static boolean getForceMinorCompaction() {
+        return forceMinorCompaction;
+    }
+
     /**
      * Any coprocessors within a JVM can extend the max lookback window for a column family
      * by calling this static method.
@@ -243,6 +258,9 @@ public class CompactionScanner implements InternalScanner {
                 + columnFamilyName + (major ? " major " : " not major ") + "compaction retained "
                 + outputCellCount + " of " + inputCellCount + " cells"
                 + (phoenixLevelOnly ? " phoenix level only" : ""));
+        if (forceMinorCompaction) {
+            forceMinorCompaction = false;
+        }
         storeScanner.close();
     }
 
@@ -674,13 +692,9 @@ public class CompactionScanner implements InternalScanner {
                 List<Cell> lastRowVersion, List<Cell> retainedCells, List<Cell> emptyColumn) {
             Cell currentColumnCell = null;
             boolean isEmptyColumn = false;
-            Cell cellAtMaxLookbackWindowStart = null;
             for (Cell cell : result) {
-                if (cell.getTimestamp() >= maxLookbackWindowStart) {
+                if (cell.getTimestamp() > maxLookbackWindowStart) {
                     retainedCells.add(cell);
-                    if (cell.getTimestamp() == maxLookbackWindowStart) {
-                        cellAtMaxLookbackWindowStart = cell;
-                    }
                     continue;
                 }
                 if (!major && cell.getType() != Cell.Type.Put) {
@@ -689,26 +703,19 @@ public class CompactionScanner implements InternalScanner {
                 if (currentColumnCell == null ||
                         !CellUtil.matchingColumn(cell, currentColumnCell)) {
                     currentColumnCell = cell;
-                    if (cell.getType() != Cell.Type.Delete
-                            && cell.getType() != Cell.Type.DeleteColumn) {
-                        // Include only delete family markers and put cells
-                        // It is possible that this cell is not visible from the max lookback
-                        // window. This happens when there is a mutation with the mutation timestamp
-                        // equal to the max lookback window start timestamp. The following is to
-                        // check for this case
-                        if (cellAtMaxLookbackWindowStart == null
-                                || !CellUtil.matchingColumn(cell, cellAtMaxLookbackWindowStart)) {
-                            lastRowVersion.add(cell);
-                        }
-                    }
-                    if (major && ScanUtil.isEmptyColumn(cell, emptyCF, emptyCQ)) {
-                        isEmptyColumn = true;
-                    } else {
-                        isEmptyColumn = false;
+                    isEmptyColumn = ScanUtil.isEmptyColumn(cell, emptyCF, emptyCQ);
+                    if ((cell.getType() != Cell.Type.Delete
+                            && cell.getType() != Cell.Type.DeleteColumn)
+                            || cell.getTimestamp() == maxLookbackWindowStart) {
+                        // Include only delete family markers and put cells or the
+                        // cells at start edge of max lookback window
+                        lastRowVersion.add(cell);
                     }
-                } else if (major && isEmptyColumn) {
+                } else if (isEmptyColumn) {
                     // We only need to keep one cell for every column for the last row version.
-                    // So here we just form the empty column beyond the last row version
+                    // So here we just form the empty column beyond the last row version.
+                    // Empty column needs to be collected during minor compactions also
+                    // else we will see partial row expiry.
                     emptyColumn.add(cell);
                 }
             }
@@ -718,7 +725,7 @@ public class CompactionScanner implements InternalScanner {
          * Close the gap between the two timestamps, max and min, with the minimum number of cells
          * from the input list such that the timestamp difference between two cells should
          * not be more than ttl. The cells that are used to close the gap are added to the output
-         * list.
+         * list. The input list is a list of empty cells in decreasing order of timestamp.
          */
         private void closeGap(long max, long min, List<Cell> input, List<Cell> output) {
             int  previous = -1;
@@ -729,7 +736,9 @@ public class CompactionScanner implements InternalScanner {
                     previous++;
                     continue;
                 }
-                if (previous == -1) {
+                if (previous == -1 && max - ts > ttlInMillis) {
+                    // Means even the first empty cell in the input list, which is closest to the
+                    // max timestamp, can't close the gap. So the gap can't be closed by empty cells at all.
                     break;
                 }
                 if (max - ts > ttlInMillis) {
@@ -742,6 +751,42 @@ public class CompactionScanner implements InternalScanner {
                 }
                 previous++;
             }
+            if (previous > -1 && max - min > ttlInMillis) {
+                // This covers the case where we need to retain the last empty cell in the input list. The close-gap
+                // algorithm is such that if we need to retain the i-th empty cell in the input list, we only
+                // find that out while iterating on the (i+1)-th empty cell. So, to retain the last empty cell
+                // in the input list we need to check the min timestamp.
+                output.add(input.remove(previous));
+            }
+        }
+
+        /**
+         * Retains the minimum empty cells needed during minor compaction to avoid data loss/partial row expiry
+         * on the next major compaction.
+         * @param emptyColumn Empty column cells in decreasing order of timestamp.
+         * @param retainedCells Cells to be retained.
+         */
+        private void retainEmptyCellsInMinorCompaction(List<Cell> emptyColumn, List<Cell> retainedCells) {
+            if (emptyColumn.isEmpty()) {
+                return;
+            }
+            else if (familyCount == 1 || localIndex) {
+                // We are compacting the empty column family store and it is the single column family, so we
+                // just need to retain empty cells till the min timestamp of the last row version. We can't
+                // minimize the retained empty cells further as we don't know the actual TTL during
+                // minor compactions.
+                long minRowTimestamp = rowContext.minTimestamp;
+                for (Cell emptyCell: emptyColumn) {
+                    if (emptyCell.getTimestamp() > minRowTimestamp) {
+                        retainedCells.add(emptyCell);
+                    }
+                }
+                return;
+            }
+            // For a multi-column family table, without doing a region level scan we can't put a bound on the
+            // timestamp till which we should retain the empty cells. The empty cells may be needed to close the
+            // gap between an empty column family cell and a non-empty column family cell.
+            retainedCells.addAll(emptyColumn);
         }
 
         /**
@@ -764,7 +809,8 @@ public class CompactionScanner implements InternalScanner {
                 }
             }
 
-            if (compactionTime - rowContext.maxTimestamp > maxLookbackInMillis + ttlInMillis) {
+            if (major && compactionTime - rowContext.maxTimestamp > maxLookbackInMillis + ttlInMillis) {
+                // Only do this check for major compaction as for minor compactions we don't expire cells.
                 // The row version should not be visible via the max lookback window. Nothing to do
                 return;
             }
@@ -773,7 +819,9 @@ public class CompactionScanner implements InternalScanner {
             // mutation will be considered expired and masked. If the length of the time range of
             // a row version is not more than ttl, then we know the cells covered by the row
             // version are not apart from each other more than ttl and will not be masked.
-            if (rowContext.maxTimestamp - rowContext.minTimestamp <= ttlInMillis) {
+            if (major && rowContext.maxTimestamp - rowContext.minTimestamp <= ttlInMillis) {
+                // Skip this check for minor compactions as we don't compute actual TTL for
+                // minor compactions and don't expire cells.
                 return;
             }
             // The quick time range check did not pass. We need to get at least one empty cell to cover
@@ -781,6 +829,10 @@ public class CompactionScanner implements InternalScanner {
             if (emptyColumn.isEmpty()) {
                 return;
             }
+            else if (! major) {
+                retainEmptyCellsInMinorCompaction(emptyColumn, retainedCells);
+                return;
+            }
             int size = lastRow.size();
             long tsArray[] = new long[size];
             int i = 0;
@@ -884,7 +936,7 @@ public class CompactionScanner implements InternalScanner {
                     }
                     lastRowVersion = trimmedRow;
                     trimmedEmptyColumn.clear();;
-                    for (Cell cell : lastRowVersion) {
+                    for (Cell cell : emptyColumn) {
                         if (cell.getTimestamp() >= minTimestamp) {
                             trimmedEmptyColumn.add(cell);
                         }
@@ -916,7 +968,14 @@ public class CompactionScanner implements InternalScanner {
                 return;
             }
             phoenixResult.clear();
-            if (!retainCellsForMaxLookback(result, regionLevel, phoenixResult)) {
+            // For the multi-CF case, always do a region level scan for the empty CF store during major compaction,
+            // else we could end up removing some empty cells which are needed to close the gap between an empty CF
+            // cell and a non-empty CF cell, to prevent partial row expiry. This can happen when the last row version
+            // of a non-empty CF cell outside the max lookback window is older than the last row version of the empty CF cell.
+            if (major && familyCount > 1 && ! localIndex && emptyCFStore && ! regionLevel) {
+                compactRegionLevel(result, phoenixResult);
+            }
+            else if (!retainCellsForMaxLookback(result, regionLevel, phoenixResult)) {
                 if (familyCount == 1 || regionLevel) {
                     throw new RuntimeException("UNEXPECTED");
                 }
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index eda44eb00a..b0b1d4196d 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -581,27 +581,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                 region.getTableDescriptor().getTableName().getName()) == 0);
     }
 
-    @Override
-    public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
-            InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException {
-        if (!isPhoenixTableTTLEnabled(c.getEnvironment().getConfiguration())) {
-            return scanner;
-        } else {
-            return User.runAsLoginUser(new PrivilegedExceptionAction<InternalScanner>() {
-                @Override public InternalScanner run() throws Exception {
-                    String tableName = c.getEnvironment().getRegion().getRegionInfo().getTable()
-                            .getNameAsString();
-                    long maxLookbackInMillis = BaseScannerRegionObserverConstants
-                            .getMaxLookbackInMillis(c.getEnvironment().getConfiguration());
-                    maxLookbackInMillis = CompactionScanner.getMaxLookbackInMillis(tableName,
-                            store.getColumnFamilyName(), maxLookbackInMillis);
-                    return new CompactionScanner(c.getEnvironment(), store, scanner,
-                            maxLookbackInMillis, null, null, false, true);
-                }
-            });
-        }
-    }
-
     @Override
     public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
                                       InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
@@ -679,6 +658,10 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                                     request.isMajor() || request.isAllFiles(), keepDeleted
                                     );
                 }
+                else if (isPhoenixTableTTLEnabled(c.getEnvironment().getConfiguration())) {
+                    LOGGER.warn("Skipping compaction for table: {} " +
+                            "as failed to retrieve PTable object", fullTableName);
+                }
                 if (scanType.equals(ScanType.COMPACT_DROP_DELETES)) {
                     try {
                         long clientTimeStamp = EnvironmentEdgeManager.currentTimeMillis();
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java
index 8ade4e560e..b9984bc970 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java
@@ -66,6 +66,7 @@ import static org.apache.phoenix.util.TestUtil.assertTableHasVersions;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotNull;
 
 @Category(NeedsOwnMiniClusterTest.class)
 @RunWith(Parameterized.class)
@@ -88,6 +89,7 @@ public class MaxLookbackExtendedIT extends BaseTest {
         props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, Long.toString(0));
         props.put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(MAX_LOOKBACK_AGE));
         props.put("hbase.procedure.remote.dispatcher.delay.msec", "0");
+        props.put(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, "0");
         setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
     }
 
@@ -560,6 +562,60 @@ public class MaxLookbackExtendedIT extends BaseTest {
             }
         }
     }
+
+    @Test(timeout=60000)
+    public void testRetainingLastRowVersion() throws Exception {
+        try(Connection conn = DriverManager.getConnection(getUrl())) {
+            String tableName = generateUniqueName();
+            createTable(tableName);
+            long timeIntervalBetweenTwoUpserts = (ttl / 4) + 1;
+            injectEdge.setValue(System.currentTimeMillis());
+            EnvironmentEdgeManager.injectEdge(injectEdge);
+            TableName dataTableName = TableName.valueOf(tableName);
+            injectEdge.incrementValue(1);
+            Statement stmt = conn.createStatement();
+            stmt.execute("upsert into " + tableName + " values ('a', 'ab', 'abc', 'abcd')");
+            conn.commit();
+            injectEdge.incrementValue(timeIntervalBetweenTwoUpserts * 1000);
+            flush(dataTableName);
+            injectEdge.incrementValue(1);
+            stmt.execute("upsert into " + tableName + " values ('a', 'ab1')");
+            conn.commit();
+            injectEdge.incrementValue(timeIntervalBetweenTwoUpserts * 1000);
+            flush(dataTableName);
+            injectEdge.incrementValue(1);
+            stmt.execute("upsert into " + tableName + " values ('a', 'ab2')");
+            conn.commit();
+            injectEdge.incrementValue(timeIntervalBetweenTwoUpserts * 1000);
+            flush(dataTableName);
+            injectEdge.incrementValue(1);
+            TestUtil.minorCompact(utility, dataTableName);
+            injectEdge.incrementValue(1);
+            stmt.execute("upsert into " + tableName + " values ('a', 'ab3')");
+            conn.commit();
+            injectEdge.incrementValue(timeIntervalBetweenTwoUpserts * 1000);
+            flush(dataTableName);
+            injectEdge.incrementValue(1);
+            stmt.execute("upsert into " + tableName + " values ('a', 'ab4')");
+            conn.commit();
+            injectEdge.incrementValue(1);
+            flush(dataTableName);
+            TestUtil.dumpTable(conn, dataTableName);
+            injectEdge.incrementValue(1);
+            TestUtil.minorCompact(utility, dataTableName);
+            injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000 - 1);
+            TestUtil.dumpTable(conn, dataTableName);
+            majorCompact(dataTableName);
+            injectEdge.incrementValue(1);
+            TestUtil.dumpTable(conn, dataTableName);
+            ResultSet rs = stmt.executeQuery("select * from " + dataTableName + " where id = 'a'");
+            while(rs.next()) {
+                assertNotNull(rs.getString(3));
+                assertNotNull(rs.getString(4));
+            }
+        }
+    }
+
     private void flush(TableName table) throws IOException {
         Admin admin = getUtility().getAdmin();
         admin.flush(table);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
index 43a8ab1a60..11819772fe 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
@@ -53,6 +53,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 @Category(NeedsOwnMiniClusterTest.class)
@@ -236,7 +237,7 @@ public class TableTTLIT extends BaseTest {
     }
 
     @Test
-    public void testFlushesAndMinorCompactionShouldNotRetainCellsWhenMaxLookbackIsDisabled()
+    public void testMinorCompactionShouldNotRetainCellsWhenMaxLookbackIsDisabled()
             throws Exception {
         try (Connection conn = DriverManager.getConnection(getUrl())) {
             String tableName = generateUniqueName();
@@ -245,32 +246,33 @@ public class TableTTLIT extends BaseTest {
             conn.commit();
             final int flushCount = 10;
             byte[] row = Bytes.toBytes("a");
+            int rowUpdateCounter = 0;
+            injectEdge.setValue(System.currentTimeMillis());
+            EnvironmentEdgeManager.injectEdge(injectEdge);
+            injectEdge.incrementValue(1);
             for (int i = 0; i < flushCount; i++) {
                 // Generate more row versions than the maximum cell versions for the table
                 int updateCount = RAND.nextInt(10) + versions;
+                rowUpdateCounter += updateCount;
                 for (int j = 0; j < updateCount; j++) {
                     updateRow(conn, tableName, "a");
+                    injectEdge.incrementValue(1);
                 }
                 flush(TableName.valueOf(tableName));
-                // At every flush, extra cell versions should be removed.
-                // MAX_COLUMN_INDEX table columns and one empty column will be retained for
-                // each row version.
-                assertTrue(TestUtil.getRawCellCount(conn, TableName.valueOf(tableName), row)
-                        <= (i + 1) * (MAX_COLUMN_INDEX + 1) * versions);
+                injectEdge.incrementValue(1);
+                // Flushes dump and retain all the cells to HFile.
+                // Doing MAX_COLUMN_INDEX + 1 to account for empty cells
+                assertEquals(TestUtil.getRawCellCount(conn, TableName.valueOf(tableName), row),
+                        rowUpdateCounter * (MAX_COLUMN_INDEX + 1));
             }
+            TestUtil.dumpTable(conn, TableName.valueOf(tableName));
             // Run one minor compaction (in case no minor compaction has happened yet)
-            Admin admin = utility.getAdmin();
-            admin.compact(TableName.valueOf(tableName));
-            int waitCount = 0;
-            while (TestUtil.getRawCellCount(conn, TableName.valueOf(tableName),
-                    Bytes.toBytes("a")) >= flushCount * (MAX_COLUMN_INDEX + 1) * versions) {
-                // Wait for minor compactions to happen
-                Thread.sleep(1000);
-                waitCount++;
-                if (waitCount > 120) {
-                    Assert.fail();
-                }
-            }
+            TestUtil.minorCompact(utility, TableName.valueOf(tableName));
+            injectEdge.incrementValue(1);
+            TestUtil.dumpTable(conn, TableName.valueOf(tableName));
+            // For multi-CF table we retain all empty cells during minor compaction
+            int retainedCellCount = (MAX_COLUMN_INDEX + 1) * versions + (multiCF ? rowUpdateCounter - versions : 0);
+            assertEquals(retainedCellCount, TestUtil.getRawCellCount(conn, TableName.valueOf(tableName), Bytes.toBytes("a")));
         }
     }
 
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index 2e5cc1faf3..8bea1194eb 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -98,6 +98,7 @@ import org.apache.phoenix.compile.StatementNormalizer;
 import org.apache.phoenix.compile.SubqueryRewriter;
 import org.apache.phoenix.compile.SubselectRewriter;
 import org.apache.phoenix.compile.JoinCompiler.JoinTable;
+import org.apache.phoenix.coprocessor.CompactionScanner;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheRequest;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheResponse;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService;
@@ -815,6 +816,26 @@ public class TestUtil {
         admin.flush(table);
     }
 
+    public static void minorCompact(HBaseTestingUtility utility, TableName table)
+            throws IOException, InterruptedException {
+        try {
+            CompactionScanner.setForceMinorCompaction(true);
+            Admin admin = utility.getAdmin();
+            admin.compact(table);
+            int waitForCompactionToCompleteCounter = 0;
+            while (CompactionScanner.getForceMinorCompaction()) {
+                waitForCompactionToCompleteCounter++;
+                if (waitForCompactionToCompleteCounter > 50) {
+                    Assert.fail();
+                }
+                Thread.sleep(3000);
+            }
+        }
+        finally {
+            CompactionScanner.setForceMinorCompaction(false);
+        }
+    }
+
     public static void majorCompact(HBaseTestingUtility utility, TableName table)
         throws IOException, InterruptedException {
         long compactionRequestedSCN = EnvironmentEdgeManager.currentTimeMillis();
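
For context on the closeGap() changes: the gap-closing rule walks the empty-cell timestamps from newest to oldest and retains a cell only when the next older cell would leave a gap wider than TTL, and with this patch it also retains the oldest reachable empty cell when the overall max-to-min span still exceeds TTL. The following is a simplified standalone sketch of that selection rule over plain long timestamps, assuming a descending-sorted input; it illustrates the idea and is not the Phoenix implementation.

    import java.util.ArrayList;
    import java.util.List;

    // Simplified illustration of the gap-closing rule in CompactionScanner.closeGap():
    // given empty-cell timestamps in decreasing order, keep just enough of them so that
    // no two consecutive retained timestamps, from max down to min, are more than ttl
    // apart. Cells are replaced by plain longs; this is not the actual Phoenix code.
    public class CloseGapSketch {

        static List<Long> closeGap(long max, long min, List<Long> cells, long ttl) {
            List<Long> retained = new ArrayList<>();
            long anchor = max;       // newest timestamp already guaranteed to be kept
            Long candidate = null;   // newest empty cell still within ttl of the anchor
            for (long ts : cells) {  // cells are sorted newest to oldest
                if (ts >= anchor) {
                    continue;        // not useful for closing the remaining gap
                }
                if (anchor - ts <= ttl) {
                    candidate = ts;  // still reachable from the anchor, remember it
                } else {
                    if (candidate == null) {
                        return retained;     // even the newest cell cannot close the gap
                    }
                    retained.add(candidate); // must keep the last reachable cell
                    anchor = candidate;
                    if (anchor - ts > ttl) {
                        return retained;     // still unreachable after stepping down
                    }
                    candidate = ts;
                }
            }
            if (candidate != null && anchor - min > ttl) {
                retained.add(candidate);     // the oldest reachable empty cell is still needed
            }
            return retained;
        }

        public static void main(String[] args) {
            // Row spans timestamps 25 down to 0 with ttl 10; empty cells exist at 24, 16, 9, 2.
            // Keeping 16 and 9 is enough: the 25->16, 16->9 and 9->0 steps are all within ttl.
            System.out.println(closeGap(25, 0, List.of(24L, 16L, 9L, 2L), 10)); // prints [16, 9]
        }
    }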
