This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch tmp-ec
in repository https://gitbox.apache.org/repos/asf/phoenix.git

commit a6f8d158b70e6249157f5669ff57ceac3e3de720
Author: Viraj Jasani <[email protected]>
AuthorDate: Fri Mar 13 16:30:43 2026 -0700

    addendum
---
 Jenkinsfile                                        |   2 +-
 Jenkinsfile.yetus                                  |   2 +-
 phoenix-core-client/src/main/antlr3/PhoenixSQL.g   |   3 +-
 .../java/org/apache/phoenix/schema/PTable.java     |   5 +
 .../org/apache/phoenix/util/CDCChangeBuilder.java  |  71 ++++-
 .../src/main/protobuf/IndexMutations.proto         |   8 +
 .../coprocessor/CDCGlobalIndexRegionScanner.java   | 193 +++++++++----
 .../phoenix/hbase/index/IndexCDCConsumer.java      | 316 ++++++++++++++++++---
 .../phoenix/hbase/index/IndexRegionObserver.java   | 211 +++++++++-----
 .../java/org/apache/phoenix/end2end/Bson5IT.java   |  20 +-
 ...currentMutationsCoveredEventualGenerateIT.java} |  38 ++-
 ... => ConcurrentMutationsExtendedGenerateIT.java} |  34 ++-
 .../end2end/ConcurrentMutationsExtendedIT.java     |  10 +-
 .../ConcurrentMutationsExtendedIndexIT.java        |   6 +-
 .../ConcurrentMutationsLazyPostBatchWriteIT.java   |   6 +-
 ...rrentMutationsUncoveredEventualGenerateIT.java} |  38 ++-
 ...xToolForNonTxGlobalIndexEventualGenerateIT.java |  93 ++++++
 .../IndexToolForNonTxGlobalIndexEventualIT.java    |  91 ++++++
 .../end2end/IndexToolForNonTxGlobalIndexIT.java    |  29 +-
 .../phoenix/end2end/VarBinaryEncoded2IT.java       |   8 +-
 .../GlobalIndexCheckerEventualGenerateIT.java      |  89 ++++++
 .../index/GlobalIndexCheckerEventualIT.java        |  87 ++++++
 .../end2end/index/GlobalIndexCheckerIT.java        |  49 +++-
 .../java/org/apache/phoenix/query/BaseTest.java    |   2 +-
 24 files changed, 1200 insertions(+), 211 deletions(-)

diff --git a/Jenkinsfile b/Jenkinsfile
index 0308d61c2f..49a44d3766 100644
--- a/Jenkinsfile
+++ b/Jenkinsfile
@@ -66,7 +66,7 @@ pipeline {
 
                     stage('BuildAndTest') {
                         options {
-                            timeout(time: 7, unit: 'HOURS')
+                            timeout(time: 9, unit: 'HOURS')
                         }
                         steps {
                             dir("HBASE_${HBASE_PROFILE}") {
diff --git a/Jenkinsfile.yetus b/Jenkinsfile.yetus
index cdd97f283e..7e99f45b05 100644
--- a/Jenkinsfile.yetus
+++ b/Jenkinsfile.yetus
@@ -37,7 +37,7 @@ pipeline {
             }
 
             options {
-                timeout(time: 7, unit: 'HOURS')
+                timeout(time: 9, unit: 'HOURS')
             }
 
             steps {
diff --git a/phoenix-core-client/src/main/antlr3/PhoenixSQL.g 
b/phoenix-core-client/src/main/antlr3/PhoenixSQL.g
index 6eaefd5813..bb7d3d8a89 100644
--- a/phoenix-core-client/src/main/antlr3/PhoenixSQL.g
+++ b/phoenix-core-client/src/main/antlr3/PhoenixSQL.g
@@ -86,6 +86,7 @@ tokens
     POST='post';
     CHANGE='change';
     IDX_MUTATIONS='idx_mutations';
+    DATA_ROW_STATE='data_row_state';
     LATEST='latest';
     ALL='all';
     INDEX='index';
@@ -608,7 +609,7 @@ cdc_change_scopes returns [Set<CDCChangeScope> ret]
     ;
 
 cdc_change_scope returns [CDCChangeScope ret]
-    :   v=(PRE | POST | CHANGE | IDX_MUTATIONS)
+    :   v=(PRE | POST | CHANGE | IDX_MUTATIONS | DATA_ROW_STATE)
         {
             ret = CDCChangeScope.valueOf(v.getText().toUpperCase());
         }
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTable.java 
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTable.java
index bd314e00c9..effd7773ec 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -1162,5 +1162,10 @@ public interface PTable extends PMetaDataEntity {
      * Include index mutations for eventually consistent indexes.
      */
     IDX_MUTATIONS,
+
+    /**
+     * Include raw before/after data row states as serialized Puts for index 
mutation generation.
+     */
+    DATA_ROW_STATE,
   }
 }
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCChangeBuilder.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCChangeBuilder.java
index 34408cfbf1..880a0a327b 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCChangeBuilder.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCChangeBuilder.java
@@ -26,9 +26,12 @@ import static 
org.apache.phoenix.query.QueryConstants.CDC_TTL_DELETE_EVENT_TYPE;
 import static org.apache.phoenix.query.QueryConstants.CDC_UPSERT_EVENT_TYPE;
 
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Set;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.index.CDCTableInfo;
 import org.apache.phoenix.schema.PTable;
 
@@ -37,6 +40,7 @@ public class CDCChangeBuilder {
   private final boolean isPreImageInScope;
   private final boolean isPostImageInScope;
   private final boolean isIdxMutationsInScope;
+  private final boolean isDataRowStateInScope;
   private final CDCTableInfo cdcDataTableInfo;
   private String changeType;
   private long lastDeletedTimestamp;
@@ -44,6 +48,12 @@ public class CDCChangeBuilder {
   private Map<String, Object> preImage = null;
   private Map<String, Object> changeImage = null;
 
+  private boolean isFullRowDelete;
+  private Map<ImmutableBytesPtr, Cell> rawLatestBeforeChange;
+  private Map<ImmutableBytesPtr, Cell> rawAtChange;
+  private Set<ImmutableBytesPtr> rawDeletedColumnsAtChange;
+  private Map<ImmutableBytesPtr, Long> rawDeletedColumnsBeforeChange;
+
   public CDCChangeBuilder(CDCTableInfo cdcDataTableInfo) {
     this.cdcDataTableInfo = cdcDataTableInfo;
     Set<PTable.CDCChangeScope> changeScopes = 
cdcDataTableInfo.getIncludeScopes();
@@ -51,6 +61,7 @@ public class CDCChangeBuilder {
     isPreImageInScope = changeScopes.contains(PTable.CDCChangeScope.PRE);
     isPostImageInScope = changeScopes.contains(PTable.CDCChangeScope.POST);
     isIdxMutationsInScope = 
changeScopes.contains(PTable.CDCChangeScope.IDX_MUTATIONS);
+    isDataRowStateInScope = 
changeScopes.contains(PTable.CDCChangeScope.DATA_ROW_STATE);
   }
 
   public void initChange(long ts) {
@@ -63,6 +74,13 @@ public class CDCChangeBuilder {
     if (isChangeImageInScope || isPostImageInScope) {
       changeImage = new HashMap<>();
     }
+    if (isDataRowStateInScope) {
+      isFullRowDelete = false;
+      rawLatestBeforeChange = new LinkedHashMap<>();
+      rawAtChange = new LinkedHashMap<>();
+      rawDeletedColumnsAtChange = new HashSet<>();
+      rawDeletedColumnsBeforeChange = new HashMap<>();
+    }
   }
 
   public long getChangeTimestamp() {
@@ -79,6 +97,9 @@ public class CDCChangeBuilder {
 
   public void markAsDeletionEvent() {
     changeType = CDC_DELETE_EVENT_TYPE;
+    if (isDataRowStateInScope) {
+      isFullRowDelete = true;
+    }
   }
 
   public long getLastDeletedTimestamp() {
@@ -143,9 +164,49 @@ public class CDCChangeBuilder {
   }
 
   public boolean isOlderThanChange(Cell cell) {
-    return (cell.getTimestamp() < changeTimestamp && cell.getTimestamp() > 
lastDeletedTimestamp)
-      ? true
-      : false;
+    return cell.getTimestamp() < changeTimestamp && cell.getTimestamp() > 
lastDeletedTimestamp;
+  }
+
+  public void registerRawPut(Cell cell, ImmutableBytesPtr colKey) {
+    if (cell.getTimestamp() == changeTimestamp) {
+      rawAtChange.putIfAbsent(colKey, cell);
+    } else if (isOlderThanChange(cell)) {
+      Long colDeleteTs = rawDeletedColumnsBeforeChange.get(colKey);
+      if (
+        (colDeleteTs == null || cell.getTimestamp() > colDeleteTs)
+          && !rawLatestBeforeChange.containsKey(colKey)
+      ) {
+        rawLatestBeforeChange.put(colKey, cell);
+      }
+    }
+  }
+
+  public void registerRawDeleteColumn(Cell cell, ImmutableBytesPtr colKey) {
+    if (cell.getTimestamp() == changeTimestamp) {
+      rawDeletedColumnsAtChange.add(colKey);
+    } else if (isOlderThanChange(cell)) {
+      rawDeletedColumnsBeforeChange.putIfAbsent(colKey, cell.getTimestamp());
+    }
+  }
+
+  public boolean hasValidDataRowStateChanges() {
+    return isFullRowDelete || !rawAtChange.isEmpty() || 
!rawDeletedColumnsAtChange.isEmpty();
+  }
+
+  public boolean isFullRowDelete() {
+    return isFullRowDelete;
+  }
+
+  public Map<ImmutableBytesPtr, Cell> getRawLatestBeforeChange() {
+    return rawLatestBeforeChange;
+  }
+
+  public Map<ImmutableBytesPtr, Cell> getRawAtChange() {
+    return rawAtChange;
+  }
+
+  public Set<ImmutableBytesPtr> getRawDeletedColumnsAtChange() {
+    return rawDeletedColumnsAtChange;
   }
 
   public boolean isPreImageInScope() {
@@ -164,4 +225,8 @@ public class CDCChangeBuilder {
     return isIdxMutationsInScope;
   }
 
+  public boolean isDataRowStateInScope() {
+    return isDataRowStateInScope;
+  }
+
 }
diff --git a/phoenix-core-client/src/main/protobuf/IndexMutations.proto 
b/phoenix-core-client/src/main/protobuf/IndexMutations.proto
index eb5dc2bbce..cba4360dd7 100644
--- a/phoenix-core-client/src/main/protobuf/IndexMutations.proto
+++ b/phoenix-core-client/src/main/protobuf/IndexMutations.proto
@@ -29,3 +29,11 @@ message IndexMutations {
   repeated bytes tables = 1;
   repeated bytes mutations = 2;
 }
+
+// Raw data row states for generating index mutations.
+// Contains the data row key and serialized MutationProto for the before and 
after states.
+message DataRowStates {
+  optional bytes dataRowKey = 1;
+  optional bytes currentDataRowState = 2;
+  optional bytes nextDataRowState = 3;
+}
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
index c096c443ac..da196c83c8 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
@@ -21,6 +21,7 @@ import static 
org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverCons
 import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Base64;
@@ -29,11 +30,13 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellBuilder;
 import org.apache.hadoop.hbase.CellBuilderFactory;
 import org.apache.hadoop.hbase.CellBuilderType;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -62,6 +65,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.xerial.snappy.Snappy;
 
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
+
 /**
  * CDC (Change Data Capture) enabled region scanner for global indexes that 
processes uncovered CDC
  * index queries by reconstructing CDC events from index and data table rows.
@@ -90,6 +96,7 @@ public class CDCGlobalIndexRegionScanner extends 
UncoveredGlobalIndexRegionScann
   private static final Logger LOGGER = 
LoggerFactory.getLogger(CDCGlobalIndexRegionScanner.class);
   private CDCTableInfo cdcDataTableInfo;
   private CDCChangeBuilder changeBuilder;
+  private static final byte[] SEPARATOR = { 0 };
 
   private static final byte[] EMPTY_IDX_MUTATIONS = 
PVarchar.INSTANCE.toBytes(Base64.getEncoder()
     
.encodeToString(IndexMutationsProtos.IndexMutations.getDefaultInstance().toByteArray()));
@@ -110,8 +117,9 @@ public class CDCGlobalIndexRegionScanner extends 
UncoveredGlobalIndexRegionScann
   @Override
   protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys) throws 
IOException {
     if (
-      changeBuilder.isIdxMutationsInScope() && 
!changeBuilder.isChangeImageInScope()
-        && !changeBuilder.isPreImageInScope() && 
!changeBuilder.isPostImageInScope()
+      changeBuilder.isIdxMutationsInScope() && 
!changeBuilder.isDataRowStateInScope()
+        && !changeBuilder.isChangeImageInScope() && 
!changeBuilder.isPreImageInScope()
+        && !changeBuilder.isPostImageInScope()
     ) {
       return null;
     }
@@ -125,7 +133,11 @@ public class CDCGlobalIndexRegionScanner extends 
UncoveredGlobalIndexRegionScann
     // stopTimeRange = PLong.INSTANCE.getCodec().decodeLong(
     // scan.getStopRow(), 0, SortOrder.getDefault());
     // }
-    return CDCUtil.setupScanForCDC(prepareDataTableScan(dataRowKeys, true));
+    Scan dataScan = prepareDataTableScan(dataRowKeys, true);
+    if (dataScan == null) {
+      return null;
+    }
+    return CDCUtil.setupScanForCDC(dataScan);
   }
 
   protected boolean getNextCoveredIndexRow(List<Cell> result) throws 
IOException {
@@ -173,59 +185,74 @@ public class CDCGlobalIndexRegionScanner extends 
UncoveredGlobalIndexRegionScann
                 // marker after that.
                 changeBuilder.setLastDeletedTimestamp(cell.getTimestamp());
               }
-            } else if (
-              (cell.getType() == Cell.Type.DeleteColumn || cell.getType() == 
Cell.Type.Put)
-                && !Arrays.equals(cellQual, emptyCQ)
-            ) {
-              if (!changeBuilder.isChangeRelevant(cell)) {
-                // We don't need to build the change image, just skip it.
-                continue;
-              }
-              // In this case, cell is the row, meaning we loop over rows..
-              if (isSingleCell) {
-                while (curColumnNum < cdcColumnInfoList.size()) {
-                  boolean hasValue = 
dataTableProjector.getSchema().extractValue(cell,
-                    (SingleCellColumnExpression) expressions[curColumnNum], 
ptr);
-                  if (hasValue) {
-                    Object cellValue = getColumnValue(ptr.get(), 
ptr.getOffset(), ptr.getLength(),
-                      cdcColumnInfoList.get(curColumnNum).getColumnType());
-                    changeBuilder.registerChange(cell, curColumnNum, 
cellValue);
+            } else
+              if (cell.getType() == Cell.Type.DeleteColumn || cell.getType() 
== Cell.Type.Put) {
+                boolean isEmptyCQ = Arrays.equals(cellQual, emptyCQ);
+                if (changeBuilder.isDataRowStateInScope()) {
+                  ImmutableBytesPtr colKey =
+                    new ImmutableBytesPtr(Bytes.add(cellFam, SEPARATOR, 
cellQual));
+                  if (cell.getType() == Cell.Type.DeleteColumn) {
+                    changeBuilder.registerRawDeleteColumn(cell, colKey);
+                  } else {
+                    changeBuilder.registerRawPut(cell, colKey);
                   }
-                  ++curColumnNum;
                 }
-                break cellLoop;
-              }
-              while (true) {
-                CDCTableInfo.CDCColumnInfo currentColumnInfo = 
cdcColumnInfoList.get(curColumnNum);
-                int columnComparisonResult =
-                  CDCUtil.compareCellFamilyAndQualifier(cellFam, cellQual,
-                    currentColumnInfo.getColumnFamily(), 
currentColumnInfo.getColumnQualifier());
-                if (columnComparisonResult > 0) {
-                  if (++curColumnNum >= cdcColumnInfoList.size()) {
-                    // Have no more column definitions, so the rest of the 
cells
-                    // must be for dropped columns and so can be ignored.
-                    break cellLoop;
-                  }
-                  // Continue looking for the right column definition
-                  // for this cell.
+                if (
+                  isEmptyCQ || (changeBuilder.isDataRowStateInScope()
+                    && !changeBuilder.isChangeImageInScope() && 
!changeBuilder.isPreImageInScope()
+                    && !changeBuilder.isPostImageInScope())
+                ) {
                   continue;
-                } else if (columnComparisonResult < 0) {
-                  // We didn't find a column definition for this cell, ignore 
the
-                  // current cell but continue working on the rest of the 
cells.
-                  continue cellLoop;
                 }
+                // In this case, cell is the row, meaning we loop over rows..
+                if (isSingleCell) {
+                  while (curColumnNum < cdcColumnInfoList.size()) {
+                    boolean hasValue = 
dataTableProjector.getSchema().extractValue(cell,
+                      (SingleCellColumnExpression) expressions[curColumnNum], 
ptr);
+                    if (hasValue) {
+                      Object cellValue = getColumnValue(ptr.get(), 
ptr.getOffset(), ptr.getLength(),
+                        cdcColumnInfoList.get(curColumnNum).getColumnType());
+                      changeBuilder.registerChange(cell, curColumnNum, 
cellValue);
+                    }
+                    ++curColumnNum;
+                  }
+                  break cellLoop;
+                }
+                while (true) {
+                  CDCTableInfo.CDCColumnInfo currentColumnInfo =
+                    cdcColumnInfoList.get(curColumnNum);
+                  int columnComparisonResult =
+                    CDCUtil.compareCellFamilyAndQualifier(cellFam, cellQual,
+                      currentColumnInfo.getColumnFamily(), 
currentColumnInfo.getColumnQualifier());
+                  if (columnComparisonResult > 0) {
+                    if (++curColumnNum >= cdcColumnInfoList.size()) {
+                      // Have no more column definitions, so the rest of the 
cells
+                      // must be for dropped columns and so can be ignored.
+                      break cellLoop;
+                    }
+                    // Continue looking for the right column definition
+                    // for this cell.
+                    continue;
+                  } else if (columnComparisonResult < 0) {
+                    // We didn't find a column definition for this cell, 
ignore the
+                    // current cell but continue working on the rest of the 
cells.
+                    continue cellLoop;
+                  }
 
-                // else, found the column definition.
-                Object cellValue = cell.getType() == Cell.Type.DeleteColumn
-                  ? null
-                  : getColumnValue(cell, 
cdcColumnInfoList.get(curColumnNum).getColumnType());
-                changeBuilder.registerChange(cell, curColumnNum, cellValue);
-                // Done processing the current cell, check the next cell.
-                break;
+                  // else, found the column definition.
+                  Object cellValue = cell.getType() == Cell.Type.DeleteColumn
+                    ? null
+                    : getColumnValue(cell, 
cdcColumnInfoList.get(curColumnNum).getColumnType());
+                  changeBuilder.registerChange(cell, curColumnNum, cellValue);
+                  // Done processing the current cell, check the next cell.
+                  break;
+                }
               }
-            }
           }
-          if (changeBuilder.isNonEmptyEvent()) {
+          if (changeBuilder.isDataRowStateInScope()) {
+            buildDataRowStateResult(dataRowKey.copyBytesIfNecessary(), 
indexRowKey, indexCell,
+              result);
+          } else if (changeBuilder.isNonEmptyEvent()) {
             Result cdcRow = getCDCImage(indexRowKey, indexCell);
             if (cdcRow != null && tupleProjector != null) {
               if (indexCell.getType() == Cell.Type.DeleteFamily) {
@@ -250,7 +277,12 @@ public class CDCGlobalIndexRegionScanner extends 
UncoveredGlobalIndexRegionScann
             result.clear();
           }
         } else {
-          result.clear();
+          if (changeBuilder.isDataRowStateInScope()) {
+            buildDataRowStateResult(dataRowKey.copyBytesIfNecessary(), 
indexRowKey, indexCell,
+              result);
+          } else {
+            result.clear();
+          }
         }
 
         return true;
@@ -336,6 +368,67 @@ public class CDCGlobalIndexRegionScanner extends 
UncoveredGlobalIndexRegionScann
     return true;
   }
 
+  /**
+   * Builds a DataRowStates protobuf result from the raw cell maps collected 
by CDCChangeBuilder
+   * during the raw cell iteration. Constructs before/after HBase Put objects 
representing the row
+   * state before and after the change, serializes them, and populates the 
result. If no valid
+   * changes were found at the change timestamp (data not yet visible), only 
the dataRowKey is
+   * included, the consumer needs to retry.
+   * @param dataRowKey  The data table row key bytes.
+   * @param indexRowKey The CDC index row key.
+   * @param indexCell   The index cell.
+   * @param result      The result list to populate.
+   * @throws IOException if serialization fails.
+   */
+  private void buildDataRowStateResult(byte[] dataRowKey, byte[] indexRowKey, 
Cell indexCell,
+    List<Cell> result) throws IOException {
+    Put currentDataRowState = null;
+    Put nextDataRowState = null;
+    Map<ImmutableBytesPtr, Cell> latestBeforeChange = 
changeBuilder.getRawLatestBeforeChange();
+    Map<ImmutableBytesPtr, Cell> atChange = changeBuilder.getRawAtChange();
+    Set<ImmutableBytesPtr> deletedColumnsAtChange = 
changeBuilder.getRawDeletedColumnsAtChange();
+    if (changeBuilder.hasValidDataRowStateChanges()) {
+      if (!latestBeforeChange.isEmpty()) {
+        currentDataRowState = new Put(dataRowKey);
+        for (Cell cell : latestBeforeChange.values()) {
+          currentDataRowState.add(cell);
+        }
+      }
+      if (!changeBuilder.isFullRowDelete()) {
+        Put nextState = new Put(dataRowKey);
+        for (Map.Entry<ImmutableBytesPtr, Cell> entry : 
latestBeforeChange.entrySet()) {
+          if (
+            !atChange.containsKey(entry.getKey())
+              && !deletedColumnsAtChange.contains(entry.getKey())
+          ) {
+            nextState.add(entry.getValue());
+          }
+        }
+        for (Cell cell : atChange.values()) {
+          nextState.add(cell);
+        }
+        if (!nextState.isEmpty()) {
+          nextDataRowState = nextState;
+        }
+      }
+    }
+    IndexMutationsProtos.DataRowStates.Builder builder =
+      IndexMutationsProtos.DataRowStates.newBuilder();
+    builder.setDataRowKey(ByteString.copyFrom(dataRowKey));
+    if (currentDataRowState != null) {
+      builder.setCurrentDataRowState(ByteString.copyFrom(
+        ProtobufUtil.toMutation(ClientProtos.MutationProto.MutationType.PUT, 
currentDataRowState)
+          .toByteArray()));
+    }
+    if (nextDataRowState != null) {
+      builder.setNextDataRowState(ByteString.copyFrom(ProtobufUtil
+        .toMutation(ClientProtos.MutationProto.MutationType.PUT, 
nextDataRowState).toByteArray()));
+    }
+    String base64String = 
Base64.getEncoder().encodeToString(builder.build().toByteArray());
+    byte[] cdcEventBytes = PVarchar.INSTANCE.toBytes(base64String);
+    addResult(indexRowKey, indexCell, result, indexCell, cdcEventBytes);
+  }
+
   /**
    * Handles CDC event when IDX_MUTATIONS scope is enabled. Returns the index 
mutations as a
    * serialized IndexMutations, or an empty proto if no mutations are present. 
Skips the data table
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java
index c940765499..5616887c46 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java
@@ -30,9 +30,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.ConnectionUtils;
 import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.coprocessor.DelegateRegionCoprocessorEnvironment;
@@ -41,6 +41,7 @@ import org.apache.phoenix.coprocessorclient.MetaDataProtocol;
 import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.write.IndexWriter;
+import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.query.QueryConstants;
@@ -105,12 +106,26 @@ public class IndexCDCConsumer implements Runnable {
   private static final long DEFAULT_POLL_INTERVAL_MS = 1000;
 
   /**
-   * The time buffer in milliseconds subtracted from current time when 
querying CDC mutations. This
-   * buffer helps avoid reading mutations that are too recent.
+   * The time buffer in milliseconds subtracted from current time when 
querying CDC mutations to
+   * help avoid reading mutations that are too recent.
    */
   public static final String INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS =
     "phoenix.index.cdc.consumer.timestamp.buffer.ms";
-  private static final long DEFAULT_TIMESTAMP_BUFFER_MS = 1000;
+  private static final long DEFAULT_TIMESTAMP_BUFFER_MS = 25000;
+
+  /**
+   * Maximum number of retries when CDC events exist but the corresponding 
data table mutations are
+   * not yet visible (or permanently failed). After exceeding this limit, the 
consumer advances past
+   * the unprocessable events to avoid blocking indefinitely. This is only 
used for index mutation
+   * generation approach (serializeCDCMutations = false).
+   */
+  public static final String INDEX_CDC_CONSUMER_MAX_DATA_VISIBILITY_RETRIES =
+    "phoenix.index.cdc.consumer.max.data.visibility.retries";
+  private static final int DEFAULT_MAX_DATA_VISIBILITY_RETRIES = 15;
+
+  public static final String INDEX_CDC_CONSUMER_RETRY_PAUSE_MS =
+    "phoenix.index.cdc.consumer.retry.pause.ms";
+  private static final long DEFAULT_RETRY_PAUSE_MS = 2000;
 
   private final RegionCoprocessorEnvironment env;
   private final String dataTableName;
@@ -121,26 +136,31 @@ public class IndexCDCConsumer implements Runnable {
   private final int batchSize;
   private final long pollIntervalMs;
   private final long timestampBufferMs;
+  private final int maxDataVisibilityRetries;
   private final Configuration config;
+  private final boolean serializeCDCMutations;
   private volatile boolean stopped = false;
   private Thread consumerThread;
   private boolean hasParentPartitions = false;
   private PTable cachedDataTable;
 
   /**
-   * Creates a new IndexCDCConsumer for the given region.
-   * @param env           region coprocessor environment.
-   * @param dataTableName name of the data table.
-   * @param serverName    server name.
+   * Creates a new IndexCDCConsumer for the given region with configurable 
serialization mode.
+   * @param env                   region coprocessor environment.
+   * @param dataTableName         name of the data table.
+   * @param serverName            server name.
+   * @param serializeCDCMutations when true, consumes pre-serialized index 
mutations; when false,
+   *                              generates index mutations from data row 
states.
    * @throws IOException if the IndexWriter cannot be created.
    */
-  public IndexCDCConsumer(RegionCoprocessorEnvironment env, String 
dataTableName, String serverName)
-    throws IOException {
+  public IndexCDCConsumer(RegionCoprocessorEnvironment env, String 
dataTableName, String serverName,
+    boolean serializeCDCMutations) throws IOException {
     this.env = env;
     this.dataTableName = dataTableName;
     this.encodedRegionName = env.getRegion().getRegionInfo().getEncodedName();
     this.config = env.getConfiguration();
-    this.pause = config.getLong(HConstants.HBASE_CLIENT_PAUSE, 300);
+    this.serializeCDCMutations = serializeCDCMutations;
+    this.pause = config.getLong(INDEX_CDC_CONSUMER_RETRY_PAUSE_MS, 
DEFAULT_RETRY_PAUSE_MS);
     this.startupDelayMs =
       config.getLong(INDEX_CDC_CONSUMER_STARTUP_DELAY_MS, 
DEFAULT_STARTUP_DELAY_MS);
     this.batchSize = config.getInt(INDEX_CDC_CONSUMER_BATCH_SIZE, 
DEFAULT_CDC_BATCH_SIZE);
@@ -148,6 +168,8 @@ public class IndexCDCConsumer implements Runnable {
       config.getLong(INDEX_CDC_CONSUMER_POLL_INTERVAL_MS, 
DEFAULT_POLL_INTERVAL_MS);
     this.timestampBufferMs =
       config.getLong(INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS, 
DEFAULT_TIMESTAMP_BUFFER_MS);
+    this.maxDataVisibilityRetries = 
config.getInt(INDEX_CDC_CONSUMER_MAX_DATA_VISIBILITY_RETRIES,
+      DEFAULT_MAX_DATA_VISIBILITY_RETRIES);
     DelegateRegionCoprocessorEnvironment indexWriterEnv =
       new DelegateRegionCoprocessorEnvironment(env, 
ConnectionType.INDEX_WRITER_CONNECTION);
     this.indexWriter =
@@ -249,8 +271,13 @@ public class IndexCDCConsumer implements Runnable {
       while (!stopped) {
         try {
           long previousTimestamp = lastProcessedTimestamp;
-          lastProcessedTimestamp =
-            processCDCBatch(encodedRegionName, encodedRegionName, 
lastProcessedTimestamp, false);
+          if (serializeCDCMutations) {
+            lastProcessedTimestamp =
+              processCDCBatch(encodedRegionName, encodedRegionName, 
lastProcessedTimestamp, false);
+          } else {
+            lastProcessedTimestamp = 
processCDCBatchGenerated(encodedRegionName, encodedRegionName,
+              lastProcessedTimestamp, false);
+          }
           if (lastProcessedTimestamp == previousTimestamp) {
             sleepIfNotStopped(ConnectionUtils.getPauseTime(pause, 
++retryCount));
           } else {
@@ -605,8 +632,14 @@ public class IndexCDCConsumer implements Runnable {
             currentLastProcessedTimestamp = getParentProgress(partitionId);
           }
         }
-        long newTimestamp =
-          processCDCBatch(partitionId, ownerPartitionId, 
currentLastProcessedTimestamp, true);
+        long newTimestamp;
+        if (serializeCDCMutations) {
+          newTimestamp =
+            processCDCBatch(partitionId, ownerPartitionId, 
currentLastProcessedTimestamp, true);
+        } else {
+          newTimestamp = processCDCBatchGenerated(partitionId, 
ownerPartitionId,
+            currentLastProcessedTimestamp, true);
+        }
         batchCount++;
         retryCount = 0;
         if (newTimestamp == currentLastProcessedTimestamp) {
@@ -697,18 +730,7 @@ public class IndexCDCConsumer implements Runnable {
       dataTableName, partitionId, ownerPartitionId, lastProcessedTimestamp);
     try (PhoenixConnection conn =
       QueryUtil.getConnectionOnServer(config).unwrap(PhoenixConnection.class)) 
{
-      PTable dataTable = getDataTable(conn);
-      String cdcObjectName = CDCUtil.getCDCObjectName(dataTable, false);
-      if (cdcObjectName == null) {
-        throw new SQLException("No CDC object found for table " + 
dataTableName);
-      }
-      String schemaName = dataTable.getSchemaName().getString();
-      if (schemaName == null || schemaName.isEmpty()) {
-        cdcObjectName = "\"" + cdcObjectName + "\"";
-      } else {
-        cdcObjectName =
-          "\"" + schemaName + "\"" + QueryConstants.NAME_SEPARATOR + "\"" + 
cdcObjectName + "\"";
-      }
+      String cdcObjectName = getCdcObjectName(conn);
       String cdcQuery;
       if (isParentReplay) {
         cdcQuery = String
@@ -728,15 +750,7 @@ public class IndexCDCConsumer implements Runnable {
       int retryCount = 0;
       while (hasMoreRows && batchMutations.isEmpty()) {
         try (PreparedStatement ps = conn.prepareStatement(cdcQuery)) {
-          ps.setString(1, partitionId);
-          ps.setDate(2, new Date(newLastTimestamp));
-          if (isParentReplay) {
-            ps.setInt(3, batchSize);
-          } else {
-            long currentTime = EnvironmentEdgeManager.currentTimeMillis() - 
timestampBufferMs;
-            ps.setDate(3, new Date(currentTime));
-            ps.setInt(4, batchSize);
-          }
+          setStatementParams(partitionId, isParentReplay, newLastTimestamp, 
ps);
           Pair<Long, Boolean> result =
             getMutationsAndTimestamp(ps, newLastTimestamp, batchMutations);
           hasMoreRows = result.getSecond();
@@ -778,6 +792,236 @@ public class IndexCDCConsumer implements Runnable {
     }
   }
 
+  private String getCdcObjectName(PhoenixConnection conn) throws SQLException {
+    PTable dataTable = getDataTable(conn);
+    String cdcObjectName = CDCUtil.getCDCObjectName(dataTable, false);
+    if (cdcObjectName == null) {
+      throw new SQLException("No CDC object found for table " + dataTableName);
+    }
+    String schemaName = dataTable.getSchemaName().getString();
+    if (schemaName == null || schemaName.isEmpty()) {
+      cdcObjectName = "\"" + cdcObjectName + "\"";
+    } else {
+      cdcObjectName =
+        "\"" + schemaName + "\"" + QueryConstants.NAME_SEPARATOR + "\"" + 
cdcObjectName + "\"";
+    }
+    return cdcObjectName;
+  }
+
+  /**
+   * Processes a batch of CDC events for the given partition starting from the 
specified timestamp
+   * by generating index mutations from data row states. This method queries 
the CDC index with the
+   * DATA_ROW_STATE scope, which triggers a server-side data table scan to 
reconstruct the
+   * before-image ({@code currentDataRowState}) and after-image ({@code 
nextDataRowState}) for each
+   * change.
+   * @param partitionId            the partition (region) ID to process CDC 
events for.
+   * @param ownerPartitionId       the owner partition ID.
+   * @param lastProcessedTimestamp the timestamp to start processing CDC 
events from.
+   * @param isParentReplay         true if replaying a closed parent partition.
+   * @return the new last processed timestamp after this batch, or the same 
timestamp if no new
+   *         records were found.
+   * @throws SQLException         if a SQL error occurs.
+   * @throws IOException          if an I/O error occurs.
+   * @throws InterruptedException if the thread is interrupted while waiting.
+   */
+  private long processCDCBatchGenerated(String partitionId, String 
ownerPartitionId,
+    long lastProcessedTimestamp, boolean isParentReplay)
+    throws SQLException, IOException, InterruptedException {
+    LOG.debug(
+      "Processing CDC batch (generated mode) for table {} partition {} owner 
{} from timestamp {}",
+      dataTableName, partitionId, ownerPartitionId, lastProcessedTimestamp);
+    try (PhoenixConnection conn =
+      QueryUtil.getConnectionOnServer(config).unwrap(PhoenixConnection.class)) 
{
+      String cdcObjectName = getCdcObjectName(conn);
+      String cdcQuery;
+      if (isParentReplay) {
+        cdcQuery = String
+          .format("SELECT /*+ CDC_INCLUDE(DATA_ROW_STATE) */ 
PHOENIX_ROW_TIMESTAMP(), \"CDC JSON\" "
+            + "FROM %s WHERE PARTITION_ID() = ? AND PHOENIX_ROW_TIMESTAMP() > 
? "
+            + "ORDER BY PARTITION_ID() ASC, PHOENIX_ROW_TIMESTAMP() ASC LIMIT 
?", cdcObjectName);
+      } else {
+        cdcQuery = String
+          .format("SELECT /*+ CDC_INCLUDE(DATA_ROW_STATE) */ 
PHOENIX_ROW_TIMESTAMP(), \"CDC JSON\" "
+            + "FROM %s WHERE PARTITION_ID() = ? AND PHOENIX_ROW_TIMESTAMP() > 
? "
+            + "AND PHOENIX_ROW_TIMESTAMP() < ? "
+            + "ORDER BY PARTITION_ID() ASC, PHOENIX_ROW_TIMESTAMP() ASC LIMIT 
?", cdcObjectName);
+      }
+
+      List<Pair<Long, IndexMutationsProtos.DataRowStates>> batchStates = new 
ArrayList<>();
+      long newLastTimestamp = lastProcessedTimestamp;
+      long[] lastScannedTimestamp = { lastProcessedTimestamp };
+      boolean hasMoreRows = true;
+      int retryCount = 0;
+      while (hasMoreRows && batchStates.isEmpty()) {
+        try (PreparedStatement ps = conn.prepareStatement(cdcQuery)) {
+          setStatementParams(partitionId, isParentReplay, newLastTimestamp, 
ps);
+          Pair<Long, Boolean> result =
+            getDataRowStatesAndTimestamp(ps, newLastTimestamp, batchStates, 
lastScannedTimestamp);
+          hasMoreRows = result.getSecond();
+          if (hasMoreRows) {
+            if (!batchStates.isEmpty()) {
+              newLastTimestamp = result.getFirst();
+            } else if (retryCount >= maxDataVisibilityRetries) {
+              LOG.warn(
+                "Skipping CDC events for table {} partition {} from timestamp 
{}"
+                  + " to {} after {} retries — data table mutations may have 
failed",
+                dataTableName, partitionId, newLastTimestamp, 
lastScannedTimestamp[0], retryCount);
+              newLastTimestamp = lastScannedTimestamp[0];
+              break;
+            } else {
+              // CDC index entries are written but the data is not yet visible.
+              // Don't advance newLastTimestamp so the same events are 
re-fetched
+              // once the data becomes visible.
+              sleepIfNotStopped(ConnectionUtils.getPauseTime(pause, 
++retryCount));
+            }
+          }
+        }
+      }
+      if (newLastTimestamp > lastProcessedTimestamp) {
+        String sameTimestampQuery = String
+          .format("SELECT /*+ CDC_INCLUDE(DATA_ROW_STATE) */ 
PHOENIX_ROW_TIMESTAMP(), \"CDC JSON\" "
+            + "FROM %s WHERE PARTITION_ID() = ? AND PHOENIX_ROW_TIMESTAMP() = 
? "
+            + "ORDER BY PARTITION_ID() ASC, PHOENIX_ROW_TIMESTAMP() ASC", 
cdcObjectName);
+        final long timestampToRefetch = newLastTimestamp;
+        batchStates.removeIf(pair -> pair.getFirst() == timestampToRefetch);
+        try (PreparedStatement ps = conn.prepareStatement(sameTimestampQuery)) 
{
+          ps.setString(1, partitionId);
+          ps.setDate(2, new Date(newLastTimestamp));
+          Pair<Long, Boolean> result =
+            getDataRowStatesAndTimestamp(ps, newLastTimestamp, batchStates, 
lastScannedTimestamp);
+          newLastTimestamp = result.getFirst();
+          if (batchStates.isEmpty()) {
+            newLastTimestamp = timestampToRefetch;
+          } else if (newLastTimestamp != timestampToRefetch) {
+            throw new IOException("Unexpected timestamp mismatch: expected " + 
timestampToRefetch
+              + " but got " + newLastTimestamp);
+          }
+        }
+      }
+      generateAndApplyIndexMutations(conn, batchStates, partitionId, 
ownerPartitionId,
+        newLastTimestamp);
+      if (newLastTimestamp > lastProcessedTimestamp) {
+        updateTrackerProgress(conn, partitionId, ownerPartitionId, 
newLastTimestamp,
+          PhoenixDatabaseMetaData.TRACKER_STATUS_IN_PROGRESS);
+      }
+      return newLastTimestamp;
+    }
+  }
+
+  private void setStatementParams(String partitionId, boolean isParentReplay, 
long newLastTimestamp,
+    PreparedStatement ps) throws SQLException {
+    ps.setString(1, partitionId);
+    ps.setDate(2, new Date(newLastTimestamp));
+    if (isParentReplay) {
+      ps.setInt(3, batchSize);
+    } else {
+      long currentTime = EnvironmentEdgeManager.currentTimeMillis() - 
timestampBufferMs;
+      ps.setDate(3, new Date(currentTime));
+      ps.setInt(4, batchSize);
+    }
+  }
+
+  private static Pair<Long, Boolean> 
getDataRowStatesAndTimestamp(PreparedStatement ps,
+    long initialLastTimestamp, List<Pair<Long, 
IndexMutationsProtos.DataRowStates>> batchStates,
+    long[] lastScannedTimestamp) throws SQLException, IOException {
+    boolean hasRows = false;
+    long lastTimestamp = initialLastTimestamp;
+    lastScannedTimestamp[0] = initialLastTimestamp;
+    try (ResultSet rs = ps.executeQuery()) {
+      while (rs.next()) {
+        hasRows = true;
+        long rowTimestamp = rs.getDate(1).getTime();
+        lastScannedTimestamp[0] = rowTimestamp;
+        String cdcValue = rs.getString(2);
+        if (cdcValue != null && !cdcValue.isEmpty()) {
+          byte[] protoBytes = Base64.getDecoder().decode(cdcValue);
+          IndexMutationsProtos.DataRowStates dataRowStates =
+            IndexMutationsProtos.DataRowStates.parseFrom(protoBytes);
+          if (
+            dataRowStates.hasDataRowKey()
+              && (dataRowStates.hasCurrentDataRowState() || 
dataRowStates.hasNextDataRowState())
+          ) {
+            batchStates.add(Pair.newPair(rowTimestamp, dataRowStates));
+            lastTimestamp = rowTimestamp;
+          }
+        }
+      }
+    }
+    return Pair.newPair(lastTimestamp, hasRows);
+  }
+
+  private void generateAndApplyIndexMutations(PhoenixConnection conn,
+    List<Pair<Long, IndexMutationsProtos.DataRowStates>> batchStates, String 
partitionId,
+    String ownerPartitionId, long lastProcessedTimestamp) throws SQLException, 
IOException {
+    if (batchStates.isEmpty()) {
+      return;
+    }
+    refreshDataTableCache(conn);
+    PTable dataTable = getDataTable(conn);
+    byte[] encodedRegionNameBytes = 
env.getRegion().getRegionInfo().getEncodedNameAsBytes();
+    List<Pair<IndexMaintainer, HTableInterfaceReference>> indexTables = new 
ArrayList<>();
+    for (PTable index : dataTable.getIndexes()) {
+      IndexConsistency consistency = index.getIndexConsistency();
+      if (consistency != null && consistency.isAsynchronous()) {
+        IndexMaintainer maintainer = index.getIndexMaintainer(dataTable, conn);
+        HTableInterfaceReference tableRef =
+          new HTableInterfaceReference(new 
ImmutableBytesPtr(maintainer.getIndexTableName()));
+        indexTables.add(new Pair<>(maintainer, tableRef));
+      }
+    }
+    if (indexTables.isEmpty()) {
+      return;
+    }
+    ListMultimap<HTableInterfaceReference, Mutation> indexUpdates = 
ArrayListMultimap.create();
+    int totalMutations = 0;
+    for (Pair<Long, IndexMutationsProtos.DataRowStates> entry : batchStates) {
+      long ts = entry.getFirst();
+      IndexMutationsProtos.DataRowStates dataRowStates = entry.getSecond();
+      byte[] dataRowKey = dataRowStates.getDataRowKey().toByteArray();
+      ImmutableBytesPtr rowKeyPtr = new ImmutableBytesPtr(dataRowKey);
+
+      Put currentDataRowState = null;
+      if (dataRowStates.hasCurrentDataRowState()) {
+        ClientProtos.MutationProto currentProto = ClientProtos.MutationProto
+          .parseFrom(dataRowStates.getCurrentDataRowState().toByteArray());
+        Mutation currentMutation = ProtobufUtil.toMutation(currentProto);
+        if (currentMutation instanceof Put) {
+          currentDataRowState = (Put) currentMutation;
+        }
+      }
+      Put nextDataRowState = null;
+      if (dataRowStates.hasNextDataRowState()) {
+        ClientProtos.MutationProto nextProto =
+          
ClientProtos.MutationProto.parseFrom(dataRowStates.getNextDataRowState().toByteArray());
+        Mutation nextMutation = ProtobufUtil.toMutation(nextProto);
+        if (nextMutation instanceof Put) {
+          nextDataRowState = (Put) nextMutation;
+        }
+      }
+      if (currentDataRowState == null && nextDataRowState == null) {
+        continue;
+      }
+      IndexRegionObserver.generateIndexMutationsForRow(rowKeyPtr, 
currentDataRowState,
+        nextDataRowState, ts, encodedRegionNameBytes, 
QueryConstants.VERIFIED_BYTES, indexTables,
+        indexUpdates);
+      if (indexUpdates.size() >= batchSize) {
+        indexWriter.write(indexUpdates, false, 
MetaDataProtocol.PHOENIX_VERSION);
+        totalMutations += indexUpdates.size();
+        indexUpdates.clear();
+      }
+    }
+    if (!indexUpdates.isEmpty()) {
+      indexWriter.write(indexUpdates, false, MetaDataProtocol.PHOENIX_VERSION);
+      totalMutations += indexUpdates.size();
+    }
+    if (totalMutations > 0) {
+      LOG.debug(
+        "Applied total {} index mutations for table {} partition {} owner {} "
+          + ", last processed timestamp {}",
+        totalMutations, dataTableName, partitionId, ownerPartitionId, 
lastProcessedTimestamp);
+    }
+  }
+
   private void executeIndexMutations(String partitionId,
     List<Pair<Long, IndexMutationsProtos.IndexMutations>> batchMutations, 
String ownerPartitionId,
     long lastProcessedTimestamp) throws SQLException, IOException {
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index a3934f6b0f..48c823d258 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -178,6 +178,56 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
   public static final String PHOENIX_INDEX_CDC_MUTATIONS_COMPRESS_ENABLED =
     "phoenix.index.cdc.mutations.compress.enabled";
   public static final boolean 
DEFAULT_PHOENIX_INDEX_CDC_MUTATIONS_COMPRESS_ENABLED = false;
+  /**
+   * Controls which approach is used for implementing eventually consistent 
global secondary indexes
+   * via the {@link IndexCDCConsumer}.
+   * <p>
+   * <b>Approach 1: Serialized mutations (default, value = true)</b>
+   * </p>
+   * <p>
+   * During {@code preBatchMutate}, {@link IndexRegionObserver} generates 
index mutations for each
+   * data table mutation and serializes them into a Protobuf {@code 
IndexMutations} message. This
+   * serialized payload is written as a column value in the CDC index table 
row alongside the CDC
+   * event. The {@link IndexCDCConsumer} later reads these pre-computed 
mutations from the CDC
+   * index, deserializes them, and applies them directly to the index 
table(s). In this approach,
+   * the consumer does not need to understand index structure or re-derive 
mutations — it simply
+   * replays what was already computed on the write path. The trade-off is 
increased CDC index row
+   * size due to the serialized mutation payload.
+   * </p>
+   * <p>
+   * <b>Approach 2: Generated mutations from data row states (value = 
false)</b>
+   * </p>
+   * <p>
+   * During {@code preBatchMutate}, {@link IndexRegionObserver} writes only a 
lightweight CDC index
+   * entry without serialized index mutations. Instead, the CDC event is 
created with the
+   * {@code DATA_ROW_STATE} scope. When the {@link IndexCDCConsumer} processes 
these events, it
+   * reads the CDC index rows which trigger a server-side scan of the data 
table (via
+   * {@code CDCGlobalIndexRegionScanner}) to reconstruct the before-image
+   * ({@code currentDataRowState}) and after-image ({@code nextDataRowState}) 
of the data row at the
+   * change timestamp. These raw row states are returned as a Protobuf {@code 
DataRowStates}
+   * message. The consumer then feeds these states into {@code 
generateIndexMutationsForRow()} — the
+   * same core utility used by {@link 
IndexRegionObserver#prepareIndexMutations} on the write path —
+   * to derive index mutations at consume time. This approach keeps CDC index 
rows small and
+   * generates mutations based on the current index definition, but requires 
an additional data
+   * table read per CDC event and is sensitive to data visibility timing. Make 
sure max lookback age
+   * is long enough to retain before and after images of the row.
+   * </p>
+   * <p>
+   * <b>When to use which approach:</b>
+   * </p>
+   * <ul>
+   * <li>Use <b>Approach 1</b> (serialize = true) when scanning each data 
table row at consume time
+   * could be an IO bottleneck, and slightly higher write-path latency due to 
index mutation
+   * serialization is acceptable.</li>
+   * <li>Use <b>Approach 2</b> (serialize = false) when uniform and 
predictable write latency is a
+   * strict requirement regardless of the number and type (covered or 
uncovered) of the eventually
+   * consistent global secondary indexes, and the additional data table 
point-lookup with raw scan
+   * per CDC event at consume time is not a big IO concern.</li>
+   * </ul>
+   */
+  public static final String PHOENIX_INDEX_CDC_MUTATION_SERIALIZE =
+    "phoenix.index.cdc.mutation.serialize";
+  public static final boolean DEFAULT_PHOENIX_INDEX_CDC_MUTATION_SERIALIZE = 
true;
 
   /**
    * Class to represent pending data table rows
@@ -433,6 +483,7 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
   private boolean shouldWALAppend = DEFAULT_PHOENIX_APPEND_METADATA_TO_WAL;
   private boolean indexCDCConsumerEnabled = 
DEFAULT_PHOENIX_INDEX_CDC_CONSUMER_ENABLED;
   private boolean compressCDCMutations = 
DEFAULT_PHOENIX_INDEX_CDC_MUTATIONS_COMPRESS_ENABLED;
+  private boolean serializeCDCMutations = 
DEFAULT_PHOENIX_INDEX_CDC_MUTATION_SERIALIZE;
   private boolean isNamespaceEnabled = false;
   private boolean useBloomFilter = false;
   private long lastTimestamp = 0;
@@ -494,6 +545,8 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
       this.compressCDCMutations =
         
env.getConfiguration().getBoolean(PHOENIX_INDEX_CDC_MUTATIONS_COMPRESS_ENABLED,
           DEFAULT_PHOENIX_INDEX_CDC_MUTATIONS_COMPRESS_ENABLED);
+      this.serializeCDCMutations = env.getConfiguration().getBoolean(
+        PHOENIX_INDEX_CDC_MUTATION_SERIALIZE, 
DEFAULT_PHOENIX_INDEX_CDC_MUTATION_SERIALIZE);
       this.isNamespaceEnabled =
         SchemaUtil.isNamespaceMappingEnabled(PTableType.INDEX, 
env.getConfiguration());
       TableDescriptor tableDescriptor = env.getRegion().getTableDescriptor();
@@ -504,7 +557,8 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
         this.indexCDCConsumerEnabled && 
!this.dataTableName.startsWith("SYSTEM.")
           && !this.dataTableName.startsWith("SYSTEM:")
       ) {
-        this.indexCDCConsumer = new IndexCDCConsumer(env, this.dataTableName, 
serverName);
+        this.indexCDCConsumer =
+          new IndexCDCConsumer(env, this.dataTableName, serverName, 
this.serializeCDCMutations);
         this.indexCDCConsumer.start();
       }
     } catch (NoSuchMethodError ex) {
@@ -1208,6 +1262,76 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
       IndexMaintainer.DeleteType.ALL_VERSIONS, ts);
   }
 
+  public static void generateIndexMutationsForRow(ImmutableBytesPtr rowKeyPtr,
+    Put currentDataRowState, Put nextDataRowState, long ts, byte[] 
encodedRegionName,
+    byte[] emptyColumnValue, List<Pair<IndexMaintainer, 
HTableInterfaceReference>> indexTables,
+    ListMultimap<HTableInterfaceReference, Mutation> indexUpdates) throws 
IOException {
+    for (Pair<IndexMaintainer, HTableInterfaceReference> pair : indexTables) {
+      IndexMaintainer indexMaintainer = pair.getFirst();
+      HTableInterfaceReference hTableInterfaceReference = pair.getSecond();
+      if (
+        nextDataRowState != null && 
indexMaintainer.shouldPrepareIndexMutations(nextDataRowState)
+      ) {
+        ValueGetter nextDataRowVG = new 
IndexUtil.SimpleValueGetter(nextDataRowState);
+        Put indexPut = 
indexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE,
+          nextDataRowVG, rowKeyPtr, ts, null, null, false, encodedRegionName);
+        if (indexPut == null) {
+          // No covered column. Just prepare an index row with the empty column
+          byte[] indexRowKey = indexMaintainer.buildRowKey(nextDataRowVG, 
rowKeyPtr, null, null, ts,
+            encodedRegionName);
+          indexPut = new Put(indexRowKey);
+        } else {
+          IndexUtil.removeEmptyColumn(indexPut,
+            indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+            indexMaintainer.getEmptyKeyValueQualifier());
+        }
+        byte[] finalEmptyColumnValue =
+          indexMaintainer.isUncovered() ? QueryConstants.UNVERIFIED_BYTES : 
emptyColumnValue;
+        
indexPut.addColumn(indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+          indexMaintainer.getEmptyKeyValueQualifier(), ts, 
finalEmptyColumnValue);
+        indexUpdates.put(hTableInterfaceReference, indexPut);
+        if (!ignoreWritingDeleteColumnsToIndex) {
+          Delete deleteColumn = 
indexMaintainer.buildDeleteColumnMutation(indexPut, ts);
+          if (deleteColumn != null) {
+            indexUpdates.put(hTableInterfaceReference, deleteColumn);
+          }
+        }
+        // Delete the current index row if the new index key is different from 
the
+        // current one and the index is not a CDC index
+        if (currentDataRowState != null) {
+          ValueGetter currentDataRowVG = new 
IndexUtil.SimpleValueGetter(currentDataRowState);
+          byte[] indexRowKeyForCurrentDataRow = 
indexMaintainer.buildRowKey(currentDataRowVG,
+            rowKeyPtr, null, null, ts, encodedRegionName);
+          if (
+            !indexMaintainer.isCDCIndex()
+              && Bytes.compareTo(indexPut.getRow(), 
indexRowKeyForCurrentDataRow) != 0
+          ) {
+            Mutation del = 
indexMaintainer.buildRowDeleteMutation(indexRowKeyForCurrentDataRow,
+              IndexMaintainer.DeleteType.ALL_VERSIONS, ts);
+            indexUpdates.put(hTableInterfaceReference, del);
+          }
+        }
+      } else if (
+        currentDataRowState != null
+          && indexMaintainer.shouldPrepareIndexMutations(currentDataRowState)
+      ) {
+        if (indexMaintainer.isCDCIndex()) {
+          // CDC Index needs a delete marker for referencing the data table
+          // delete mutation with the right index row key, that is, the index 
row key
+          // starting with ts
+          Put cdcDataRowState = new Put(currentDataRowState.getRow());
+          cdcDataRowState.addColumn(indexMaintainer.getDataEmptyKeyValueCF(),
+            indexMaintainer.getEmptyKeyValueQualifierForDataTable(), ts, 
ByteUtil.EMPTY_BYTE_ARRAY);
+          indexUpdates.put(hTableInterfaceReference, 
getDeleteIndexMutation(cdcDataRowState,
+            indexMaintainer, ts, rowKeyPtr, encodedRegionName));
+        } else {
+          indexUpdates.put(hTableInterfaceReference, 
getDeleteIndexMutation(currentDataRowState,
+            indexMaintainer, ts, rowKeyPtr, encodedRegionName));
+        }
+      }
+    }
+  }
+
   /**
    * Generate the index update for a data row from the mutation that are 
obtained by merging the
    * previous data row state with the pending row mutation.
@@ -1220,6 +1344,12 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
       if (indexMaintainer.isLocalIndex()) {
         continue;
       }
+      if (
+        !serializeCDCMutations && indexMaintainer.getIndexConsistency() != null
+          && indexMaintainer.getIndexConsistency().isAsynchronous()
+      ) {
+        continue;
+      }
       HTableInterfaceReference hTableInterfaceReference =
         new HTableInterfaceReference(new 
ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
       indexTables.add(new Pair<>(indexMaintainer, hTableInterfaceReference));
@@ -1232,73 +1362,12 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
       if (currentDataRowState == null && nextDataRowState == null) {
         continue;
       }
-      for (Pair<IndexMaintainer, HTableInterfaceReference> pair : indexTables) 
{
-        IndexMaintainer indexMaintainer = pair.getFirst();
-        HTableInterfaceReference hTableInterfaceReference = pair.getSecond();
-        if (
-          nextDataRowState != null && 
indexMaintainer.shouldPrepareIndexMutations(nextDataRowState)
-        ) {
-          ValueGetter nextDataRowVG = new 
IndexUtil.SimpleValueGetter(nextDataRowState);
-          Put indexPut = 
indexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE,
-            nextDataRowVG, rowKeyPtr, ts, null, null, false, 
encodedRegionName);
-          if (indexPut == null) {
-            // No covered column. Just prepare an index row with the empty 
column
-            byte[] indexRowKey = indexMaintainer.buildRowKey(nextDataRowVG, 
rowKeyPtr, null, null,
-              ts, encodedRegionName);
-            indexPut = new Put(indexRowKey);
-          } else {
-            IndexUtil.removeEmptyColumn(indexPut,
-              indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
-              indexMaintainer.getEmptyKeyValueQualifier());
-          }
-          
indexPut.addColumn(indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
-            indexMaintainer.getEmptyKeyValueQualifier(), ts, 
QueryConstants.UNVERIFIED_BYTES);
-          context.indexUpdates.put(hTableInterfaceReference,
-            new Pair<Mutation, byte[]>(indexPut, rowKeyPtr.get()));
-          if (!ignoreWritingDeleteColumnsToIndex) {
-            Delete deleteColumn = 
indexMaintainer.buildDeleteColumnMutation(indexPut, ts);
-            if (deleteColumn != null) {
-              context.indexUpdates.put(hTableInterfaceReference,
-                new Pair<Mutation, byte[]>(deleteColumn, rowKeyPtr.get()));
-            }
-          }
-          // Delete the current index row if the new index key is different 
from the
-          // current one and the index is not a CDC index
-          if (currentDataRowState != null) {
-            ValueGetter currentDataRowVG = new 
IndexUtil.SimpleValueGetter(currentDataRowState);
-            byte[] indexRowKeyForCurrentDataRow = 
indexMaintainer.buildRowKey(currentDataRowVG,
-              rowKeyPtr, null, null, ts, encodedRegionName);
-            if (
-              !indexMaintainer.isCDCIndex()
-                && Bytes.compareTo(indexPut.getRow(), 
indexRowKeyForCurrentDataRow) != 0
-            ) {
-              Mutation del = 
indexMaintainer.buildRowDeleteMutation(indexRowKeyForCurrentDataRow,
-                IndexMaintainer.DeleteType.ALL_VERSIONS, ts);
-              context.indexUpdates.put(hTableInterfaceReference,
-                new Pair<Mutation, byte[]>(del, rowKeyPtr.get()));
-            }
-          }
-        } else if (
-          currentDataRowState != null
-            && indexMaintainer.shouldPrepareIndexMutations(currentDataRowState)
-        ) {
-          if (indexMaintainer.isCDCIndex()) {
-            // CDC Index needs two a delete marker for referencing the data 
table
-            // delete mutation with the right index row key, that is, the 
index row key
-            // starting with ts
-            Put cdcDataRowState = new Put(currentDataRowState.getRow());
-            cdcDataRowState.addColumn(indexMaintainer.getDataEmptyKeyValueCF(),
-              indexMaintainer.getEmptyKeyValueQualifierForDataTable(), ts,
-              ByteUtil.EMPTY_BYTE_ARRAY);
-            context.indexUpdates.put(hTableInterfaceReference,
-              new Pair<Mutation, 
byte[]>(getDeleteIndexMutation(cdcDataRowState, indexMaintainer,
-                ts, rowKeyPtr, encodedRegionName), rowKeyPtr.get()));
-          } else {
-            context.indexUpdates.put(hTableInterfaceReference,
-              new Pair<Mutation, 
byte[]>(getDeleteIndexMutation(currentDataRowState,
-                indexMaintainer, ts, rowKeyPtr, encodedRegionName), 
rowKeyPtr.get()));
-          }
-        }
+      ListMultimap<HTableInterfaceReference, Mutation> idxUpdates = 
ArrayListMultimap.create();
+      generateIndexMutationsForRow(rowKeyPtr, currentDataRowState, 
nextDataRowState, ts,
+        encodedRegionName, QueryConstants.UNVERIFIED_BYTES, indexTables, 
idxUpdates);
+      for (Map.Entry<HTableInterfaceReference, Mutation> idxUpdate : 
idxUpdates.entries()) {
+        context.indexUpdates.put(idxUpdate.getKey(),
+          new Pair<>(idxUpdate.getValue(), rowKeyPtr.get()));
       }
     }
   }
@@ -1326,8 +1395,10 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
         ArrayListMultimap.<HTableInterfaceReference, Pair<Mutation, byte[]>> 
create();
       prepareIndexMutations(context, maintainers, batchTimestamp);
 
-      prepareEventuallyConsistentIndexMutations(context, batchTimestamp, 
maintainers,
-        compressCDCMutations);
+      if (serializeCDCMutations) {
+        prepareEventuallyConsistentIndexMutations(context, batchTimestamp, 
maintainers,
+          compressCDCMutations);
+      }
 
       context.preIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, 
Mutation> create();
       int updateCount = 0;
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson5IT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson5IT.java
index 5ca60cbd9c..3b68adb884 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson5IT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson5IT.java
@@ -17,6 +17,8 @@
  */
 package org.apache.phoenix.end2end;
 
+import static 
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_RETRY_PAUSE_MS;
+import static 
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -42,10 +44,13 @@ import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.compile.ExplainPlan;
 import org.apache.phoenix.compile.ExplainPlanAttributes;
+import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
 import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
 import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.types.PDouble;
 import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
 import org.bson.BsonArray;
 import org.bson.BsonBinary;
 import org.bson.BsonDocument;
@@ -54,19 +59,32 @@ import org.bson.BsonNull;
 import org.bson.BsonString;
 import org.bson.RawBsonDocument;
 import org.junit.Assert;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 
 /**
  * Tests for BSON with expression field key alias.
  */
-@Category(ParallelStatsDisabledTest.class)
+@Category(NeedsOwnMiniClusterTest.class)
 public class Bson5IT extends ParallelStatsDisabledIT {
 
   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
+  @BeforeClass
+  public static synchronized void doSetup() throws Exception {
+    Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+    
props.put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
+      Integer.toString(60 * 60));
+    props.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, 
Boolean.toString(false));
+    props.put(INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS, Integer.toString(2000));
+    props.put(INDEX_CDC_CONSUMER_RETRY_PAUSE_MS, Integer.toString(5));
+    setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+  }
+
   private static String getJsonString(String jsonFilePath) throws IOException {
     URL fileUrl = Bson5IT.class.getClassLoader().getResource(jsonFilePath);
     Preconditions.checkArgument(fileUrl != null, "File path " + jsonFilePath + 
" seems invalid");
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsLazyPostBatchWriteIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsCoveredEventualGenerateIT.java
similarity index 52%
copy from 
phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsLazyPostBatchWriteIT.java
copy to 
phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsCoveredEventualGenerateIT.java
index fd54b57ffb..1bf63c7579 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsLazyPostBatchWriteIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsCoveredEventualGenerateIT.java
@@ -17,37 +17,55 @@
  */
 package org.apache.phoenix.end2end;
 
+import static 
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_BATCH_SIZE;
+import static 
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_RETRY_PAUSE_MS;
+import static 
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS;
+import static 
org.apache.phoenix.hbase.index.IndexRegionObserver.PHOENIX_INDEX_CDC_MUTATION_SERIALIZE;
+
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Map;
+import org.apache.phoenix.coprocessor.PhoenixMasterObserver;
 import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
-import org.apache.phoenix.hbase.index.IndexRegionObserver;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.ReadOnlyProps;
-import org.junit.Assume;
 import org.junit.BeforeClass;
 import org.junit.experimental.categories.Category;
+import org.junit.runners.Parameterized;
 
 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 
-/**
- * Test class that extends ConcurrentMutationsExtendedIT with lazy post batch 
write enabled.
- */
 @Category(NeedsOwnMiniClusterTest.class)
-public class ConcurrentMutationsLazyPostBatchWriteIT extends 
ConcurrentMutationsExtendedIT {
+public class ConcurrentMutationsCoveredEventualGenerateIT
+  extends ConcurrentMutationsExtendedIndexIT {
+
+  private static final int MAX_LOOKBACK_AGE = 1000000;
 
-  public ConcurrentMutationsLazyPostBatchWriteIT(boolean uncovered, boolean 
eventual) {
+  public ConcurrentMutationsCoveredEventualGenerateIT(boolean uncovered, 
boolean eventual) {
     super(uncovered, eventual);
-    Assume.assumeFalse("Only covered index supports lazy post batch write 
mode", uncovered);
   }
 
   @BeforeClass
   public static synchronized void doSetup() throws Exception {
-    Map<String, String> props = Maps.newHashMapWithExpectedSize(4);
+    Map<String, String> props = Maps.newHashMapWithExpectedSize(12);
     
props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, 
Long.toString(0));
-    props.put(IndexRegionObserver.INDEX_LAZY_POST_BATCH_WRITE, "true");
     
props.put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
       Integer.toString(MAX_LOOKBACK_AGE));
     props.put("hbase.rowlock.wait.duration", "100");
     props.put("phoenix.index.concurrent.wait.duration.ms", "10");
+    props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, 
Long.toString(2));
+    props.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB, 
Long.toString(1));
+    props.put(INDEX_CDC_CONSUMER_BATCH_SIZE, Integer.toString(4500));
+    props.put(INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS, Integer.toString(5000));
+    props.put(INDEX_CDC_CONSUMER_RETRY_PAUSE_MS, Integer.toString(200));
+    props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Integer.toString(-1));
+    props.put("hbase.coprocessor.master.classes", 
PhoenixMasterObserver.class.getName());
+    props.put(PHOENIX_INDEX_CDC_MUTATION_SERIALIZE, Boolean.FALSE.toString());
     setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
   }
+
+  @Parameterized.Parameters(name = "uncovered={0}, eventual={1}")
+  public static synchronized Collection<Object[]> data() {
+    return Arrays.asList(new Object[][] { { false, true } });
+  }
 }
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsLazyPostBatchWriteIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedGenerateIT.java
similarity index 55%
copy from 
phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsLazyPostBatchWriteIT.java
copy to 
phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedGenerateIT.java
index fd54b57ffb..7a5cd7fb67 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsLazyPostBatchWriteIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedGenerateIT.java
@@ -17,37 +17,51 @@
  */
 package org.apache.phoenix.end2end;
 
+import static 
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_BATCH_SIZE;
+import static 
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS;
+import static 
org.apache.phoenix.hbase.index.IndexRegionObserver.PHOENIX_INDEX_CDC_MUTATION_SERIALIZE;
+
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Map;
+import org.apache.phoenix.coprocessor.PhoenixMasterObserver;
 import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
-import org.apache.phoenix.hbase.index.IndexRegionObserver;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.ReadOnlyProps;
-import org.junit.Assume;
 import org.junit.BeforeClass;
 import org.junit.experimental.categories.Category;
+import org.junit.runners.Parameterized;
 
 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 
-/**
- * Test class that extends ConcurrentMutationsExtendedIT with lazy post batch 
write enabled.
- */
 @Category(NeedsOwnMiniClusterTest.class)
-public class ConcurrentMutationsLazyPostBatchWriteIT extends 
ConcurrentMutationsExtendedIT {
+public class ConcurrentMutationsExtendedGenerateIT extends 
ConcurrentMutationsExtendedIT {
 
-  public ConcurrentMutationsLazyPostBatchWriteIT(boolean uncovered, boolean 
eventual) {
+  public ConcurrentMutationsExtendedGenerateIT(boolean uncovered, boolean 
eventual) {
     super(uncovered, eventual);
-    Assume.assumeFalse("Only covered index supports lazy post batch write 
mode", uncovered);
   }
 
   @BeforeClass
   public static synchronized void doSetup() throws Exception {
-    Map<String, String> props = Maps.newHashMapWithExpectedSize(4);
+    Map<String, String> props = Maps.newHashMapWithExpectedSize(11);
     
props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, 
Long.toString(0));
-    props.put(IndexRegionObserver.INDEX_LAZY_POST_BATCH_WRITE, "true");
     
props.put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
       Integer.toString(MAX_LOOKBACK_AGE));
     props.put("hbase.rowlock.wait.duration", "100");
     props.put("phoenix.index.concurrent.wait.duration.ms", "10");
+    props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, 
Long.toString(2));
+    props.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB, 
Long.toString(1));
+    props.put(INDEX_CDC_CONSUMER_BATCH_SIZE, Integer.toString(4500));
+    props.put(INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS, Integer.toString(2500));
+    // INDEX_CDC_CONSUMER_RETRY_PAUSE_MS intentionally left at its default for this test
+    props.put("hbase.coprocessor.master.classes", 
PhoenixMasterObserver.class.getName());
+    props.put(PHOENIX_INDEX_CDC_MUTATION_SERIALIZE, Boolean.FALSE.toString());
+    props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Integer.toString(-1));
     setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
   }
+
+  @Parameterized.Parameters(name = "uncovered={0}, eventual={1}")
+  public static synchronized Collection<Object[]> data() {
+    return Arrays.asList(new Object[][] { { false, true }, { true, true } });
+  }
 }
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
index ecbabb2473..671d130501 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.end2end;
 
 import static org.apache.phoenix.end2end.IndexToolIT.verifyIndexTable;
 import static 
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_BATCH_SIZE;
+import static 
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -95,6 +96,7 @@ public class ConcurrentMutationsExtendedIT extends 
ParallelStatsDisabledIT {
     props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, 
Long.toString(2));
     props.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB, 
Long.toString(1));
     props.put(INDEX_CDC_CONSUMER_BATCH_SIZE, Integer.toString(4500));
+    props.put(INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS, Integer.toString(1000));
     props.put("hbase.coprocessor.master.classes", 
PhoenixMasterObserver.class.getName());
     setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
   }
@@ -177,7 +179,7 @@ public class ConcurrentMutationsExtendedIT extends 
ParallelStatsDisabledIT {
 
     doneSignal.await(60, TimeUnit.SECONDS);
     if (eventual) {
-      Thread.sleep(15000);
+      Thread.sleep(35000);
     }
     verifyIndexTable(tableName, indexName, conn);
   }
@@ -249,7 +251,7 @@ public class ConcurrentMutationsExtendedIT extends 
ParallelStatsDisabledIT {
 
     doneSignal.await(60, TimeUnit.SECONDS);
     if (eventual) {
-      Thread.sleep(15000);
+      Thread.sleep(35000);
     }
     verifyIndexTable(tableName, indexName, conn);
     verifyIndexTable(tableName, singleCellindexName, conn);
@@ -374,7 +376,7 @@ public class ConcurrentMutationsExtendedIT extends 
ParallelStatsDisabledIT {
     doneSignal.await(ROW_LOCK_WAIT_TIME + 5000, TimeUnit.SECONDS);
     assertNull(failedMsg[0], failedMsg[0]);
     if (eventual) {
-      Thread.sleep(15000);
+      Thread.sleep(35000);
     }
     long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, tableName, 
indexName);
     assertEquals(1, actualRowCount);
@@ -435,7 +437,7 @@ public class ConcurrentMutationsExtendedIT extends 
ParallelStatsDisabledIT {
 
     doneSignal.await(ROW_LOCK_WAIT_TIME + 5000, TimeUnit.SECONDS);
     if (eventual) {
-      Thread.sleep(15000);
+      Thread.sleep(35000);
     }
     long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, tableName, 
indexName);
     assertEquals(1, actualRowCount);
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIndexIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIndexIT.java
index 61ba4f02b5..531e036a9c 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIndexIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIndexIT.java
@@ -62,7 +62,8 @@ public abstract class ConcurrentMutationsExtendedIndexIT 
extends ParallelStatsDi
     ConcurrentMutationsExtendedIT.doSetup();
   }
 
-  @Test(timeout = 2000000)
+  // This test is heavy and might exhaust Jenkins resources
+  @Test(timeout = 1800000)
   public void testConcurrentUpsertsWithTableSplits() throws Exception {
     int nThreads = 12;
     final int batchSize = 100;
@@ -169,7 +170,8 @@ public abstract class ConcurrentMutationsExtendedIndexIT 
extends ParallelStatsDi
     assertEquals(nRows, actualRowCount);
   }
 
-  @Test(timeout = 5000000)
+  // This test is heavy and might exhaust Jenkins resources
+  @Test(timeout = 1800000)
   public void testConcurrentUpsertsWithTableSplitsMerges() throws Exception {
     Assume.assumeFalse(uncovered);
     int nThreads = 13;
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsLazyPostBatchWriteIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsLazyPostBatchWriteIT.java
index fd54b57ffb..a6ecfa7e60 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsLazyPostBatchWriteIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsLazyPostBatchWriteIT.java
@@ -18,6 +18,7 @@
 package org.apache.phoenix.end2end;
 
 import java.util.Map;
+import org.apache.phoenix.coprocessor.PhoenixMasterObserver;
 import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
 import org.apache.phoenix.hbase.index.IndexRegionObserver;
 import org.apache.phoenix.query.QueryServices;
@@ -41,13 +42,16 @@ public class ConcurrentMutationsLazyPostBatchWriteIT 
extends ConcurrentMutations
 
   @BeforeClass
   public static synchronized void doSetup() throws Exception {
-    Map<String, String> props = Maps.newHashMapWithExpectedSize(4);
+    Map<String, String> props = Maps.newHashMapWithExpectedSize(8);
     
props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, 
Long.toString(0));
     props.put(IndexRegionObserver.INDEX_LAZY_POST_BATCH_WRITE, "true");
     
props.put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
       Integer.toString(MAX_LOOKBACK_AGE));
     props.put("hbase.rowlock.wait.duration", "100");
     props.put("phoenix.index.concurrent.wait.duration.ms", "10");
+    props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, 
Long.toString(2));
+    props.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB, 
Long.toString(1));
+    props.put("hbase.coprocessor.master.classes", 
PhoenixMasterObserver.class.getName());
     setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
   }
 }
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsLazyPostBatchWriteIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsUncoveredEventualGenerateIT.java
similarity index 52%
copy from 
phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsLazyPostBatchWriteIT.java
copy to 
phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsUncoveredEventualGenerateIT.java
index fd54b57ffb..5ecfc1c036 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsLazyPostBatchWriteIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsUncoveredEventualGenerateIT.java
@@ -17,37 +17,55 @@
  */
 package org.apache.phoenix.end2end;
 
+import static 
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_BATCH_SIZE;
+import static 
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_RETRY_PAUSE_MS;
+import static 
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS;
+import static 
org.apache.phoenix.hbase.index.IndexRegionObserver.PHOENIX_INDEX_CDC_MUTATION_SERIALIZE;
+
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Map;
+import org.apache.phoenix.coprocessor.PhoenixMasterObserver;
 import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
-import org.apache.phoenix.hbase.index.IndexRegionObserver;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.ReadOnlyProps;
-import org.junit.Assume;
 import org.junit.BeforeClass;
 import org.junit.experimental.categories.Category;
+import org.junit.runners.Parameterized;
 
 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 
-/**
- * Test class that extends ConcurrentMutationsExtendedIT with lazy post batch 
write enabled.
- */
 @Category(NeedsOwnMiniClusterTest.class)
-public class ConcurrentMutationsLazyPostBatchWriteIT extends 
ConcurrentMutationsExtendedIT {
+public class ConcurrentMutationsUncoveredEventualGenerateIT
+  extends ConcurrentMutationsExtendedIndexIT {
+
+  private static final int MAX_LOOKBACK_AGE = 1000000;
 
-  public ConcurrentMutationsLazyPostBatchWriteIT(boolean uncovered, boolean 
eventual) {
+  public ConcurrentMutationsUncoveredEventualGenerateIT(boolean uncovered, 
boolean eventual) {
     super(uncovered, eventual);
-    Assume.assumeFalse("Only covered index supports lazy post batch write 
mode", uncovered);
   }
 
   @BeforeClass
   public static synchronized void doSetup() throws Exception {
-    Map<String, String> props = Maps.newHashMapWithExpectedSize(4);
+    Map<String, String> props = Maps.newHashMapWithExpectedSize(12);
     
props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, 
Long.toString(0));
-    props.put(IndexRegionObserver.INDEX_LAZY_POST_BATCH_WRITE, "true");
     
props.put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
       Integer.toString(MAX_LOOKBACK_AGE));
     props.put("hbase.rowlock.wait.duration", "100");
     props.put("phoenix.index.concurrent.wait.duration.ms", "10");
+    props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, 
Long.toString(2));
+    props.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB, 
Long.toString(1));
+    props.put(INDEX_CDC_CONSUMER_BATCH_SIZE, Integer.toString(4500));
+    props.put(INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS, Integer.toString(5000));
+    props.put(INDEX_CDC_CONSUMER_RETRY_PAUSE_MS, Integer.toString(200));
+    props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Integer.toString(-1));
+    props.put("hbase.coprocessor.master.classes", 
PhoenixMasterObserver.class.getName());
+    props.put(PHOENIX_INDEX_CDC_MUTATION_SERIALIZE, Boolean.FALSE.toString());
     setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
   }
+
+  @Parameterized.Parameters(name = "uncovered={0}, eventual={1}")
+  public static synchronized Collection<Object[]> data() {
+    return Arrays.asList(new Object[][] { { true, true } });
+  }
 }
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexEventualGenerateIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexEventualGenerateIT.java
new file mode 100644
index 0000000000..533427174d
--- /dev/null
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexEventualGenerateIT.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static 
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_RETRY_PAUSE_MS;
+import static 
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS;
+import static 
org.apache.phoenix.hbase.index.IndexRegionObserver.PHOENIX_INDEX_CDC_MUTATION_SERIALIZE;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+import org.junit.runners.Parameterized;
+
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class IndexToolForNonTxGlobalIndexEventualGenerateIT extends 
IndexToolForNonTxGlobalIndexIT {
+
+  public IndexToolForNonTxGlobalIndexEventualGenerateIT(boolean mutable, 
boolean singleCell) {
+    super(mutable, singleCell);
+    if (indexDDLOptions.trim().isEmpty()) {
+      indexDDLOptions = " CONSISTENCY=EVENTUAL";
+    } else {
+      indexDDLOptions = " CONSISTENCY=EVENTUAL," + indexDDLOptions;
+    }
+  }
+
+  @Override
+  protected void waitForEventualConsistency() throws Exception {
+    Thread.sleep(18000);
+  }
+
+  @BeforeClass
+  public static synchronized void setup() throws Exception {
+    Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(14);
+    serverProps.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, 
Long.toString(20));
+    
serverProps.put(QueryServices.MAX_SERVER_METADATA_CACHE_TIME_TO_LIVE_MS_ATTRIB,
+      Long.toString(5));
+    serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
+      QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
+    serverProps.put(QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS, 
Long.toString(8));
+    
serverProps.put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
+      Long.toString(MAX_LOOKBACK_AGE));
+    serverProps.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, 
Long.toString(2));
+    serverProps.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB, 
Long.toString(1));
+    
serverProps.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB,
+      Long.toString(0));
+    serverProps.put("hbase.regionserver.rpc.retry.interval", Long.toString(0));
+    serverProps.put("hbase.procedure.remote.dispatcher.delay.msec", 
Integer.toString(0));
+    serverProps.put(INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS, 
Integer.toString(2000));
+    serverProps.put(INDEX_CDC_CONSUMER_RETRY_PAUSE_MS, Integer.toString(5));
+    serverProps.put(PHOENIX_INDEX_CDC_MUTATION_SERIALIZE, 
Boolean.FALSE.toString());
+    serverProps.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, 
Integer.toString(-1));
+    Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(5);
+    clientProps.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, 
Boolean.toString(true));
+    clientProps.put(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, 
Long.toString(5));
+    clientProps.put(QueryServices.TRANSACTIONS_ENABLED, 
Boolean.TRUE.toString());
+    clientProps.put(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, 
Boolean.TRUE.toString());
+    clientProps.put(QueryServices.SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED_ATTRIB,
+      Boolean.TRUE.toString());
+    destroyDriver();
+    setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()),
+      new ReadOnlyProps(clientProps.entrySet().iterator()));
+    
getUtility().getConfiguration().set(QueryServices.INDEX_REBUILD_RPC_RETRIES_COUNTER,
 "1");
+  }
+
+  @Parameterized.Parameters(name = "mutable={0}, singleCellIndex={1}")
+  public static synchronized Collection<Object[]> data() {
+    return Arrays.asList(
+      new Object[][] { { true, true }, { true, false }, { false, true }, { 
false, false } });
+  }
+}
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexEventualIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexEventualIT.java
new file mode 100644
index 0000000000..287f95c57a
--- /dev/null
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexEventualIT.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static 
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_RETRY_PAUSE_MS;
+import static 
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+import org.junit.runners.Parameterized;
+
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class IndexToolForNonTxGlobalIndexEventualIT extends 
IndexToolForNonTxGlobalIndexIT {
+
+  public IndexToolForNonTxGlobalIndexEventualIT(boolean mutable, boolean 
singleCell) {
+    super(mutable, singleCell);
+    if (indexDDLOptions.trim().isEmpty()) {
+      indexDDLOptions = " CONSISTENCY=EVENTUAL";
+    } else {
+      indexDDLOptions = " CONSISTENCY=EVENTUAL," + indexDDLOptions;
+    }
+  }
+
+  @Override
+  protected void waitForEventualConsistency() throws Exception {
+    Thread.sleep(15000);
+  }
+
+  @BeforeClass
+  public static synchronized void setup() throws Exception {
+    Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(13);
+    serverProps.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, 
Long.toString(20));
+    
serverProps.put(QueryServices.MAX_SERVER_METADATA_CACHE_TIME_TO_LIVE_MS_ATTRIB,
+      Long.toString(5));
+    serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
+      QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
+    serverProps.put(QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS, 
Long.toString(8));
+    
serverProps.put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
+      Long.toString(MAX_LOOKBACK_AGE));
+    serverProps.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, 
Long.toString(2));
+    serverProps.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB, 
Long.toString(1));
+    
serverProps.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB,
+      Long.toString(0));
+    serverProps.put("hbase.regionserver.rpc.retry.interval", Long.toString(0));
+    serverProps.put("hbase.procedure.remote.dispatcher.delay.msec", 
Integer.toString(0));
+    serverProps.put(INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS, 
Integer.toString(2000));
+    serverProps.put(INDEX_CDC_CONSUMER_RETRY_PAUSE_MS, Integer.toString(5));
+    serverProps.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, 
Integer.toString(-1));
+    Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(5);
+    clientProps.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, 
Boolean.toString(true));
+    clientProps.put(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, 
Long.toString(5));
+    clientProps.put(QueryServices.TRANSACTIONS_ENABLED, 
Boolean.TRUE.toString());
+    clientProps.put(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, 
Boolean.TRUE.toString());
+    clientProps.put(QueryServices.SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED_ATTRIB,
+      Boolean.TRUE.toString());
+    destroyDriver();
+    setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()),
+      new ReadOnlyProps(clientProps.entrySet().iterator()));
+    
getUtility().getConfiguration().set(QueryServices.INDEX_REBUILD_RPC_RETRIES_COUNTER,
 "1");
+  }
+
+  @Parameterized.Parameters(name = "mutable={0}, singleCellIndex={1}")
+  public static synchronized Collection<Object[]> data() {
+    return Arrays.asList(
+      new Object[][] { { true, true }, { true, false }, { false, true }, { 
false, false } });
+  }
+}
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java
index d6061a8380..5bf5c905ce 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java
@@ -128,11 +128,11 @@ import 
org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 public class IndexToolForNonTxGlobalIndexIT extends BaseTest {
 
   public static final int MAX_LOOKBACK_AGE = 3600;
-  private final String tableDDLOptions;
+  protected final String tableDDLOptions;
 
   private final boolean useSnapshot = false;
-  private final boolean mutable;
-  private final String indexDDLOptions;
+  protected final boolean mutable;
+  protected String indexDDLOptions;
   private boolean singleCell;
 
   @Rule
@@ -194,6 +194,9 @@ public class IndexToolForNonTxGlobalIndexIT extends 
BaseTest {
     
getUtility().getConfiguration().set(QueryServices.INDEX_REBUILD_RPC_RETRIES_COUNTER,
 "1");
   }
 
+  protected void waitForEventualConsistency() throws Exception {
+  }
+
   @After
   public void cleanup() throws Exception {
     Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
@@ -239,15 +242,18 @@ public class IndexToolForNonTxGlobalIndexIT extends 
BaseTest {
       assertEquals(NROWS, 
indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
       assertTrue("Index rebuild failed!", indexTool.getJob().isSuccessful());
       TestUtil.assertIndexState(conn, indexTableFullName, PIndexState.ACTIVE, 
null);
+      waitForEventualConsistency();
       long actualRowCount =
         IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, 
indexTableFullName);
       assertEquals(NROWS, actualRowCount);
       IndexToolIT.setEveryNthRowWithNull(NROWS, 5, stmt);
       conn.commit();
+      waitForEventualConsistency();
       actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, 
indexTableFullName);
       assertEquals(NROWS, actualRowCount);
       IndexToolIT.setEveryNthRowWithNull(NROWS, 7, stmt);
       conn.commit();
+      waitForEventualConsistency();
       actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, 
indexTableFullName);
       assertEquals(NROWS, actualRowCount);
       actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, 
indexTableFullName);
@@ -444,6 +450,7 @@ public class IndexToolForNonTxGlobalIndexIT extends 
BaseTest {
       }
       assertEquals(0, indexTool.getJob().getCounters()
         
.findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
+      waitForEventualConsistency();
       long actualRowCount =
         IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, 
indexTableFullName);
       assertEquals(NROWS, actualRowCount);
@@ -453,6 +460,7 @@ public class IndexToolForNonTxGlobalIndexIT extends 
BaseTest {
         IndexToolIT.upsertRow(stmt1, i);
       }
       conn.commit();
+      waitForEventualConsistency();
       indexTool = IndexToolIT.runIndexTool(useSnapshot, schemaName, 
dataTableName, indexTableName,
         null, 0, IndexTool.IndexVerifyType.BOTH, new String[0]);
       assertEquals(2 * NROWS,
@@ -491,6 +499,7 @@ public class IndexToolForNonTxGlobalIndexIT extends 
BaseTest {
         
.findCounter(AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
       assertEquals(0, indexTool.getJob().getCounters()
         
.findCounter(AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
+      waitForEventualConsistency();
       actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, 
indexTableFullName);
       assertEquals(2 * NROWS, actualRowCount);
     }
@@ -553,6 +562,7 @@ public class IndexToolForNonTxGlobalIndexIT extends 
BaseTest {
         .findCounter(BEFORE_REBUILD_OLD_INDEX_ROW_COUNT).getValue());
       assertEquals(0, indexTool.getJob().getCounters()
         .findCounter(BEFORE_REBUILD_UNKNOWN_INDEX_ROW_COUNT).getValue());
+      waitForEventualConsistency();
       long actualRowCount =
         IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, 
indexTableFullName);
       assertEquals(N_ROWS, actualRowCount);
@@ -579,6 +589,7 @@ public class IndexToolForNonTxGlobalIndexIT extends 
BaseTest {
         null, 0, IndexTool.IndexVerifyType.BEFORE, new String[0]);
       assertEquals(0, indexTool.getJob().getCounters()
         .findCounter(BEFORE_REBUILD_OLD_INDEX_ROW_COUNT).getValue());
+      waitForEventualConsistency();
       actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, 
indexTableFullName);
       assertEquals(N_ROWS, actualRowCount);
     }
@@ -586,6 +597,8 @@ public class IndexToolForNonTxGlobalIndexIT extends 
BaseTest {
 
   @Test
   public void testIndexToolVerifyBeforeAndBothOptions() throws Exception {
+    Assume.assumeFalse("View indexes do not support CONSISTENCY=EVENTUAL",
+      indexDDLOptions.contains("CONSISTENCY=EVENTUAL"));
     Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
     try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
       String schemaName = generateUniqueName();
@@ -629,6 +642,8 @@ public class IndexToolForNonTxGlobalIndexIT extends 
BaseTest {
 
   @Test
   public void testIndexToolVerifyAfterOption() throws Exception {
+    Assume.assumeFalse("View indexes do not support CONSISTENCY=EVENTUAL",
+      indexDDLOptions.contains("CONSISTENCY=EVENTUAL"));
     Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
     try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
       String schemaName = generateUniqueName();
@@ -842,6 +857,8 @@ public class IndexToolForNonTxGlobalIndexIT extends 
BaseTest {
 
   @Test
   public void testIndexToolForIncrementalVerify() throws Exception {
+    Assume.assumeFalse("View indexes do not support CONSISTENCY=EVENTUAL",
+      indexDDLOptions.contains("CONSISTENCY=EVENTUAL"));
     ManualEnvironmentEdge customEdge = new ManualEnvironmentEdge();
     String schemaName = generateUniqueName();
     String dataTableName = generateUniqueName();
@@ -977,6 +994,8 @@ public class IndexToolForNonTxGlobalIndexIT extends 
BaseTest {
 
   @Test
   public void testIndexToolForIncrementalVerify_viewIndex() throws Exception {
+    Assume.assumeFalse("View indexes do not support CONSISTENCY=EVENTUAL",
+      indexDDLOptions.contains("CONSISTENCY=EVENTUAL"));
     ManualEnvironmentEdge customeEdge = new ManualEnvironmentEdge();
     String schemaName = generateUniqueName();
     String dataTableName = generateUniqueName();
@@ -1387,6 +1406,8 @@ public class IndexToolForNonTxGlobalIndexIT extends 
BaseTest {
 
   @Test
   public void testUpdatablePKFilterViewIndexRebuild() throws Exception {
+    Assume.assumeFalse("View indexes do not support CONSISTENCY=EVENTUAL",
+      indexDDLOptions.contains("CONSISTENCY=EVENTUAL"));
     if (!mutable) {
       return;
     }
@@ -1456,6 +1477,8 @@ public class IndexToolForNonTxGlobalIndexIT extends 
BaseTest {
 
   @Test
   public void testUpdatableNonPkFilterViewIndexRebuild() throws Exception {
+    Assume.assumeFalse("View indexes do not support CONSISTENCY=EVENTUAL",
+      indexDDLOptions.contains("CONSISTENCY=EVENTUAL"));
     if (!mutable) {
       return;
     }
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/VarBinaryEncoded2IT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/VarBinaryEncoded2IT.java
index bddd8d384a..6b05b7dd0a 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/VarBinaryEncoded2IT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/VarBinaryEncoded2IT.java
@@ -17,6 +17,9 @@
  */
 package org.apache.phoenix.end2end;
 
+import static 
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_RETRY_PAUSE_MS;
+import static 
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS;
+import static 
org.apache.phoenix.hbase.index.IndexRegionObserver.PHOENIX_INDEX_CDC_MUTATION_SERIALIZE;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 
 import java.sql.Connection;
@@ -47,7 +50,7 @@ import org.junit.runners.Parameterized;
 
 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 
-@Category(ParallelStatsDisabledTest.class)
+@Category(NeedsOwnMiniClusterTest.class)
 @RunWith(Parameterized.class)
 public class VarBinaryEncoded2IT extends ParallelStatsDisabledIT {
 
@@ -72,6 +75,9 @@ public class VarBinaryEncoded2IT extends 
ParallelStatsDisabledIT {
     
props.put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
       Integer.toString(60 * 60));
     props.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, 
Boolean.toString(false));
+    props.put(INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS, Integer.toString(2000));
+    props.put(INDEX_CDC_CONSUMER_RETRY_PAUSE_MS, Integer.toString(5));
+    props.put(PHOENIX_INDEX_CDC_MUTATION_SERIALIZE, Boolean.FALSE.toString());
     setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
   }
 
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerEventualGenerateIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerEventualGenerateIT.java
new file mode 100644
index 0000000000..a862df07ad
--- /dev/null
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerEventualGenerateIT.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.index;
+
+import static 
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_RETRY_PAUSE_MS;
+import static 
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS;
+import static 
org.apache.phoenix.hbase.index.IndexRegionObserver.PHOENIX_INDEX_CDC_MUTATION_SERIALIZE;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+import org.junit.runners.Parameterized;
+
+import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class GlobalIndexCheckerEventualGenerateIT extends GlobalIndexCheckerIT 
{
+
+  public GlobalIndexCheckerEventualGenerateIT(boolean async, boolean encoded) {
+    super(async, encoded);
+  }
+
+  @BeforeClass
+  public static synchronized void doSetup() throws Exception {
+    Map<String, String> props = Maps.newHashMapWithExpectedSize(10);
+    
props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, 
Long.toString(0));
+    props.put(INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS, Integer.toString(2000));
+    props.put(INDEX_CDC_CONSUMER_RETRY_PAUSE_MS, Integer.toString(5));
+    props.put(PHOENIX_INDEX_CDC_MUTATION_SERIALIZE, Boolean.FALSE.toString());
+    props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Integer.toString(-1));
+    props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, 
Long.toString(2));
+    props.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB, 
Long.toString(1));
+    setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+  }
+
+  @Before
+  public void appendEventualConsistency() {
+    if (indexDDLOptions.trim().isEmpty()) {
+      indexDDLOptions = " CONSISTENCY=EVENTUAL";
+    } else {
+      indexDDLOptions = " CONSISTENCY=EVENTUAL," + indexDDLOptions;
+    }
+  }
+
+  @After
+  public void after() throws Exception {
+  }
+
+  @Override
+  protected void waitForEventualConsistency() throws Exception {
+    Thread.sleep(18000);
+  }
+
+  @Parameterized.Parameters(name = "async={0},encoded={1}")
+  public static synchronized Collection<Object[]> data() {
+    List<Object[]> list = Lists.newArrayListWithExpectedSize(4);
+    boolean[] Booleans = new boolean[] { true, false };
+    for (boolean async : Booleans) {
+      for (boolean encoded : Booleans) {
+        list.add(new Object[] { async, encoded });
+      }
+    }
+    return list;
+  }
+
+}
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerEventualIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerEventualIT.java
new file mode 100644
index 0000000000..aee8564ab5
--- /dev/null
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerEventualIT.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.index;
+
+import static 
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_RETRY_PAUSE_MS;
+import static 
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+import org.junit.runners.Parameterized;
+
+import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class GlobalIndexCheckerEventualIT extends GlobalIndexCheckerIT {
+
+  public GlobalIndexCheckerEventualIT(boolean async, boolean encoded) {
+    super(async, encoded);
+  }
+
+  @BeforeClass
+  public static synchronized void doSetup() throws Exception {
+    Map<String, String> props = Maps.newHashMapWithExpectedSize(8);
+    
props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, 
Long.toString(0));
+    props.put(INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS, Integer.toString(2000));
+    props.put(INDEX_CDC_CONSUMER_RETRY_PAUSE_MS, Integer.toString(5));
+    props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Integer.toString(-1));
+    props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, 
Long.toString(2));
+    props.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB, 
Long.toString(1));
+    setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+  }
+
+  @Before
+  public void appendEventualConsistency() {
+    if (indexDDLOptions.trim().isEmpty()) {
+      indexDDLOptions = " CONSISTENCY=EVENTUAL";
+    } else {
+      indexDDLOptions = " CONSISTENCY=EVENTUAL," + indexDDLOptions;
+    }
+  }
+
+  @After
+  public void after() throws Exception {
+  }
+
+  @Override
+  protected void waitForEventualConsistency() throws Exception {
+    Thread.sleep(15000);
+  }
+
+  @Parameterized.Parameters(name = "async={0},encoded={1}")
+  public static synchronized Collection<Object[]> data() {
+    List<Object[]> list = Lists.newArrayListWithExpectedSize(4);
+    boolean[] Booleans = new boolean[] { true, false };
+    for (boolean async : Booleans) {
+      for (boolean encoded : Booleans) {
+        list.add(new Object[] { async, encoded });
+      }
+    }
+    return list;
+  }
+
+}
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
index 67ccf665e2..8d7d4c546a 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
@@ -85,8 +85,8 @@ import 
org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 public class GlobalIndexCheckerIT extends BaseTest {
 
   private final boolean async;
-  private String indexDDLOptions;
-  private String tableDDLOptions;
+  protected String indexDDLOptions;
+  protected String tableDDLOptions;
   private StringBuilder optionBuilder;
   private StringBuilder indexOptionBuilder;
   private final boolean encoded;
@@ -266,6 +266,7 @@ public class GlobalIndexCheckerIT extends BaseTest {
         + "','yyyy-MM-dd HH:mm:ss.SSS', '" + timeZoneID + "')";
       // Verify that we will read from the index table
       assertExplainPlan(conn, query, dataTableName, indexTableName);
+      waitForEventualConsistency();
       rs = conn.createStatement().executeQuery(query);
       assertTrue(rs.next());
       assertEquals("bc", rs.getString(1));
@@ -300,6 +301,7 @@ public class GlobalIndexCheckerIT extends BaseTest {
         "SELECT  val1, val2, PHOENIX_ROW_TIMESTAMP()  from " + dataTableName + 
" WHERE val1 = 'de'";
       // Verify that we will read from the index table
       assertExplainPlan(conn, query, dataTableName, indexTableName);
+      waitForEventualConsistency();
       rs = conn.createStatement().executeQuery(query);
       assertTrue(rs.next());
       assertEquals("de", rs.getString(1));
@@ -322,6 +324,7 @@ public class GlobalIndexCheckerIT extends BaseTest {
       conn.createStatement()
         .execute("upsert into " + dataTableName + " values ('e', 'ae', 'efg', 
'efgh')");
       conn.commit();
+      waitForEventualConsistency();
       // Write a query to get all the rows in the order of their timestamps
       query = "SELECT  val1, val2, PHOENIX_ROW_TIMESTAMP() from " + 
dataTableName + " WHERE "
         + "PHOENIX_ROW_TIMESTAMP() > TO_DATE('" + initial.toString()
@@ -365,6 +368,7 @@ public class GlobalIndexCheckerIT extends BaseTest {
         + "','yyyy-MM-dd HH:mm:ss.SSS', '" + timeZoneID + "')";
 
       assertExplainPlan(conn, query, dataTableName, indexTableName);
+      waitForEventualConsistency();
       rs = conn.createStatement().executeQuery(query);
       assertTrue(rs.next());
       assertEquals("ab", rs.getString(1));
@@ -565,6 +569,7 @@ public class GlobalIndexCheckerIT extends BaseTest {
       conn.commit();
       // Now the expected state of the index table is {('ab', 'a', 'abcc' , 
null), ('ab', 'b', null,
       // 'bcde')}
+      waitForEventualConsistency();
       ResultSet rs = conn.createStatement().executeQuery("SELECT * from " + 
indexTableName);
       assertTrue(rs.next());
       assertEquals("ab", rs.getString(1));
@@ -606,6 +611,7 @@ public class GlobalIndexCheckerIT extends BaseTest {
       String dml = "DELETE from " + dataTableName + " WHERE id  = 'a'";
       assertEquals(1, conn.createStatement().executeUpdate(dml));
       conn.commit();
+      waitForEventualConsistency();
 
       // The index rows are actually not deleted yet because 
IndexRegionObserver failed delete
       // operation. However, they are
@@ -653,6 +659,7 @@ public class GlobalIndexCheckerIT extends BaseTest {
     String selectSql = "SELECT * from " + dataTableName + " WHERE val1  = 
'ab'";
     // Verify that we will read from the index table
     assertExplainPlan(conn, selectSql, dataTableName, indexTableName);
+    waitForEventualConsistency();
     ResultSet rs = conn.createStatement().executeQuery(selectSql);
     assertTrue(rs.next());
     assertEquals("a", rs.getString(1));
@@ -666,6 +673,7 @@ public class GlobalIndexCheckerIT extends BaseTest {
     conn.commit();
     // Verify that we will read from the index table
     assertExplainPlan(conn, selectSql, dataTableName, indexTableName);
+    waitForEventualConsistency();
     rs = conn.createStatement().executeQuery(selectSql);
     assertTrue(rs.next());
     assertEquals("a", rs.getString(1));
@@ -795,6 +803,7 @@ public class GlobalIndexCheckerIT extends BaseTest {
         // run the index MR job.
         IndexToolIT.runIndexTool(false, null, dataTableName, indexTableName);
       }
+      waitForEventualConsistency();
       // Configure IndexRegionObserver to fail the last write phase (i.e., the 
post index update
       // phase) where the verify flag is set
       // to true and/or index rows are deleted and check that this does not 
impact the correctness
@@ -805,6 +814,7 @@ public class GlobalIndexCheckerIT extends BaseTest {
       conn.createStatement()
         .execute("upsert into " + dataTableName + " (id, val1, val2) values 
('c', 'cd','cde')");
       conn.commit();
+      waitForEventualConsistency();
       IndexTool indexTool = IndexToolIT.runIndexTool(false, "", dataTableName, 
indexTableName, null,
         0, IndexTool.IndexVerifyType.ONLY);
       assertEquals(3, 
indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
@@ -824,8 +834,10 @@ public class GlobalIndexCheckerIT extends BaseTest {
         
.findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
       assertEquals(0, indexTool.getJob().getCounters()
         
.findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
-      assertEquals(2, indexTool.getJob().getCounters()
-        .findCounter(BEFORE_REBUILD_UNVERIFIED_INDEX_ROW_COUNT).getValue());
+      if (!indexDDLOptions.contains("CONSISTENCY=EVENTUAL")) {
+        assertEquals(2, indexTool.getJob().getCounters()
+          .findCounter(BEFORE_REBUILD_UNVERIFIED_INDEX_ROW_COUNT).getValue());
+      }
       assertEquals(0, indexTool.getJob().getCounters()
         .findCounter(BEFORE_REBUILD_OLD_INDEX_ROW_COUNT).getValue());
       assertEquals(0, indexTool.getJob().getCounters()
@@ -872,6 +884,7 @@ public class GlobalIndexCheckerIT extends BaseTest {
       conn.createStatement()
         .execute("upsert into " + dataTableName + " (id, val1, val2) values 
('a', 'ab','abc')");
       conn.commit();
+      waitForEventualConsistency();
       // At this moment val3 in the data table row has null value
       String selectSql = "SELECT val3 from " + dataTableName + " WHERE val1  = 
'ab'";
       // Verify that we will read from the index table
@@ -919,6 +932,7 @@ public class GlobalIndexCheckerIT extends BaseTest {
       conn.createStatement()
         .execute("upsert into " + dataTableName + " values ('a', 'ab','abc', 
'abcd')");
       conn.commit();
+      waitForEventualConsistency();
       // At this moment val3 in the data table row should not have null value
       selectSql = "SELECT val3 from " + dataTableName + " WHERE val1  = 'ab'";
       // Verify that we will read from the index table
@@ -932,6 +946,7 @@ public class GlobalIndexCheckerIT extends BaseTest {
       conn.createStatement()
         .execute("upsert into " + dataTableName + " (id, val1, val3) values 
('a', 'ab','abcde')");
       commitWithException(conn);
+      waitForEventualConsistency();
       // The above upsert will create an unverified index row
       // Configure IndexRegionObserver to allow the data write phase
       IndexRegionObserver.setFailDataTableUpdatesForTesting(false);
@@ -983,6 +998,7 @@ public class GlobalIndexCheckerIT extends BaseTest {
       String selectSql = "SELECT val2, val3 from " + dataTableName + " WHERE 
val1  = 'cd'";
       // Verify that we will read from the first index table
       assertExplainPlan(conn, selectSql, dataTableName, indexTableName + "1");
+      waitForEventualConsistency();
       // Verify the first write is visible but the second one is not
       ResultSet rs = conn.createStatement().executeQuery(selectSql);
       assertTrue(rs.next());
@@ -1120,6 +1136,7 @@ public class GlobalIndexCheckerIT extends BaseTest {
       conn.createStatement()
         .execute("upsert into " + dataTableName + " (id, val3) values ('a', 
'abcdd')");
       conn.commit();
+      waitForEventualConsistency();
       String selectSql = "SELECT val2, val3 from " + dataTableName + " WHERE 
val1  = 'ab'";
       // Verify that we will read from the first index table
       assertExplainPlan(conn, selectSql, dataTableName, indexName + "1");
@@ -1326,6 +1343,7 @@ public class GlobalIndexCheckerIT extends BaseTest {
       conn.createStatement()
         .execute("upsert into " + dataTableName + " " + "values ('g', 'val1', 
'val2g', null)");
       conn.commit();
+      waitForEventualConsistency();
       // Fail phase 3
       IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
       String selectSql = "SELECT id from " + dataTableName
@@ -1390,6 +1408,7 @@ public class GlobalIndexCheckerIT extends BaseTest {
       conn.createStatement().execute("upsert into " + dataTableName + " "
         + "values ('a2', 'a1', 'val1', 'val2a', 'val31', 'val4')");
       conn.commit();
+      waitForEventualConsistency();
 
       // create an unverified update to the index row pointing to an existing 
data row
       IndexRegionObserver.setFailDataTableUpdatesForTesting(true);
@@ -1400,6 +1419,7 @@ public class GlobalIndexCheckerIT extends BaseTest {
       conn.createStatement().execute("upsert into " + dataTableName + " "
         + "values ('a2', 'a2', 'val1', 'val1b', 'val3', 'val4')");
       conn.commit();
+      waitForEventualConsistency();
 
       ArrayList<String> expectedValues = Lists.newArrayList("a1", "a2");
       String selectSql =
@@ -1415,6 +1435,7 @@ public class GlobalIndexCheckerIT extends BaseTest {
       conn.createStatement().execute("upsert into " + dataTableName + " "
         + "values ('a3', 'a2', 'val1', 'val2a', 'val3', 'val4')");
       conn.commit();
+      waitForEventualConsistency();
       IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
       expectedValues = Lists.newArrayList("a1", "a2", "a3");
       selectSql =
@@ -1431,6 +1452,7 @@ public class GlobalIndexCheckerIT extends BaseTest {
       conn.createStatement().execute("upsert into " + dataTableName + " "
         + "values ('a5', 'a1', 'val1_4', 'val1_4', 'val1_4', 'val1_4')");
       conn.commit();
+      waitForEventualConsistency();
       IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
       expectedValues = Lists.newArrayList("a4", "a5");
       selectSql =
@@ -1475,6 +1497,7 @@ public class GlobalIndexCheckerIT extends BaseTest {
       conn.createStatement().execute("upsert into " + dataTableName + " "
         + "values ('a2', 'a1', 'val1a', 'val2a', 'val31', 'val4')");
       conn.commit();
+      waitForEventualConsistency();
 
       ArrayList<String> expectedValues = Lists.newArrayList("a1", "a2");
       // condition on val1 in WHERE clause so that query will use the 
uncovered index
@@ -1499,6 +1522,7 @@ public class GlobalIndexCheckerIT extends BaseTest {
       conn.createStatement().execute("upsert into " + dataTableName + " "
         + "values ('a4', 'a1', 'val1b', 'val2a', 'val31', 'val4')");
       conn.commit();
+      waitForEventualConsistency();
       expectedValues = Lists.newArrayList("a3", "a4");
       selectSql = "SELECT distinct(id1) from " + dataTableName + " WHERE val1 
= 'val1b'";
       verifyDistinctQueryOnIndex(conn, uncoveredIndex1, selectSql, 
expectedValues);
@@ -1589,6 +1613,7 @@ public class GlobalIndexCheckerIT extends BaseTest {
       conn.createStatement()
         .execute("upsert into " + dataTableName + " (id, val3) values ('a', 
null)");
       conn.commit();
+      waitForEventualConsistency();
 
       String dql =
         String.format("select id, val2 from %s where val1='ab' and 
val3='abcd'", dataTableName);
@@ -1613,6 +1638,7 @@ public class GlobalIndexCheckerIT extends BaseTest {
       conn.createStatement()
         .execute("upsert into " + dataTableName + " values ('a', 'ac', null, 
null)");
       conn.commit();
+      waitForEventualConsistency();
 
       dql =
         String.format("select id, val2 from %s where val1='ac' and val3 is 
null", dataTableName);
@@ -1651,6 +1677,7 @@ public class GlobalIndexCheckerIT extends BaseTest {
       conn.createStatement()
         .execute("upsert into " + dataTableName + " (id, val1, val2) values 
('c', 'cd', 'cde')");
       conn.commit();
+      waitForEventualConsistency();
       IndexRegionObserver.setIgnoreWritingDeleteColumnsToIndex(false);
       IndexTool it = IndexToolIT.runIndexTool(false, null, dataTableName, 
indexName, null, 0,
         IndexTool.IndexVerifyType.BEFORE);
@@ -1689,6 +1716,7 @@ public class GlobalIndexCheckerIT extends BaseTest {
       String delete = String.format("DELETE FROM %s where id = 'a'", 
dataTableName);
       conn.createStatement().execute(delete);
       conn.commit();
+      waitForEventualConsistency();
       // skip phase2, inserts an unverified row in index
       IndexRegionObserver.setFailDataTableUpdatesForTesting(true);
       String dml = "upsert into " + dataTableName + " (id, val1, val3) values 
('a', 'ab', ?)";
@@ -1715,6 +1743,7 @@ public class GlobalIndexCheckerIT extends BaseTest {
       conn.createStatement()
         .execute("upsert into " + dataTableName + " (id, val1, val3) values 
('a', 'ab', null)");
       conn.commit();
+      waitForEventualConsistency();
       IndexTool it = IndexToolIT.runIndexTool(false, null, dataTableName, 
indexName, null, 0,
         IndexTool.IndexVerifyType.ONLY);
       CounterGroup mrJobCounters = IndexToolIT.getMRJobCounters(it);
@@ -1737,6 +1766,8 @@ public class GlobalIndexCheckerIT extends BaseTest {
       // No need to run the same test twice one for async = true and the other 
for async = false
       return;
     }
+    Assume.assumeFalse("View indexes do not support CONSISTENCY=EVENTUAL",
+      indexDDLOptions.contains("CONSISTENCY=EVENTUAL"));
     try (Connection conn = DriverManager.getConnection(getUrl())) {
       // Create a base table
       String dataTableName = generateUniqueName();
@@ -1813,6 +1844,7 @@ public class GlobalIndexCheckerIT extends BaseTest {
         + "val1 = val1 || val1, val2 = val2 || val2";
       conn.createStatement().execute(upsertSql);
       conn.commit();
+      waitForEventualConsistency();
       String selectSql = "SELECT * from " + dataTableName + " WHERE val1 = 
'abab'";
       assertExplainPlan(conn, selectSql, dataTableName, indexTableName);
       ResultSet rs = conn.createStatement().executeQuery(selectSql);
@@ -1849,6 +1881,7 @@ public class GlobalIndexCheckerIT extends BaseTest {
         ps.executeUpdate();
       }
       conn.commit();
+      waitForEventualConsistency();
       String distinctQuery = "SELECT DISTINCT val1 FROM " + dataTableName;
       try (ResultSet rs = conn.createStatement().executeQuery(distinctQuery)) {
         PhoenixResultSet prs = rs.unwrap(PhoenixResultSet.class);
@@ -1876,14 +1909,18 @@ public class GlobalIndexCheckerIT extends BaseTest {
     }
   }
 
-  static private void verifyTableHealth(Connection conn, String dataTableName,
-    String indexTableName) throws Exception {
+  protected void waitForEventualConsistency() throws Exception {
+  }
+
+  protected void verifyTableHealth(Connection conn, String dataTableName, 
String indexTableName)
+    throws Exception {
     // Add two rows and check everything is still okay
     conn.createStatement()
       .execute("upsert into " + dataTableName + " values ('a', 'ab', 'abc', 
'abcd')");
     conn.createStatement()
       .execute("upsert into " + dataTableName + " values ('z', 'za', 'zab', 
'zabc')");
     conn.commit();
+    waitForEventualConsistency();
     String selectSql = "SELECT * from " + dataTableName + " WHERE val1  = 
'ab'";
     /// Verify that we will read from the index table
     assertExplainPlan(conn, selectSql, dataTableName, indexTableName);
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index 7b14996fba..e2e3afedef 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -2070,7 +2070,7 @@ public abstract class BaseTest {
    * @throws IOException if something went wrong while connecting to Admin
    */
   public synchronized static boolean isAnyStoreRefCountLeaked(Admin admin) 
throws IOException {
-    int retries = 5;
+    int retries = 15;
     while (retries > 0) {
       boolean isStoreRefCountLeaked = isStoreRefCountLeaked(admin);
       if (!isStoreRefCountLeaked) {

Reply via email to