This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 08d924787c PHOENIX-7382 Eliminating index building and treating max lookback as … (#1964)
08d924787c is described below

commit 08d924787cf2aabb58a829f55ef4466ac4a0008d
Author: Kadir Ozdemir <[email protected]>
AuthorDate: Thu Sep 26 11:28:21 2024 -0700

    PHOENIX-7382 Eliminating index building and treating max lookback as … (#1964)
---
 .../apache/phoenix/optimize/QueryOptimizer.java    |   6 +
 .../org/apache/phoenix/schema/MetaDataClient.java  |  44 ++-
 .../main/java/org/apache/phoenix/util/CDCUtil.java |   2 +-
 .../java/org/apache/phoenix/util/QueryUtil.java    |   2 +-
 .../phoenix/coprocessor/CompactionScanner.java     |  19 ++
 .../coprocessor/GlobalIndexRegionScanner.java      |  29 +-
 .../coprocessor/IndexRebuildRegionScanner.java     |   4 +
 .../java/org/apache/phoenix/end2end/CDCBaseIT.java |  41 +--
 .../apache/phoenix/end2end/CDCDefinitionIT.java    |  10 +-
 .../org/apache/phoenix/end2end/CDCQueryIT.java     | 328 ++++++++++++++-------
 .../java/org/apache/phoenix/util/TestUtil.java     |  22 +-
 11 files changed, 353 insertions(+), 154 deletions(-)

diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java b/phoenix-core-client/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
index 107eccac51..8f6faa653b 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
@@ -294,6 +294,12 @@ public class QueryOptimizer {
         }
         
         for (PTable index : indexes) {
+            if (CDCUtil.isCDCIndex(index) && !forCDC) {
+                // A CDC index is allowed only for the queries on its CDC 
table because a CDC index
+                // may not be built completely and may not include the index 
row updates for
+                // the data table mutations outside the max lookback window
+                continue;
+            }
             QueryPlan plan = addPlan(statement, translatedIndexSelect, index, 
targetColumns,
                     parallelIteratorFactory, dataPlan, false, indexResolver);
             if (plan != null &&
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index dda82437ba..166caa6a66 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -1075,7 +1075,8 @@ public class MetaDataClient {
         TableName tableName = statement.getTableName();
         Map<String,Object> tableProps = 
Maps.newHashMapWithExpectedSize(statement.getProps().size());
         Map<String,Object> commonFamilyProps = 
Maps.newHashMapWithExpectedSize(statement.getProps().size() + 1);
-        populatePropertyMaps(statement.getProps(), tableProps, 
commonFamilyProps, statement.getTableType());
+        populatePropertyMaps(statement.getProps(), tableProps, 
commonFamilyProps,
+                statement.getTableType(), false);
 
         splits = processSplits(tableProps, splits);
         boolean isAppendOnlySchema = false;
@@ -1231,13 +1232,14 @@ public class MetaDataClient {
      * @throws SQLException
      */
     private void populatePropertyMaps(ListMultimap<String,Pair<String,Object>> 
statementProps, Map<String, Object> tableProps,
-            Map<String, Object> commonFamilyProps, PTableType tableType) 
throws SQLException {
+            Map<String, Object> commonFamilyProps, PTableType tableType, 
boolean isCDCIndex) throws SQLException {
         // Somewhat hacky way of determining if property is for 
HColumnDescriptor or HTableDescriptor
         ColumnFamilyDescriptor defaultDescriptor = 
ColumnFamilyDescriptorBuilder.of(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES);
         if (!statementProps.isEmpty()) {
             Collection<Pair<String,Object>> propsList = 
statementProps.get(QueryConstants.ALL_FAMILY_PROPERTIES_KEY);
             for (Pair<String,Object> prop : propsList) {
-                if (tableType == PTableType.INDEX && 
MetaDataUtil.propertyNotAllowedToBeOutOfSync(prop.getFirst())) {
+                if (tableType == PTableType.INDEX && !isCDCIndex &&
+                        
MetaDataUtil.propertyNotAllowedToBeOutOfSync(prop.getFirst())) {
                     throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_SET_OR_ALTER_PROPERTY_FOR_INDEX)
                             .setMessage("Property: " + prop.getFirst()).build()
                             .buildException();
@@ -1634,7 +1636,9 @@ public class MetaDataClient {
 
         Map<String,Object> tableProps = 
Maps.newHashMapWithExpectedSize(statement.getProps().size());
         Map<String,Object> commonFamilyProps = 
Maps.newHashMapWithExpectedSize(statement.getProps().size() + 1);
-        populatePropertyMaps(statement.getProps(), tableProps, 
commonFamilyProps, PTableType.INDEX);
+        populatePropertyMaps(statement.getProps(), tableProps, 
commonFamilyProps, PTableType.INDEX,
+                CDCUtil.isCDCIndex(SchemaUtil
+                        
.getTableNameFromFullName(statement.getIndexTableName().toString())));
         List<Pair<ParseNode, SortOrder>> indexParseNodeAndSortOrderList = 
ik.getParseNodeAndSortOrderList();
         List<ColumnName> includedColumns = statement.getIncludeColumns();
         TableRef tableRef = null;
@@ -1963,16 +1967,23 @@ public class MetaDataClient {
                 statement.getProps().size());
         Map<String, Object> commonFamilyProps = 
Maps.newHashMapWithExpectedSize(
                 statement.getProps().size() + 1);
-        populatePropertyMaps(statement.getProps(), tableProps, 
commonFamilyProps, PTableType.CDC);
-
-        PhoenixStatement pstmt = new PhoenixStatement(connection);
-        String dataTableFullName = 
SchemaUtil.getTableName(statement.getDataTable().getSchemaName(),
-                statement.getDataTable().getTableName());
-        String createIndexSql = "CREATE UNCOVERED INDEX " +
-                (statement.isIfNotExists() ? "IF NOT EXISTS " : "") +
-                "\"" + 
CDCUtil.getCDCIndexName(statement.getCdcObjName().getName()) + "\"" +
-                " ON " + dataTableFullName + " (" + 
PhoenixRowTimestampFunction.NAME + "()) ASYNC";
+        populatePropertyMaps(statement.getProps(), tableProps, 
commonFamilyProps, PTableType.CDC,
+                false);
+        Properties props = connection.getClientInfo();
+        props.put(INDEX_CREATE_DEFAULT_STATE, "ACTIVE");
+
+        String
+                dataTableFullName =
+                
SchemaUtil.getTableName(statement.getDataTable().getSchemaName(),
+                        statement.getDataTable().getTableName());
+        String
+                createIndexSql =
+                "CREATE UNCOVERED INDEX " + (statement.isIfNotExists() ? "IF 
NOT EXISTS " : "")
+                        + 
CDCUtil.getCDCIndexName(statement.getCdcObjName().getName())
+                        + " ON " + dataTableFullName + " ("
+                        + PhoenixRowTimestampFunction.NAME + "()) ASYNC";
         List<String> indexProps = new ArrayList<>();
+        indexProps.add("REPLICATION_SCOPE=0");
         Object saltBucketNum = TableProperty.SALT_BUCKETS.getValue(tableProps);
         if (saltBucketNum != null) {
             indexProps.add("SALT_BUCKETS=" + saltBucketNum);
@@ -1982,9 +1993,10 @@ public class MetaDataClient {
             indexProps.add("COLUMN_ENCODED_BYTES=" + columnEncodedBytes);
         }
         createIndexSql = createIndexSql + " " + String.join(", ", indexProps);
-        try {
-            pstmt.execute(createIndexSql);
-        } catch (SQLException e) {
+        try (Connection internalConnection = QueryUtil.getConnection(props, 
connection.getQueryServices().getConfiguration())) {
+            PhoenixStatement pstmt = new PhoenixStatement((PhoenixConnection) 
internalConnection);
+                pstmt.execute(createIndexSql);
+            } catch (SQLException e) {
             if (e.getErrorCode() == TABLE_ALREADY_EXIST.getErrorCode()) {
                 throw new 
SQLExceptionInfo.Builder(TABLE_ALREADY_EXIST).setTableName(
                         statement.getCdcObjName().getName()).setRootCause(
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
index 96641c2dcf..6e87121ef9 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
@@ -38,7 +38,7 @@ import org.apache.phoenix.schema.types.PDataType;
 import org.bson.RawBsonDocument;
 
 public class CDCUtil {
-    public static final String CDC_INDEX_PREFIX = "__CDC__";
+    public static final String CDC_INDEX_PREFIX = "PHOENIX_CDC_INDEX";
 
     /**
      * Make a set of CDC change scope enums from the given string containing 
comma separated scope
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/util/QueryUtil.java b/phoenix-core-client/src/main/java/org/apache/phoenix/util/QueryUtil.java
index 52e4f6aa0e..6edb67af11 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/QueryUtil.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/QueryUtil.java
@@ -424,7 +424,7 @@ public final class QueryUtil {
         return getConnection(new Properties(), conf);
     }
 
-    private static Connection getConnection(Properties props, Configuration 
conf)
+    public static Connection getConnection(Properties props, Configuration 
conf)
             throws SQLException {
         String url = getConnectionUrl(props, conf);
         LOGGER.info(String.format("Creating connection with the jdbc url: %s, 
isServerSide = %s",
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
index e96406f142..126e212104 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
@@ -70,6 +70,7 @@ import org.apache.phoenix.schema.types.PSmallint;
 import 
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
 import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.CDCUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.coprocessorclient.RowKeyMatcher;
 import org.apache.phoenix.coprocessorclient.TableTTLInfoCache;
@@ -155,6 +156,7 @@ public class CompactionScanner implements InternalScanner {
     private long inputCellCount = 0;
     private long outputCellCount = 0;
     private boolean phoenixLevelOnly = false;
+    private boolean isCDCIndex;
 
     // Only for forcing minor compaction while testing
     private static boolean forceMinorCompaction = false;
@@ -197,6 +199,7 @@ public class CompactionScanner implements InternalScanner {
         emptyCFStore = familyCount == 1 || 
columnFamilyName.equals(Bytes.toString(emptyCF))
                         || localIndex;
 
+        isCDCIndex = table != null ? CDCUtil.isCDCIndex(table) : false;
         // Initialize the tracker that computes the TTL for the compacting 
table.
         // The TTL tracker can be
         // simple (one single TTL for the table) when the compacting table is 
not Partitioned
@@ -2356,6 +2359,19 @@ public class CompactionScanner implements InternalScanner {
             }
         }
 
+        /**
+         * For a CDC index, we retain all cells within the max lookback window 
as opposed to
+         * retaining all row versions visible through max lookback window we 
do for other tables
+         */
+        private boolean retainCellsForCDCIndex(List<Cell> result, List<Cell> 
retainedCells) {
+            for (Cell cell : result) {
+                if (cell.getTimestamp() >= 
rowTracker.getRowContext().getMaxLookbackWindowStart()) {
+                    retainedCells.add(cell);
+                }
+            }
+            return true;
+        }
+
         /**
          * The retained cells includes the cells that are visible through the 
max lookback
          * window and the additional empty column cells that are needed to 
reduce large time
@@ -2366,6 +2382,9 @@ public class CompactionScanner implements InternalScanner {
 
             lastRowVersion.clear();
             emptyColumn.clear();
+            if (isCDCIndex) {
+                return retainCellsForCDCIndex(result, retainedCells);
+            }
             getLastRowVersionInMaxLookbackWindow(result, lastRowVersion, 
retainedCells,
                     emptyColumn);
             if (lastRowVersion.isEmpty()) {
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
index 1ee83005bb..ad853ef141 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
@@ -55,12 +55,15 @@ import org.apache.phoenix.hbase.index.write.IndexWriterUtils;
 import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.index.IndexTool;
 import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository;
 import org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.transform.TransformMaintainer;
 import org.apache.phoenix.schema.types.PVarbinary;
 import 
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
@@ -70,6 +73,7 @@ import org.apache.phoenix.util.ClientUtil;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.ScanUtil;
@@ -77,6 +81,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -273,11 +278,31 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner {
             verificationResultRepository =
                     new 
IndexVerificationResultRepository(indexMaintainer.getIndexTableName(), 
hTableFactory);
             nextStartKey = null;
-            minTimestamp = scan.getTimeRange().getMin();
         }
+        computeMinTimestamp(config);
     }
 
-
+    /**
+     * For CDC indexes we do not need to consider rows outside max lookback 
window or before
+     * the index create time. minTimestamp needs to be computed and used for 
CDC indexes always
+     * even when it is not set on the scan
+     */
+    private void computeMinTimestamp(Configuration config) throws IOException {
+        minTimestamp = scan.getTimeRange().getMin();
+        if (indexMaintainer.isCDCIndex()) {
+            minTimestamp = EnvironmentEdgeManager.currentTimeMillis() - 
maxLookBackInMills;
+            try (PhoenixConnection conn =
+                    
QueryUtil.getConnectionOnServer(config).unwrap(PhoenixConnection.class)) {
+                PTable indexTable = 
conn.getTableNoCache(indexMaintainer.getLogicalIndexName());
+                minTimestamp = Math.max(indexTable.getTimeStamp() + 1, 
minTimestamp);
+            } catch (SQLException e) {
+                LOGGER.error(
+                        "Unable to get the PTable for the index table "
+                                + indexMaintainer.getLogicalIndexName() + " " 
+ e);
+                throw new IOException(e);
+            }
+        }
+    }
     public static long getTimestamp(Mutation m) {
         for (List<Cell> cells : m.getFamilyCellMap().values()) {
             for (Cell cell : cells) {
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
index 3de43baee2..8054ca053d 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
@@ -348,6 +348,10 @@ public class IndexRebuildRegionScanner extends GlobalIndexRegionScanner {
                         Put put = null;
                         Delete del = null;
                         for (Cell cell : row) {
+                            if (cell.getTimestamp() < minTimestamp
+                                    && indexMaintainer.isCDCIndex()) {
+                                continue;
+                            }
                             if (cell.getType().equals(Cell.Type.Put)) {
                                 if (familyMap != null && 
!isColumnIncluded(cell)) {
                                     continue;
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java
index c1ef1c399f..7e6e82ef62 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java
@@ -167,20 +167,14 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
         conn.createStatement().execute(table_sql);
     }
 
-    protected void createCDCAndWait(Connection conn, String tableName, String 
cdcName,
-                                    String cdc_sql) throws Exception {
-        createCDCAndWait(conn, tableName, cdcName, cdc_sql, null, null);
+    protected void createCDC(Connection conn, String cdc_sql) throws Exception 
{
+        createCDC(conn, cdc_sql, null, null);
     }
 
-    protected void createCDCAndWait(Connection conn, String tableName, String 
cdcName,
-                                    String cdc_sql, 
PTable.QualifierEncodingScheme encodingScheme,
-                                    Integer nSaltBuckets) throws Exception {
+    protected void createCDC(Connection conn, String cdc_sql,
+            PTable.QualifierEncodingScheme encodingScheme, Integer 
nSaltBuckets) throws Exception {
         // For CDC, multitenancy gets derived automatically via the parent 
table.
         createTable(conn, cdc_sql, encodingScheme, false, nSaltBuckets, false, 
null);
-        String schemaName = SchemaUtil.getSchemaNameFromFullName(tableName);
-        tableName = SchemaUtil.getTableNameFromFullName(tableName);
-        IndexToolIT.runIndexTool(false, schemaName, tableName,
-                "\"" + CDCUtil.getCDCIndexName(cdcName) + "\"");
     }
 
     protected void assertCDCState(Connection conn, String cdcName, String 
expInclude,
@@ -419,16 +413,22 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
     //  - with a null
     //  - missing columns
     protected List<ChangeRow> generateChanges(long startTS, String[] 
tenantids, String tableName,
-                                              String datatableNameForDDL, 
CommitAdapter committer)
+            String datatableNameForDDL, CommitAdapter committer)
                             throws Exception {
+        return generateChanges(startTS, tenantids, tableName, 
datatableNameForDDL, committer,
+                "v3", 0);
+    }
+    protected List<ChangeRow> generateChanges(long startTS, String[] 
tenantids, String tableName,
+            String datatableNameForDDL, CommitAdapter committer, String 
columnToDrop, int startKey)
+            throws Exception {
         List<ChangeRow> changes = new ArrayList<>();
         EnvironmentEdgeManager.injectEdge(injectEdge);
         injectEdge.setValue(startTS);
-        boolean dropV3Done = false;
+        boolean dropColumnDone = false;
         committer.init();
-        Map<String, Object> rowid1 = new HashMap() {{ put("K", 1); }};
-        Map<String, Object> rowid2 = new HashMap() {{ put("K", 2); }};
-        Map<String, Object> rowid3 = new HashMap() {{ put("K", 3); }};
+        Map<String, Object> rowid1 = new HashMap() {{ put("K", startKey + 1); 
}};
+        Map<String, Object> rowid2 = new HashMap() {{ put("K", startKey + 2); 
}};
+        Map<String, Object> rowid3 = new HashMap() {{ put("K", startKey + 3); 
}};
         for (String tid: tenantids) {
             try (Connection conn = committer.getConnection(tid)) {
                 changes.add(addChange(conn, tableName,
@@ -445,7 +445,6 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
                         }})
                 ));
                 committer.commit(conn);
-
                 changes.add(addChange(conn, tableName, new ChangeRow(tid, 
startTS += 100,
                         rowid3, new TreeMap<String, Object>() {{
                             put("V1", 300L);
@@ -462,12 +461,12 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
                 ));
                 committer.commit(conn);
             }
-            if (datatableNameForDDL != null && !dropV3Done) {
+            if (datatableNameForDDL != null && !dropColumnDone && columnToDrop 
!= null) {
                 try (Connection conn = newConnection()) {
                     conn.createStatement().execute("ALTER TABLE " + 
datatableNameForDDL +
                             " DROP COLUMN v3");
                 }
-                dropV3Done = true;
+                dropColumnDone = true;
             }
             try (Connection conn = newConnection(tid)) {
                 changes.add(addChange(conn, tableName, new ChangeRow(tid, 
startTS += 100, rowid1,
@@ -518,7 +517,6 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
                         }})
                 ));
                 committer.commit(conn);
-
                 changes.add(addChange(conn, tableName, new ChangeRow(tid, 
startTS += 100, rowid1,
                         null)));
                 committer.commit(conn);
@@ -528,6 +526,7 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
         return changes;
     }
 
+
     protected void verifyChangesViaSCN(String tenantId, ResultSet rs, String 
dataTableName,
                                        Map<String, String> dataColumns, 
List<ChangeRow> changes,
                                        Set<PTable.CDCChangeScope> 
changeScopes) throws Exception {
@@ -746,6 +745,10 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
             return change == null ? CDC_DELETE_EVENT_TYPE : 
CDC_UPSERT_EVENT_TYPE;
         }
 
+        public long getTimestamp() {
+            return changeTS;
+        }
+
         ChangeRow(String tenantid, long changeTS, Map<String, Object> pks, 
Map<String, Object> change) {
             this.tenantId = tenantid;
             this.changeTS = changeTS;
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java
index 29f6e41d0a..21dae3bf70 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java
@@ -87,7 +87,7 @@ public class CDCDefinitionIT extends CDCBaseIT {
         }
 
         cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
-        createCDCAndWait(conn, tableName, cdcName, cdc_sql, null, null);
+        createCDC(conn, cdc_sql, null, null);
         assertCDCState(conn, cdcName, null, 3);
         assertNoResults(conn, cdcName);
 
@@ -104,7 +104,7 @@ public class CDCDefinitionIT extends CDCBaseIT {
 
         cdcName = generateUniqueName();
         cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName + " INCLUDE 
(pre, post)";
-        createCDCAndWait(conn, tableName, cdcName, cdc_sql);
+        createCDC(conn, cdc_sql);
         assertCDCState(conn, cdcName, PRE+","+POST, 3);
         assertPTable(cdcName, new HashSet<>(
                 Arrays.asList(PRE, POST)), tableName, datatableName);
@@ -140,7 +140,7 @@ public class CDCDefinitionIT extends CDCBaseIT {
 
                 String cdcName = generateUniqueName();
                 String cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
-                createCDCAndWait(conn, tableName, cdcName, cdc_sql, null,
+                createCDC(conn, cdc_sql, null,
                         saltingConfig[1]);
                 try {
                     assertCDCState(conn, cdcName, null, 3);
@@ -186,7 +186,7 @@ public class CDCDefinitionIT extends CDCBaseIT {
         }
 
         cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
-        createCDCAndWait(conn, tableName, cdcName, cdc_sql);
+        createCDC(conn, cdc_sql);
         assertCDCState(conn, cdcName, null, 3);
         assertPTable(cdcName, null, tableName, datatableName);
     }
@@ -306,7 +306,7 @@ public class CDCDefinitionIT extends CDCBaseIT {
         }
         String cdcName = generateUniqueName();
         String cdc_sql = "CREATE CDC  " + cdcName + " ON " + tableName;
-        createCDCAndWait(conn, tableName, cdcName, cdc_sql);
+        createCDC(conn, cdc_sql);
         try {
             conn.createStatement().executeQuery("SELECT " +
                     "/*+ CDC_INCLUDE(DUMMY) */ * FROM " + cdcName);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
index 8f675d9d90..a1b99e90b2 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
@@ -18,13 +18,21 @@
 package org.apache.phoenix.end2end;
 
 import org.apache.hadoop.hbase.TableName;
-import org.apache.phoenix.end2end.index.SingleCellIndexIT;
+import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 import org.apache.phoenix.util.CDCUtil;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.ManualEnvironmentEdge;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TestUtil;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -36,7 +44,6 @@ import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.sql.Statement;
 import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -50,8 +57,14 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.stream.Collectors;
 
+import static 
org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT;
+import static 
org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT;
+import static 
org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT;
+import static 
org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT;
+import static 
org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_OLD_INDEX_ROW_COUNT;
+import static 
org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_UNKNOWN_INDEX_ROW_COUNT;
+import static 
org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.REBUILT_INDEX_ROW_COUNT;
 import static org.apache.phoenix.query.QueryConstants.CDC_EVENT_TYPE;
-import static org.apache.phoenix.query.QueryConstants.CDC_JSON_COL_NAME;
 import static 
org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
 import static 
org.apache.phoenix.schema.PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS;
 import static org.junit.Assert.assertEquals;
@@ -65,25 +78,24 @@ import static org.junit.Assert.assertTrue;
 //          "".isEmpty();
 //      }
 @RunWith(Parameterized.class)
-@Category(ParallelStatsDisabledTest.class)
+@Category(NeedsOwnMiniClusterTest.class)
 public class CDCQueryIT extends CDCBaseIT {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(CDCQueryIT.class);
+    private static final int MAX_LOOKBACK_AGE = 10; // seconds
 
     // Offset of the first column, depending on whether 
PHOENIX_ROW_TIMESTAMP() is in the schema
     // or not.
     private final boolean forView;
-    private final boolean dataBeforeCDC;
     private final PTable.QualifierEncodingScheme encodingScheme;
     private final boolean multitenant;
     private final Integer indexSaltBuckets;
     private final Integer tableSaltBuckets;
     private final boolean withSchemaName;
 
-    public CDCQueryIT(Boolean forView, Boolean dataBeforeCDC,
+    public CDCQueryIT(Boolean forView,
                       PTable.QualifierEncodingScheme encodingScheme, boolean 
multitenant,
                       Integer indexSaltBuckets, Integer tableSaltBuckets, 
boolean withSchemaName) {
         this.forView = forView;
-        this.dataBeforeCDC = dataBeforeCDC;
         this.encodingScheme = encodingScheme;
         this.multitenant = multitenant;
         this.indexSaltBuckets = indexSaltBuckets;
@@ -91,26 +103,27 @@ public class CDCQueryIT extends CDCBaseIT {
         this.withSchemaName = withSchemaName;
     }
 
-    @Parameterized.Parameters(name = "forView={0} dataBeforeCDC={1}, encodingScheme={2}, " +
-            "multitenant={3}, indexSaltBuckets={4}, tableSaltBuckets={5} withSchemaName=${6}")
+    /**
+     * Test parameters, in constructor order: forView, encodingScheme, multitenant,
+     * indexSaltBuckets, tableSaltBuckets, withSchemaName.
+     */
+    @Parameterized.Parameters(name = "forView={0}, encodingScheme={1}, " +
+            "multitenant={2}, indexSaltBuckets={3}, tableSaltBuckets={4}, withSchemaName={5}")
     public static synchronized Collection<Object[]> data() {
         return Arrays.asList(new Object[][] {
-                { Boolean.FALSE, Boolean.FALSE, TWO_BYTE_QUALIFIERS, Boolean.FALSE, null, null,
-                        Boolean.FALSE },
-                { Boolean.FALSE, Boolean.TRUE, TWO_BYTE_QUALIFIERS, Boolean.FALSE, null, null,
-                        Boolean.TRUE },
-                { Boolean.FALSE, Boolean.FALSE, NON_ENCODED_QUALIFIERS, Boolean.FALSE, 1, 1,
-                        Boolean.FALSE },
+                { Boolean.FALSE, TWO_BYTE_QUALIFIERS, Boolean.FALSE, null, null, Boolean.FALSE },
+                { Boolean.FALSE, TWO_BYTE_QUALIFIERS, Boolean.FALSE, null, null, Boolean.TRUE },
+                { Boolean.FALSE, NON_ENCODED_QUALIFIERS, Boolean.FALSE, 1, 1, Boolean.FALSE },
                 // Once PHOENIX-7239, change this to have different salt buckets for data and index.
-                { Boolean.FALSE, Boolean.FALSE, NON_ENCODED_QUALIFIERS, Boolean.TRUE, 1, 1,
-                        Boolean.TRUE },
-                { Boolean.FALSE, Boolean.FALSE, NON_ENCODED_QUALIFIERS, Boolean.FALSE, 4, null,
-                        Boolean.FALSE },
-                { Boolean.TRUE, Boolean.FALSE, TWO_BYTE_QUALIFIERS, Boolean.FALSE, null, null,
-                        Boolean.FALSE },
+                { Boolean.FALSE, NON_ENCODED_QUALIFIERS, Boolean.TRUE, 1, 1, Boolean.TRUE },
+                { Boolean.FALSE, NON_ENCODED_QUALIFIERS, Boolean.FALSE, 4, null, Boolean.FALSE },
+                { Boolean.TRUE, TWO_BYTE_QUALIFIERS, Boolean.FALSE, null, null, Boolean.FALSE },
         });
     }
 
+    /**
+     * Sets up the test driver with the max lookback age configured to MAX_LOOKBACK_AGE
+     * seconds. The CDC index tests in this class wait for rows to fall outside that window.
+     */
+    @BeforeClass
+    public static synchronized void doSetup() throws Exception {
+        String maxLookbackAge = String.valueOf(MAX_LOOKBACK_AGE);
+        Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(1);
+        serverProps.put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
+                maxLookbackAge);
+        ReadOnlyProps readOnlyProps = new ReadOnlyProps(serverProps.entrySet().iterator());
+        setUpTestDriver(readOnlyProps);
+    }
+
     @Before
     public void beforeTest(){
         EnvironmentEdgeManager.reset();
@@ -118,6 +131,13 @@ public class CDCQueryIT extends CDCBaseIT {
         injectEdge.setValue(EnvironmentEdgeManager.currentTimeMillis());
     }
 
+    /**
+     * Asserts that the EXPLAIN plan of a PHOENIX_ROW_TIMESTAMP()-filtered query on the data
+     * table does not mention the given CDC, i.e. that the CDC index is not chosen for
+     * plain data table queries.
+     *
+     * @param conn connection used to run the EXPLAIN
+     * @param dataTableName name of the table (or view) the CDC is defined on
+     * @param cdcName name of the CDC that must not appear in the plan
+     */
+    private void cdcIndexShouldNotBeUsedForDataTableQueries(Connection conn, String dataTableName,
+            String cdcName) throws Exception {
+        String explainSql = "EXPLAIN SELECT * FROM " + dataTableName
+                + " WHERE PHOENIX_ROW_TIMESTAMP() < CURRENT_TIME()";
+        // Close both the statement and the result set; this helper is called from many tests
+        // on long-lived connections.
+        try (java.sql.Statement stmt = conn.createStatement();
+                ResultSet rs = stmt.executeQuery(explainSql)) {
+            String explainPlan = QueryUtil.getExplainPlan(rs);
+            assertFalse("CDC " + cdcName + " must not be used for a data table query, plan: "
+                    + explainPlan, explainPlan.contains(cdcName));
+        }
+    }
+
     @Test
     public void testSelectCDC() throws Exception {
         String cdcName, cdc_sql;
@@ -138,10 +158,8 @@ public class CDCQueryIT extends CDCBaseIT {
             }
             cdcName = generateUniqueName();
             cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
-            if (!dataBeforeCDC) {
-                createCDCAndWait(conn, tableName, cdcName, cdc_sql, 
encodingScheme,
+            createCDC(conn, cdc_sql, encodingScheme,
                         indexSaltBuckets);
-            }
         }
 
         String tenantId = multitenant ? "1000" : null;
@@ -154,17 +172,6 @@ public class CDCQueryIT extends CDCBaseIT {
         List<ChangeRow> changes = generateChanges(startTS, tenantids, 
tableName, null,
                 COMMIT_SUCCESS);
 
-        if (dataBeforeCDC) {
-            try (Connection conn = newConnection()) {
-                createCDCAndWait(conn, tableName, cdcName, cdc_sql, 
encodingScheme,
-                        indexSaltBuckets);
-            }
-            // Testing with flushed data adds more coverage.
-            getUtility().getAdmin().flush(TableName.valueOf(datatableName));
-            
getUtility().getAdmin().flush(TableName.valueOf(SchemaUtil.getTableName(schemaName,
-                    CDCUtil.getCDCIndexName(cdcName))));
-        }
-
         //SingleCellIndexIT.dumpTable(tableName);
         //SingleCellIndexIT.dumpTable(CDCUtil.getCDCIndexName(cdcName));
 
@@ -234,6 +241,7 @@ public class CDCQueryIT extends CDCBaseIT {
                     assertEquals(false, rs.next());
                 }
             }
+            cdcIndexShouldNotBeUsedForDataTableQueries(conn, tableName, 
cdcName);
         }
     }
 
@@ -271,10 +279,7 @@ public class CDCQueryIT extends CDCBaseIT {
             }
             cdcName = generateUniqueName();
             cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName + " INCLUDE 
(change)";
-            if (!dataBeforeCDC) {
-                createCDCAndWait(conn, tableName, cdcName, cdc_sql, 
encodingScheme,
-                        indexSaltBuckets);
-            }
+            createCDC(conn, cdc_sql, encodingScheme, indexSaltBuckets);
         }
 
         String tenantId = multitenant ? "1000" : null;
@@ -307,17 +312,6 @@ public class CDCQueryIT extends CDCBaseIT {
         //SingleCellIndexIT.dumpTable(CDCUtil.getCDCIndexName(cdcName));
         //LOGGER.debug("----------");
 
-        if (dataBeforeCDC) {
-            try (Connection conn = newConnection()) {
-                createCDCAndWait(conn, tableName, cdcName, cdc_sql, 
encodingScheme,
-                        indexSaltBuckets);
-            }
-            // Testing with flushed data adds more coverage.
-            getUtility().getAdmin().flush(TableName.valueOf(datatableName));
-            
getUtility().getAdmin().flush(TableName.valueOf(SchemaUtil.getTableName(schemaName,
-                    CDCUtil.getCDCIndexName(cdcName))));
-        }
-
         String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName);
         try (Connection conn = newConnection(tenantId)) {
             // For debug: uncomment to see the exact results logged to console.
@@ -351,6 +345,7 @@ public class CDCQueryIT extends CDCBaseIT {
             verifyChangesViaSCN(tenantId, conn.createStatement().executeQuery(
                             "SELECT /*+ CDC_INCLUDE(CHANGE, PRE, POST) */ * 
FROM " + cdcFullName),
                     datatableName, dataColumns, changes, ALL_IMG);
+            cdcIndexShouldNotBeUsedForDataTableQueries(conn, tableName, 
cdcName);
         }
     }
 
@@ -386,10 +381,8 @@ public class CDCQueryIT extends CDCBaseIT {
             }
             cdcName = generateUniqueName();
             cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
-            if (!dataBeforeCDC) {
-                createCDCAndWait(conn, tableName, cdcName, cdc_sql, 
encodingScheme,
-                        indexSaltBuckets);
-            }
+
+            createCDC(conn, cdc_sql, encodingScheme, indexSaltBuckets);
         }
 
         String tenantId = multitenant ? "1000" : null;
@@ -402,17 +395,6 @@ public class CDCQueryIT extends CDCBaseIT {
         List<ChangeRow> changes = generateChangesImmutableTable(startTS, 
tenantids, tableName,
                 COMMIT_SUCCESS);
 
-        if (dataBeforeCDC) {
-            try (Connection conn = newConnection()) {
-                createCDCAndWait(conn, tableName, cdcName, cdc_sql, 
encodingScheme,
-                        indexSaltBuckets);
-            }
-            // Testing with flushed data adds more coverage.
-            getUtility().getAdmin().flush(TableName.valueOf(datatableName));
-            
getUtility().getAdmin().flush(TableName.valueOf(SchemaUtil.getTableName(schemaName,
-                    CDCUtil.getCDCIndexName(cdcName))));
-        }
-
         String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName);
         Map<String, String> dataColumns = new TreeMap<String, String>() {{
             put("V1", "INTEGER");
@@ -439,6 +421,7 @@ public class CDCQueryIT extends CDCBaseIT {
             verifyChangesViaSCN(tenantId, 
conn.createStatement().executeQuery("SELECT /*+ CDC_INCLUDE(CHANGE) */ " +
                             "PHOENIX_ROW_TIMESTAMP(), K, \"CDC JSON\" FROM " + 
cdcFullName),
                     datatableName, dataColumns, changes, CHANGE_IMG);
+            cdcIndexShouldNotBeUsedForDataTableQueries(conn, tableName, 
cdcName);
         }
     }
 
@@ -471,10 +454,8 @@ public class CDCQueryIT extends CDCBaseIT {
             }
             cdcName = generateUniqueName();
             cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
-            if (!dataBeforeCDC) {
-                createCDCAndWait(conn, tableName, cdcName, cdc_sql, 
encodingScheme,
-                        indexSaltBuckets);
-            }
+            createCDC(conn, cdc_sql, encodingScheme, indexSaltBuckets);
+            cdcIndexShouldNotBeUsedForDataTableQueries(conn, 
tableName,cdcName);
         }
 
         EnvironmentEdgeManager.injectEdge(injectEdge);
@@ -540,13 +521,6 @@ public class CDCQueryIT extends CDCBaseIT {
         Timestamp ts4 = new Timestamp(cal.getTime().getTime());
         EnvironmentEdgeManager.reset();
 
-        if (dataBeforeCDC) {
-            try (Connection conn = newConnection()) {
-                createCDCAndWait(conn, tableName, cdcName, cdc_sql, 
encodingScheme,
-                        indexSaltBuckets);
-            }
-        }
-
         //SingleCellIndexIT.dumpTable(CDCUtil.getCDCIndexName(cdcName));
 
         String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName);
@@ -623,10 +597,7 @@ public class CDCQueryIT extends CDCBaseIT {
 
             cdcName = generateUniqueName();
             cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
-            if (!dataBeforeCDC) {
-                createCDCAndWait(conn, tableName, cdcName, cdc_sql, 
encodingScheme,
-                        indexSaltBuckets);
-            }
+            createCDC(conn, cdc_sql, encodingScheme, indexSaltBuckets);
             conn.createStatement().execute("ALTER TABLE " + datatableName + " 
DROP COLUMN v0");
         }
 
@@ -640,17 +611,6 @@ public class CDCQueryIT extends CDCBaseIT {
         List<ChangeRow> changes = generateChanges(startTS, tenantids, 
tableName, datatableName,
                 COMMIT_SUCCESS);
 
-        if (dataBeforeCDC) {
-            try (Connection conn = newConnection()) {
-                createCDCAndWait(conn, tableName, cdcName, cdc_sql, 
encodingScheme,
-                        indexSaltBuckets);
-            }
-            // Testing with flushed data adds more coverage.
-            getUtility().getAdmin().flush(TableName.valueOf(datatableName));
-            
getUtility().getAdmin().flush(TableName.valueOf(SchemaUtil.getTableName(schemaName,
-                    CDCUtil.getCDCIndexName(cdcName))));
-        }
-
         Map<String, String> dataColumns = new TreeMap<String, String>() {{
             put("V0", "INTEGER");
             put("V1", "INTEGER");
@@ -664,16 +624,12 @@ public class CDCQueryIT extends CDCBaseIT {
                             "SELECT /*+ CDC_INCLUDE(CHANGE) */ * FROM " + 
SchemaUtil.getTableName(
                                     schemaName, cdcName)),
                     datatableName, dataColumns, changes, CHANGE_IMG);
+            cdcIndexShouldNotBeUsedForDataTableQueries(conn, tableName, 
cdcName);
         }
     }
 
     @Test
     public void testSelectCDCFailDataTableUpdate() throws Exception {
-        if (dataBeforeCDC == true) {
-            // In this case, index will not exist at the time of upsert, so we 
can't simulate the
-            // index failure.
-            return;
-        }
         String schemaName = withSchemaName ? generateUniqueName() : null;
         String tableName = SchemaUtil.getTableName(schemaName, 
generateUniqueName());
         String cdcName, cdc_sql;
@@ -692,7 +648,8 @@ public class CDCQueryIT extends CDCBaseIT {
             }
             cdcName = generateUniqueName();
             cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
-            createCDCAndWait(conn, tableName, cdcName, cdc_sql, 
encodingScheme, indexSaltBuckets);
+            createCDC(conn, cdc_sql, encodingScheme, indexSaltBuckets);
+            cdcIndexShouldNotBeUsedForDataTableQueries(conn, tableName, 
cdcName);
         }
 
         String tenantId = multitenant ? "1000" : null;
@@ -702,13 +659,186 @@ public class CDCQueryIT extends CDCBaseIT {
         }
 
         long startTS = System.currentTimeMillis();
-        generateChanges(startTS, tenantids, tableName, null,
-                COMMIT_FAILURE_EXPECTED);
+        generateChanges(startTS, tenantids, tableName, null, 
COMMIT_FAILURE_EXPECTED);
 
         try (Connection conn = newConnection(tenantId)) {
             ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM 
" +
                     SchemaUtil.getTableName(schemaName, cdcName));
             assertEquals(false, rs.next());
+
+        }
+    }
+
+    /**
+     * Creates a CDC on a table that already has rows and checks that:
+     * the CDC index is ACTIVE and stays empty (also after an IndexTool rebuild), and
+     * an IndexTool verify-only run over newly added rows reports no rebuilt, invalid,
+     * missing, old, or unknown index rows.
+     */
+    @Test
+    public void testCDCIndexBuildAndVerification() throws Exception {
+        String schemaName = withSchemaName ? generateUniqueName() : null;
+        String tableName = generateUniqueName();
+        String tableFullName = SchemaUtil.getTableName(schemaName, tableName);
+        try (Connection conn = newConnection()) {
+            // Create a table and add some rows
+            createTable(conn, "CREATE TABLE  " + tableFullName + " (" + (multitenant ?
+                    "TENANT_ID CHAR(5) NOT NULL, " :
+                    "")
+                    + "k INTEGER NOT NULL, v1 INTEGER, v1v2 INTEGER, v2 INTEGER, B.vb INTEGER, "
+                    + "v3 INTEGER, CONSTRAINT PK PRIMARY KEY " + (multitenant ?
+                    "(TENANT_ID, k) " :
+                    "(k)") + ")", encodingScheme, multitenant, tableSaltBuckets, false, null);
+            if (forView) {
+                String viewName = generateUniqueName();
+                String viewFullName = SchemaUtil.getTableName(schemaName, viewName);
+                createTable(conn, "CREATE VIEW " + viewFullName + " AS SELECT * FROM "
+                        + tableFullName, encodingScheme);
+                tableName = viewName;
+                tableFullName = viewFullName;
+            }
+
+            String tenantId = multitenant ? "1000" : null;
+            String[] tenantids = { tenantId };
+            if (multitenant) {
+                tenantids = new String[] { tenantId, "2000" };
+            }
+
+            long startTS = System.currentTimeMillis();
+            List<ChangeRow> changes = generateChanges(startTS, tenantids, tableFullName,
+                    tableFullName, COMMIT_SUCCESS, null, 0);
+            // Wait until the wall clock is past the last mutation's timestamp so that no
+            // mutation is in the future when the CDC is created
+            long currentTime = System.currentTimeMillis();
+            long nextTime = changes.get(changes.size() - 1).getTimestamp() + 1;
+            if (nextTime > currentTime) {
+                Thread.sleep(nextTime - currentTime);
+            }
+            // Create a CDC on the table that already has rows
+            String cdcName = generateUniqueName();
+            String cdcSql = "CREATE CDC " + cdcName + " ON " + tableFullName;
+            createCDC(conn, cdcSql, encodingScheme, indexSaltBuckets);
+            String cdcIndexName = CDCUtil.getCDCIndexName(cdcName);
+            // The CDC index must be ACTIVE but empty: the pre-existing rows are not indexed
+            String indexTableFullName = SchemaUtil.getTableName(schemaName, cdcIndexName);
+            PTable indexTable = ((PhoenixConnection) conn).getTableNoCache(indexTableFullName);
+            assertEquals(PIndexState.ACTIVE, indexTable.getIndexState());
+            TableName indexPhysicalTableName =
+                    TableName.valueOf(indexTable.getPhysicalName().getString());
+            TestUtil.assertRawRowCount(conn, indexPhysicalTableName, 0);
+            // Rebuild the index and verify that it is still empty
+            IndexToolIT.runIndexTool(false, schemaName, tableName, cdcIndexName);
+            TestUtil.assertRawRowCount(conn, indexPhysicalTableName, 0);
+            // Add more rows, then wait until the wall clock is past their timestamps
+            startTS = System.currentTimeMillis();
+            changes = generateChanges(startTS, tenantids, tableFullName,
+                    tableFullName, COMMIT_SUCCESS, null, 1);
+            currentTime = System.currentTimeMillis();
+            nextTime = changes.get(changes.size() - 1).getTimestamp() + 1;
+            if (nextTime > currentTime) {
+                Thread.sleep(nextTime - currentTime);
+            }
+            // A verify-only IndexTool run must find the CDC index fully consistent
+            IndexTool indexTool = IndexToolIT.runIndexTool(false, schemaName, tableName,
+                    cdcIndexName, null, 0, IndexTool.IndexVerifyType.ONLY);
+            assertEquals(0, indexTool.getJob().getCounters()
+                    .findCounter(REBUILT_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters()
+                    .findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters()
+                    .findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters()
+                    .findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT)
+                    .getValue());
+            assertEquals(0, indexTool.getJob().getCounters()
+                    .findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT)
+                    .getValue());
+            assertEquals(0, indexTool.getJob().getCounters()
+                    .findCounter(BEFORE_REBUILD_OLD_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters()
+                    .findCounter(BEFORE_REBUILD_UNKNOWN_INDEX_ROW_COUNT).getValue());
+        }
+    }
+
+    /**
+     * Checks that the CDC index keeps only row versions within the max lookback window
+     * (MAX_LOOKBACK_AGE seconds). After a major compaction, expired index rows are removed,
+     * and an IndexTool rebuild does not bring them back.
+     */
+    @Test
+    public void testCDCIndexTTLEqualsToMaxLookbackAge() throws Exception {
+        if (forView) {
+            // Only run for tables; the view parameterizations are skipped
+            return;
+        }
+        String schemaName = withSchemaName ? generateUniqueName() : null;
+        String tableName = generateUniqueName();
+        String tableFullName = SchemaUtil.getTableName(schemaName, tableName);
+        try (Connection conn = newConnection()) {
+            // Create a table (views were excluded above, so no view is created here)
+            createTable(conn, "CREATE TABLE  " + tableFullName + " (" + (multitenant ?
+                    "TENANT_ID CHAR(5) NOT NULL, " :
+                    "")
+                    + "k INTEGER NOT NULL, v1 INTEGER, v1v2 INTEGER, v2 INTEGER, B.vb INTEGER, "
+                    + "v3 INTEGER, CONSTRAINT PK PRIMARY KEY " + (multitenant ?
+                    "(TENANT_ID, k) " :
+                    "(k)") + ")", encodingScheme, multitenant, tableSaltBuckets, false, null);
+
+            String tenantId = multitenant ? "1000" : null;
+            String[] tenantids = { tenantId };
+            if (multitenant) {
+                tenantids = new String[] { tenantId, "2000" };
+            }
+
+            // Create a CDC on the empty table
+            String cdcName = generateUniqueName();
+            String cdcSql = "CREATE CDC " + cdcName + " ON " + tableFullName;
+            createCDC(conn, cdcSql, encodingScheme, indexSaltBuckets);
+            String cdcIndexName = CDCUtil.getCDCIndexName(cdcName);
+            // Add rows and record how many raw index rows one batch of changes produces
+            long startTS = System.currentTimeMillis();
+            List<ChangeRow> changes = generateChanges(startTS, tenantids, tableFullName,
+                    tableFullName, COMMIT_SUCCESS, null, 0);
+            String indexTableFullName = SchemaUtil.getTableName(schemaName, cdcIndexName);
+            PTable indexTable = ((PhoenixConnection) conn).getTableNoCache(indexTableFullName);
+            String indexTablePhysicalName = indexTable.getPhysicalName().toString();
+            TableName indexPhysicalTableName = TableName.valueOf(indexTablePhysicalName);
+            int expectedRawRowCount = TestUtil.getRawRowCount(conn, indexPhysicalTableName);
+            // Wait until every row is older than the max lookback age, so all of them expire
+            long currentTime = System.currentTimeMillis();
+            long nextTime = changes.get(changes.size() - 1).getTimestamp()
+                    + MAX_LOOKBACK_AGE * 1000L + 1;
+            if (nextTime > currentTime) {
+                Thread.sleep(nextTime - currentTime);
+            }
+            // Major compact the CDC index to remove all expired rows; it must then be empty
+            TestUtil.doMajorCompaction(conn, indexTablePhysicalName);
+            TestUtil.assertRawRowCount(conn, indexPhysicalTableName, 0);
+            // Rebuild the index and verify that it is still empty
+            IndexToolIT.runIndexTool(false, schemaName, tableName, cdcIndexName);
+            TestUtil.assertRawRowCount(conn, indexPhysicalTableName, 0);
+            // Now check that only the row versions within the max lookback window are kept.
+            // Write the first set of rows.
+            startTS = System.currentTimeMillis();
+            changes = generateChanges(startTS, tenantids, tableFullName,
+                    tableFullName, COMMIT_SUCCESS, null, 0);
+            // Write the second set with timestamps more than MAX_LOOKBACK_AGE seconds after
+            // the first set, which puts the first set outside the max lookback window
+            startTS = changes.get(changes.size() - 1).getTimestamp()
+                    + MAX_LOOKBACK_AGE * 1000L + 1;
+            changes = generateChanges(startTS, tenantids, tableFullName,
+                    tableFullName, COMMIT_SUCCESS, null, 10);
+            // Wait until the wall clock is past the second set's timestamps
+            nextTime = changes.get(changes.size() - 1).getTimestamp() + 1;
+            currentTime = System.currentTimeMillis();
+            if (nextTime > currentTime) {
+                Thread.sleep(nextTime - currentTime);
+            }
+            // Major compaction removes the expired first set; only the second set remains.
+            // NOTE(review): assumes each generateChanges call yields as many index rows as
+            // the batch measured above.
+            TestUtil.doMajorCompaction(conn, indexTablePhysicalName);
+            TestUtil.assertRawRowCount(conn, indexPhysicalTableName, expectedRawRowCount);
+            // Rebuild the index and verify that it still has the same number of rows
+            IndexToolIT.runIndexTool(false, schemaName, tableName, cdcIndexName);
+            TestUtil.assertRawRowCount(conn, indexPhysicalTableName, expectedRawRowCount);
         }
     }
 }
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java 
b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index 1845f01a62..6ea2a2eb65 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -954,11 +954,12 @@ public class TestUtil {
         int cellCount = 0;
         int rowCount = 0;
         try (ResultScanner scanner = table.getScanner(s)) {
-            Result result = null;
+            Result result;
             while ((result = scanner.next()) != null) {
                 rowCount++;
+                System.out.println("Row count: " + rowCount);
                 CellScanner cellScanner = result.cellScanner();
-                Cell current = null;
+                Cell current;
                 while (cellScanner.advance()) {
                     current = cellScanner.current();
                     System.out.println(current + " column= " +
@@ -972,24 +973,17 @@ public class TestUtil {
     }
 
     public static int getRawRowCount(Table table) throws IOException {
+        dumpTable(table);
         return getRowCount(table, true);
     }
 
     public static int getRowCount(Table table, boolean isRaw) throws 
IOException {
         Scan s = new Scan();
         s.setRaw(isRaw);
-        ;
-        s.readAllVersions();
         int rows = 0;
         try (ResultScanner scanner = table.getScanner(s)) {
-            Result result = null;
-            while ((result = scanner.next()) != null) {
+            while (scanner.next() != null) {
                 rows++;
-                CellScanner cellScanner = result.cellScanner();
-                Cell current = null;
-                while (cellScanner.advance()) {
-                    current = cellScanner.current();
-                }
             }
         }
         return rows;
@@ -1390,6 +1384,12 @@ public class TestUtil {
         assertEquals(expectedRowCount, count);
     }
 
+    /**
+     * Counts the rows returned by a raw scan of the given HBase table. The table handle
+     * comes from the query services of the given Phoenix connection.
+     *
+     * @param conn Phoenix connection whose query services provide the table handle
+     * @param table HBase table name
+     * @return number of rows seen by a raw scan
+     * @throws SQLException if the table handle cannot be obtained
+     * @throws IOException if the scan fails or the handle cannot be closed
+     */
+    public static int getRawRowCount(Connection conn, TableName table)
+            throws SQLException, IOException {
+        ConnectionQueryServices cqs = conn.unwrap(PhoenixConnection.class).getQueryServices();
+        // Close the table handle once the scan is done so repeated calls do not leak it
+        try (Table hTable = cqs.getTable(table.getName())) {
+            return TestUtil.getRawRowCount(hTable);
+        }
+    }
+
     public static int getRawCellCount(Connection conn, TableName tableName, 
byte[] row)
             throws SQLException, IOException {
         ConnectionQueryServices cqs = 
conn.unwrap(PhoenixConnection.class).getQueryServices();

Reply via email to