This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b506916d1 PHOENIX-7653 New CDC Event for TTL expired rows (#2209)
8b506916d1 is described below

commit 8b506916d1fa942607945a55b73e8987ef6045bd
Author: Viraj Jasani <vjas...@apache.org>
AuthorDate: Mon Jul 7 17:22:01 2025 -0700

    PHOENIX-7653 New CDC Event for TTL expired rows (#2209)
---
 .../org/apache/phoenix/query/QueryConstants.java   |   3 +
 .../org/apache/phoenix/query/QueryServices.java    |   3 +
 .../apache/phoenix/query/QueryServicesOptions.java |   6 +-
 .../org/apache/phoenix/util/CDCChangeBuilder.java  |   3 +-
 .../main/java/org/apache/phoenix/util/CDCUtil.java |  34 +-
 .../phoenix/coprocessor/CDCCompactionUtil.java     | 395 +++++++++++++++++++++
 .../coprocessor/CDCGlobalIndexRegionScanner.java   | 101 +++++-
 .../phoenix/coprocessor/CompactionScanner.java     |  33 +-
 .../java/org/apache/phoenix/end2end/Bson3IT.java   | 291 ++++++++++++++-
 .../org/apache/phoenix/end2end/TableTTLIT.java     | 255 +++++++++++++
 .../phoenix/schema/ConditionalTTLExpressionIT.java | 193 +++++++++-
 11 files changed, 1298 insertions(+), 19 deletions(-)

diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 9911f5f0a3..57da865493 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -363,6 +363,9 @@ public interface QueryConstants {
     String CDC_UPSERT_EVENT_TYPE = "upsert";
     String CDC_DELETE_EVENT_TYPE = "delete";
     String SPLITS_FILE = "SPLITS_FILE";
+    String CDC_TTL_DELETE_EVENT_TYPE = "ttl_delete";
+    String CDC_IMAGE_CQ = "_CDC_IMG_";
+    byte[] CDC_IMAGE_CQ_BYTES = Bytes.toBytes(CDC_IMAGE_CQ);
 
     /**
      * We mark counter values 0 to 10 as reserved. Value 0 is used by
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java 
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
index e16e716958..e9d0e8d86e 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -440,6 +440,9 @@ public interface QueryServices extends SQLCloseable {
     String CONNECTION_QUERY_SERVICE_HISTOGRAM_SIZE_RANGES =
             "phoenix.conn.query.service.histogram.size.ranges";
 
+    // CDC TTL mutation retry configuration
+    String CDC_TTL_MUTATION_MAX_RETRIES = 
"phoenix.cdc.ttl.mutation.max.retries";
+
     // This config is used to move (copy and delete) the child links from the 
SYSTEM.CATALOG to SYSTEM.CHILD_LINK table.
     // As opposed to a copy and async (out of band) delete.
     public static final String MOVE_CHILD_LINKS_DURING_UPGRADE_ENABLED = 
"phoenix.move.child_link.during.upgrade";
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index bb5ef67e49..b0ab07d901 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -24,6 +24,7 @@ import static 
org.apache.phoenix.query.QueryServices.ALLOW_VIEWS_ADD_NEW_CF_BASE
 import static org.apache.phoenix.query.QueryServices.AUTO_UPGRADE_ENABLED;
 import static 
org.apache.phoenix.query.QueryServices.CALL_QUEUE_PRODUCER_ATTRIB_NAME;
 import static 
org.apache.phoenix.query.QueryServices.CALL_QUEUE_ROUND_ROBIN_ATTRIB;
+import static 
org.apache.phoenix.query.QueryServices.CDC_TTL_MUTATION_MAX_RETRIES;
 import static org.apache.phoenix.query.QueryServices.CLIENT_METRICS_TAG;
 import static 
org.apache.phoenix.query.QueryServices.CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB;
 import static 
org.apache.phoenix.query.QueryServices.CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED;
@@ -469,7 +470,7 @@ public class QueryServicesOptions {
     public static final int DEFAULT_CQSI_THREAD_POOL_MAX_QUEUE = 512;
     public static final Boolean 
DEFAULT_CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT = true;
     public static final Boolean DEFAULT_CQSI_THREAD_POOL_METRICS_ENABLED = 
false;
-
+    public static final int DEFAULT_CDC_TTL_MUTATION_MAX_RETRIES = 5;
 
     private final Configuration config;
 
@@ -585,7 +586,8 @@ public class QueryServicesOptions {
             .setIfUnset(CQSI_THREAD_POOL_MAX_QUEUE, 
DEFAULT_CQSI_THREAD_POOL_MAX_QUEUE)
             .setIfUnset(CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT,
                     DEFAULT_CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT)
-            .setIfUnset(CQSI_THREAD_POOL_METRICS_ENABLED, 
DEFAULT_CQSI_THREAD_POOL_METRICS_ENABLED);
+            .setIfUnset(CQSI_THREAD_POOL_METRICS_ENABLED, 
DEFAULT_CQSI_THREAD_POOL_METRICS_ENABLED)
+            .setIfUnset(CDC_TTL_MUTATION_MAX_RETRIES, 
DEFAULT_CDC_TTL_MUTATION_MAX_RETRIES);
 
         // HBase sets this to 1, so we reset it to something more appropriate.
         // Hopefully HBase will change this, because we can't know if a user 
set
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCChangeBuilder.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCChangeBuilder.java
index 4bd2567ddf..101d0f9335 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCChangeBuilder.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCChangeBuilder.java
@@ -31,6 +31,7 @@ import static 
org.apache.phoenix.query.QueryConstants.CDC_DELETE_EVENT_TYPE;
 import static org.apache.phoenix.query.QueryConstants.CDC_EVENT_TYPE;
 import static org.apache.phoenix.query.QueryConstants.CDC_POST_IMAGE;
 import static org.apache.phoenix.query.QueryConstants.CDC_PRE_IMAGE;
+import static 
org.apache.phoenix.query.QueryConstants.CDC_TTL_DELETE_EVENT_TYPE;
 import static org.apache.phoenix.query.QueryConstants.CDC_UPSERT_EVENT_TYPE;
 
 public class CDCChangeBuilder {
@@ -69,7 +70,7 @@ public class CDCChangeBuilder {
     }
 
     public boolean isDeletionEvent() {
-        return changeType == CDC_DELETE_EVENT_TYPE;
+        // A change is a deletion when it is either an explicit delete or a TTL-driven delete.
+        // Compare by value: reference equality only holds while changeType is always assigned
+        // from these exact constants. Calling equals() on the constant is also null-safe.
+        return CDC_DELETE_EVENT_TYPE.equals(changeType)
+                || CDC_TTL_DELETE_EVENT_TYPE.equals(changeType);
     }
 
     public boolean isNonEmptyEvent() {
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java 
b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
index 4cdd48cf97..44c09055f2 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.util.StringUtils;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.DescVarLengthFastByteComparisons;
+import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.types.PDataType;
 import org.bson.RawBsonDocument;
@@ -112,6 +113,37 @@ public class CDCUtil {
         return isCDCIndex(indexTable.getTableName().getString());
     }
 
+    /**
+     * Check if the given index table is a CDC index that is currently in ACTIVE state.
+     *
+     * @param indexTable The index PTable object.
+     * @return true only if the table name identifies a CDC index and its index state is ACTIVE.
+     */
+    public static boolean isCDCIndexActive(PTable indexTable) {
+        if (!isCDCIndex(indexTable.getTableName().getString())) {
+            return false;
+        }
+        return PIndexState.ACTIVE == indexTable.getIndexState();
+    }
+
+    /**
+     * Determine whether at least one index of the given table is an active CDC index.
+     *
+     * @param table The PTable object; may be null.
+     * @return true if any index of the table is an active CDC index; false otherwise, including
+     *         when the table or its index list is null.
+     */
+    public static boolean hasActiveCDCIndex(PTable table) {
+        if (table == null || table.getIndexes() == null) {
+            return false;
+        }
+        for (PTable index : table.getIndexes()) {
+            if (isCDCIndexActive(index)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Return PTable of the first active CDC index for the given data table.
+     *
+     * @param dataTable The data table; may be null.
+     * @return the first index that is an active CDC index, or null if the data table is null,
+     *         has no index list, or has no active CDC index.
+     */
+    public static PTable getActiveCDCIndex(PTable dataTable) {
+        // Same null guards as hasActiveCDCIndex so both helpers accept the same inputs.
+        if (dataTable == null || dataTable.getIndexes() == null) {
+            return null;
+        }
+        return dataTable.getIndexes().stream()
+                .filter(CDCUtil::isCDCIndexActive)
+                .findFirst()
+                .orElse(null);
+    }
+
     /**
      * Check if the given table has any CDC indexes.
      *
@@ -152,7 +184,7 @@ public class CDCUtil {
     public static Object getColumnEncodedValue(Object value, PDataType 
dataType) {
         if (value != null) {
             if (dataType.getSqlType() == PDataType.BSON_TYPE) {
-                value = Bytes.toBytes(((RawBsonDocument) 
value).getByteBuffer().asNIO());
+                value = ByteUtil.toBytes(((RawBsonDocument) 
value).getByteBuffer().asNIO());
             } else if (isBinaryType(dataType)) {
                 // Unfortunately, Base64.Encoder has no option to specify 
offset and length so can't
                 // avoid copying bytes.
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCCompactionUtil.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCCompactionUtil.java
new file mode 100644
index 0000000000..78fd936c29
--- /dev/null
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCCompactionUtil.java
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.CheckAndMutate;
+import org.apache.hadoop.hbase.client.CheckAndMutateResult;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.expression.function.PartitionIdFunction;
+import org.apache.phoenix.hbase.index.ValueGetter;
+import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.types.PDate;
+import org.apache.phoenix.util.CDCUtil;
+import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.JacksonUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.phoenix.query.QueryConstants.NAME_SEPARATOR;
+
+/**
+ * Utility class for CDC (Change Data Capture) operations during compaction.
+ * This class contains utilities for handling TTL row expiration events and 
generating
+ * CDC events with pre-image data that are written directly to CDC index 
tables.
+ */
+public final class CDCCompactionUtil {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CDCCompactionUtil.class);
+
+    private CDCCompactionUtil() {
+        // empty
+    }
+
+    /**
+     * Finds the column name for a given cell in the data table.
+     *
+     * @param dataTable The data table
+     * @param cell      The cell
+     * @return The column name or null if not found
+     */
+    private static String findColumnName(PTable dataTable, Cell cell) {
+        try {
+            byte[] family = CellUtil.cloneFamily(cell);
+            byte[] qualifier = CellUtil.cloneQualifier(cell);
+            byte[] defaultCf = dataTable.getDefaultFamilyName() != null
+                    ? dataTable.getDefaultFamilyName().getBytes()
+                    : QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES;
+            for (PColumn column : dataTable.getColumns()) {
+                if (column.getFamilyName() != null
+                        && Bytes.equals(family, 
column.getFamilyName().getBytes())
+                        && Bytes.equals(qualifier, 
column.getColumnQualifierBytes())) {
+                    if (Bytes.equals(defaultCf, 
column.getFamilyName().getBytes())) {
+                        return column.getName().getString();
+                    } else {
+                        return column.getFamilyName().getString() + 
NAME_SEPARATOR
+                                + column.getName().getString();
+                    }
+                }
+            }
+        } catch (Exception e) {
+            LOGGER.error("Error finding column name for cell: {}", 
CellUtil.toString(cell, true),
+                    e);
+        }
+        return null;
+    }
+
+    /**
+     * Builds the CDC event map describing a TTL delete of the given expired row.
+     * Every cell of the row that maps to a known data table column is decoded and stored in
+     * {@code preImage} under its column name; cells with no matching column are ignored.
+     *
+     * @param expiredRowPut The expired row data
+     * @param dataTable     The data table
+     * @param preImage      Pre-image map; it is updated in place and becomes the event's
+     *                      pre-image, so entries already present are kept unless overwritten
+     * @return CDC event map holding the event type, the pre-image and an empty post-image
+     * @throws Exception if a cell value cannot be decoded or encoded
+     */
+    private static Map<String, Object> createTTLDeleteCDCEvent(Put expiredRowPut, PTable dataTable,
+                                                               Map<String, Object> preImage)
+            throws Exception {
+        for (List<Cell> cellsOfFamily : expiredRowPut.getFamilyCellMap().values()) {
+            for (Cell expiredCell : cellsOfFamily) {
+                String name = findColumnName(dataTable, expiredCell);
+                if (name == null) {
+                    continue;
+                }
+                PColumn column = dataTable.getColumnForColumnQualifier(
+                        CellUtil.cloneFamily(expiredCell), CellUtil.cloneQualifier(expiredCell));
+                Object decoded = column.getDataType().toObject(expiredCell.getValueArray(),
+                        expiredCell.getValueOffset(), expiredCell.getValueLength());
+                preImage.put(name, CDCUtil.getColumnEncodedValue(decoded, column.getDataType()));
+            }
+        }
+
+        Map<String, Object> event = new HashMap<>();
+        event.put(QueryConstants.CDC_EVENT_TYPE, QueryConstants.CDC_TTL_DELETE_EVENT_TYPE);
+        event.put(QueryConstants.CDC_PRE_IMAGE, preImage);
+        event.put(QueryConstants.CDC_POST_IMAGE, Collections.emptyMap());
+        return event;
+    }
+
+    /**
+     * Builds CDC index Put mutation.
+     *
+     * @param cdcIndex            The CDC index table
+     * @param expiredRowPut       The expired row data as a Put
+     * @param eventTimestamp      The timestamp for the CDC event
+     * @param cdcEventBytes       The CDC event data to store
+     * @param dataTable           The data table
+     * @param env                 The region coprocessor environment
+     * @param region              The HBase region
+     * @param compactionTimeBytes The compaction time as bytes
+     * @return The CDC index Put mutation
+     */
+    private static Put buildCDCIndexPut(PTable cdcIndex, Put expiredRowPut, 
long eventTimestamp,
+                                        byte[] cdcEventBytes, PTable dataTable,
+                                        RegionCoprocessorEnvironment env, 
Region region,
+                                        byte[] compactionTimeBytes) throws 
Exception {
+
+        try (PhoenixConnection serverConnection = 
QueryUtil.getConnectionOnServer(new Properties(),
+                env.getConfiguration()).unwrap(PhoenixConnection.class)) {
+
+            IndexMaintainer cdcIndexMaintainer =
+                    cdcIndex.getIndexMaintainer(dataTable, serverConnection);
+
+            ValueGetter dataRowVG = new 
IndexUtil.SimpleValueGetter(expiredRowPut);
+            ImmutableBytesPtr rowKeyPtr = new 
ImmutableBytesPtr(expiredRowPut.getRow());
+
+            Put cdcIndexPut = cdcIndexMaintainer.buildUpdateMutation(
+                    GenericKeyValueBuilder.INSTANCE,
+                    dataRowVG,
+                    rowKeyPtr,
+                    eventTimestamp,
+                    null,
+                    null,
+                    false,
+                    region.getRegionInfo().getEncodedNameAsBytes());
+
+            byte[] rowKey = cdcIndexPut.getRow().clone();
+            System.arraycopy(compactionTimeBytes, 0, rowKey,
+                    PartitionIdFunction.PARTITION_ID_LENGTH, 
PDate.INSTANCE.getByteSize());
+            Put newCdcIndexPut = new Put(rowKey, eventTimestamp);
+
+            newCdcIndexPut.addColumn(
+                    
cdcIndexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+                    cdcIndexMaintainer.getEmptyKeyValueQualifier(), 
eventTimestamp,
+                    QueryConstants.UNVERIFIED_BYTES);
+
+            // Add CDC event data
+            newCdcIndexPut.addColumn(
+                    QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
+                    QueryConstants.CDC_IMAGE_CQ_BYTES,
+                    eventTimestamp,
+                    cdcEventBytes);
+
+            return newCdcIndexPut;
+        }
+    }
+
+    /**
+     * Generates and applies a CDC index mutation for TTL expired row with 
retries if required.
+     *
+     * @param cdcIndex                 The CDC index table
+     * @param dataTable                The data table
+     * @param expiredRowPut            The expired row data as a Put
+     * @param eventTimestamp           The timestamp for the CDC event
+     * @param tableName                The table name for logging
+     * @param env                      The region coprocessor environment
+     * @param region                   The HBase region
+     * @param compactionTimeBytes      The compaction time as bytes
+     * @param cdcTtlMutationMaxRetries Maximum retry attempts for CDC mutations
+     */
+    private static void generateCDCIndexMutation(PTable cdcIndex, PTable 
dataTable,
+                                                 Put expiredRowPut,
+                                                 long eventTimestamp, String 
tableName,
+                                                 RegionCoprocessorEnvironment 
env, Region region,
+                                                 byte[] compactionTimeBytes,
+                                                 int cdcTtlMutationMaxRetries)
+            throws Exception {
+        Map<String, Object> cdcEvent =
+                createTTLDeleteCDCEvent(expiredRowPut, dataTable, new 
HashMap<>());
+        byte[] cdcEventBytes =
+                
JacksonUtil.getObjectWriter(HashMap.class).writeValueAsBytes(cdcEvent);
+        Put cdcIndexPut =
+                buildCDCIndexPut(cdcIndex, expiredRowPut, eventTimestamp, 
cdcEventBytes,
+                        dataTable, env, region, compactionTimeBytes);
+
+        Exception lastException = null;
+        for (int retryCount = 0; retryCount < cdcTtlMutationMaxRetries; 
retryCount++) {
+            try (Table cdcIndexTable = 
env.getConnection().getTable(TableName.valueOf(
+                    cdcIndex.getPhysicalName().getBytes()))) {
+                CheckAndMutate checkAndMutate =
+                        CheckAndMutate.newBuilder(cdcIndexPut.getRow())
+                                
.ifNotExists(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
+                                        QueryConstants.CDC_IMAGE_CQ_BYTES)
+                                .build(cdcIndexPut);
+                CheckAndMutateResult result = 
cdcIndexTable.checkAndMutate(checkAndMutate);
+
+                if (result.isSuccess()) {
+                    // Successfully inserted new CDC event - Single CF case
+                    lastException = null;
+                    break;
+                } else {
+                    // Row already exists, need to retrieve existing pre-image 
and merge
+                    // Likely to happen for multi CF case
+                    Get get = new Get(cdcIndexPut.getRow());
+                    get.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
+                            QueryConstants.CDC_IMAGE_CQ_BYTES);
+                    Result existingResult = cdcIndexTable.get(get);
+
+                    if (!existingResult.isEmpty()) {
+                        Cell existingCell = existingResult.getColumnLatestCell(
+                                QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
+                                QueryConstants.CDC_IMAGE_CQ_BYTES);
+
+                        if (existingCell != null) {
+                            byte[] existingCdcBytes = 
CellUtil.cloneValue(existingCell);
+                            Map<String, Object> existingCdcEvent =
+                                    JacksonUtil.getObjectReader(HashMap.class)
+                                            .readValue(existingCdcBytes);
+                            Map<String, Object> existingPreImage =
+                                    (Map<String, Object>) 
existingCdcEvent.getOrDefault(
+                                            QueryConstants.CDC_PRE_IMAGE, new 
HashMap<>());
+
+                            // Create new TTL delete event with merged 
pre-image
+                            Map<String, Object> mergedCdcEvent =
+                                    createTTLDeleteCDCEvent(expiredRowPut, 
dataTable,
+                                            existingPreImage);
+                            byte[] mergedCdcEventBytes =
+                                    JacksonUtil.getObjectWriter(HashMap.class)
+                                            .writeValueAsBytes(mergedCdcEvent);
+
+                            Put mergedCdcIndexPut = buildCDCIndexPut(cdcIndex, 
expiredRowPut,
+                                    eventTimestamp, mergedCdcEventBytes, 
dataTable, env, region,
+                                    compactionTimeBytes);
+
+                            cdcIndexTable.put(mergedCdcIndexPut);
+                            lastException = null;
+                            break;
+                        } else {
+                            LOGGER.warn("Rare event: Skipping CDC TTL mutation 
because other type"
+                                    + " of CDC event is recorded at time {}", 
eventTimestamp);
+                            break;
+                        }
+                    } else {
+                        LOGGER.warn("Rare event.. Skipping CDC TTL mutation 
because other type"
+                                + " of CDC event is recorded at time {}", 
eventTimestamp);
+                        break;
+                    }
+                }
+            } catch (Exception e) {
+                lastException = e;
+                long backoffMs = 100;
+                LOGGER.warn("CDC mutation attempt {}/{} failed, retrying in 
{}ms",
+                        retryCount + 1, cdcTtlMutationMaxRetries + 1, 
backoffMs, e);
+                try {
+                    Thread.sleep(backoffMs);
+                } catch (InterruptedException ie) {
+                    Thread.currentThread().interrupt();
+                    throw new IOException("Interrupted during CDC mutation 
retry", ie);
+                }
+            }
+        }
+        if (lastException != null) {
+            LOGGER.error("Failed to generate CDC mutation after {} attempts 
for table {}, index "
+                            + "{}. The event update is missed.",
+                    cdcTtlMutationMaxRetries + 1,
+                    tableName,
+                    cdcIndex.getPhysicalName().getString(),
+                    lastException);
+        }
+    }
+
+    /**
+     * Generates CDC TTL delete event and writes it directly to CDC index 
tables.
+     * This bypasses the normal CDC update path since the row is being expired.
+     *
+     * @param expiredRow               The cells of the expired row
+     * @param tableName                The table name for logging
+     * @param compactionTime           The compaction timestamp
+     * @param dataTable                The data table
+     * @param env                      The region coprocessor environment
+     * @param region                   The HBase region
+     * @param compactionTimeBytes      The compaction time as bytes
+     * @param cdcTtlMutationMaxRetries Maximum retry attempts for CDC mutations
+     */
+    private static void generateCDCTTLDeleteEvent(List<Cell> expiredRow, 
String tableName,
+                                                  long compactionTime, PTable 
dataTable,
+                                                  RegionCoprocessorEnvironment 
env, Region region,
+                                                  byte[] compactionTimeBytes,
+                                                  int 
cdcTtlMutationMaxRetries) {
+        try {
+            PTable cdcIndex = CDCUtil.getActiveCDCIndex(dataTable);
+            if (cdcIndex == null) {
+                LOGGER.warn("No active CDC index found for table {}", 
tableName);
+                return;
+            }
+            Cell firstCell = expiredRow.get(0);
+            byte[] dataRowKey = CellUtil.cloneRow(firstCell);
+            Put expiredRowPut = new Put(dataRowKey);
+
+            for (Cell cell : expiredRow) {
+                expiredRowPut.add(cell);
+            }
+
+            try {
+                generateCDCIndexMutation(cdcIndex, dataTable, expiredRowPut, 
compactionTime,
+                        tableName, env, region, compactionTimeBytes, 
cdcTtlMutationMaxRetries);
+            } catch (Exception e) {
+                LOGGER.error("Failed to generate CDC mutation for index {}: 
{}",
+                        cdcIndex.getName().getString(), e.getMessage(), e);
+            }
+        } catch (Exception e) {
+            LOGGER.error("Error generating CDC TTL delete event for table {}",
+                    tableName, e);
+        }
+    }
+
+    /**
+     * Entry point for reporting a TTL-expired row to CDC: logs the expiration and triggers
+     * generation of the CDC TTL delete event. Any failure is logged and never propagated to
+     * the caller.
+     *
+     * @param expiredRow               The cells of the expired row
+     * @param expirationType           The type of TTL expiration; only used for logging
+     * @param tableName                The table name for logging purposes
+     * @param compactionTime           The timestamp when compaction started
+     * @param table                    The Phoenix data table metadata
+     * @param env                      The region coprocessor environment for accessing HBase
+     *                                 resources
+     * @param region                   The HBase region being compacted
+     * @param compactionTimeBytes      The compaction timestamp as byte array for CDC index row key
+     *                                 construction
+     * @param cdcTtlMutationMaxRetries Maximum number of retry attempts for CDC mutation operations
+     */
+    static void handleTTLRowExpiration(List<Cell> expiredRow, String expirationType,
+                                       String tableName, long compactionTime, PTable table,
+                                       RegionCoprocessorEnvironment env, Region region,
+                                       byte[] compactionTimeBytes,
+                                       int cdcTtlMutationMaxRetries) {
+        try {
+            // The row key is taken from the first cell of the expired row.
+            String printableRowKey = Bytes.toStringBinary(CellUtil.cloneRow(expiredRow.get(0)));
+            int cellCount = expiredRow.size();
+            LOGGER.info("TTL row expiration detected: table={}, rowKey={}, expirationType={}, "
+                            + "cellCount={}, compactionTime={}",
+                    tableName, printableRowKey, expirationType, cellCount, compactionTime);
+
+            // Emit the CDC TTL delete event, including the pre-image of the expired row.
+            generateCDCTTLDeleteEvent(expiredRow, tableName, compactionTime, table, env, region,
+                    compactionTimeBytes, cdcTtlMutationMaxRetries);
+        } catch (Exception failure) {
+            LOGGER.error("Error handling TTL row expiration for CDC: table {}", tableName,
+                    failure);
+        }
+    }
+}
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
index 4cbb4d6147..6cede61c45 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
@@ -22,9 +22,11 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellBuilder;
 import org.apache.hadoop.hbase.CellBuilderFactory;
 import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
@@ -35,10 +37,9 @@ import 
org.apache.phoenix.expression.SingleCellColumnExpression;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.index.CDCTableInfo;
 import org.apache.phoenix.index.IndexMaintainer;
-import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.tuple.ResultTuple;
 import org.apache.phoenix.schema.types.PDataType;
-import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.util.CDCChangeBuilder;
 import org.apache.phoenix.util.CDCUtil;
 import org.apache.phoenix.util.EncodedColumnsUtil;
@@ -50,12 +51,43 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 
 import static 
org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants.CDC_DATA_TABLE_DEF;
 import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY;
 
+/**
+ * CDC (Change Data Capture) enabled region scanner for global indexes that 
processes
+ * uncovered CDC index queries by reconstructing CDC events from index and 
data table rows.
+ *
+ * <h3>Purpose</h3>
+ * This scanner extends {@link UncoveredGlobalIndexRegionScanner} to handle 
CDC index queries
+ * where the CDC index doesn't contain all the columns needed to satisfy the 
query. It bridges
+ * the gap between CDC index rows and the original data table to reconstruct 
complete CDC events.
+ *
+ * <h3>CDC Event Processing</h3>
+ * The scanner processes two types of CDC events:
+ * <ul>
+ *   <li><b>Regular CDC Events:</b> Require a data table scan to build the CDC event JSON
+ *       from the current/historical row state</li>
+ *   <li><b>Pre-Image CDC Events:</b> Contain embedded CDC data (e.g., TTL delete events)
+ *       that can be returned directly without a data table scan</li>
+ * </ul>
+ *
+ * <h3>CDC Event Structure</h3>
+ * The scanner produces CDC events in JSON format containing:
+ * <ul>
+ *   <li><b>event_type:</b> "upsert", "delete", or "ttl_delete"</li>
+ *   <li><b>pre_image:</b> Row state before the change (for 
updates/deletes)</li>
+ *   <li><b>post_image:</b> Row state after the change (for 
inserts/updates)</li>
+ * </ul>
+ *
+ * @see UncoveredGlobalIndexRegionScanner
+ * @see CDCChangeBuilder
+ * @see CDCTableInfo
+ */
 public class CDCGlobalIndexRegionScanner extends 
UncoveredGlobalIndexRegionScanner {
     private static final Logger LOGGER =
             LoggerFactory.getLogger(CDCGlobalIndexRegionScanner.class);
@@ -101,6 +133,12 @@ public class CDCGlobalIndexRegionScanner extends 
UncoveredGlobalIndexRegionScann
             List<Cell> indexRow = indexRowIterator.next();
             Cell indexCell = indexRow.get(0);
             byte[] indexRowKey = 
ImmutableBytesPtr.cloneCellRowIfNecessary(indexCell);
+            if (indexRow.size() > 1) {
+                boolean success = handlePreImageCDCEvent(indexRow, 
indexRowKey, indexCell, result);
+                if (success) {
+                    return true;
+                }
+            }
             ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
                     indexToDataRowKeyMap.get(indexRowKey));
             Result dataRow = dataRows.get(dataRowKey);
@@ -233,16 +271,29 @@ public class CDCGlobalIndexRegionScanner extends 
UncoveredGlobalIndexRegionScann
     private Result getCDCImage(byte[] indexRowKey, Cell firstCell) throws 
JsonProcessingException {
         byte[] value = 
JacksonUtil.getObjectWriter(HashMap.class).writeValueAsBytes(
                 changeBuilder.buildCDCEvent());
+        return createCDCResult(indexRowKey, firstCell, 
changeBuilder.getChangeTimestamp(), value);
+    }
+
+    /**
+     * Builds a single-cell Result that carries the CDC event JSON under the CDC JSON column.
+     *
+     * @param indexRowKey The CDC index row key
+     * @param firstCell   The first cell of the index row; its column family is reused
+     * @param timestamp   The timestamp for the CDC event
+     * @param value       The CDC event JSON bytes
+     * @return Result containing the CDC data
+     */
+    private Result createCDCResult(byte[] indexRowKey, Cell firstCell, long 
timestamp,
+                                   byte[] value) {
         CellBuilder builder = 
CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
-        Result cdcRow = Result.create(Arrays.asList(builder
+        return Result.create(Collections.singletonList(builder
                 .setRow(indexRowKey)
                 
.setFamily(ImmutableBytesPtr.cloneCellFamilyIfNecessary(firstCell))
                 .setQualifier(cdcDataTableInfo.getCdcJsonColQualBytes())
-                .setTimestamp(changeBuilder.getChangeTimestamp())
+                .setTimestamp(timestamp)
                 .setValue(value)
                 .setType(Cell.Type.Put)
                 .build()));
-        return cdcRow;
     }
 
     private Object getColumnValue(Cell cell, PDataType dataType) {
@@ -259,4 +310,51 @@ public class CDCGlobalIndexRegionScanner extends UncoveredGlobalIndexRegionScann
         }
         return CDCUtil.getColumnEncodedValue(value, dataType);
     }
+
+    /**
+     * Handles CDC index rows that already carry the CDC event in a
+     * {@link QueryConstants#CDC_IMAGE_CQ} cell (e.g. TTL delete events), so the data table
+     * scan can be skipped. Only the CDC_IMAGE_CQ qualifier is recognized here; rows without
+     * such a cell are left untouched for the regular data-table path.
+     *
+     * @param indexRow    The CDC index row cells
+     * @param indexRowKey The CDC index row key
+     * @param indexCell   The first cell of the index row; provides the column family of the
+     *                    returned cell and is added to the result when a projector is set
+     * @param result      The result list to populate
+     * @return true if an embedded CDC image cell was found and the row was handled here,
+     *         false if the caller must build the event from the data table row
+     */
+    private boolean handlePreImageCDCEvent(List<Cell> indexRow, byte[] indexRowKey,
+                                           Cell indexCell, List<Cell> result) {
+        Cell cdcDataCell = null;
+        for (Cell cell : indexRow) {
+            if (Bytes.equals(cell.getQualifierArray(), cell.getQualifierOffset(),
+                    cell.getQualifierLength(),
+                    QueryConstants.CDC_IMAGE_CQ_BYTES, 0,
+                    QueryConstants.CDC_IMAGE_CQ_BYTES.length)) {
+                cdcDataCell = cell;
+                break;
+            }
+        }
+        if (cdcDataCell == null) {
+            return false;
+        }
+        byte[] cdcEventBytes = CellUtil.cloneValue(cdcDataCell);
+        Result cdcRow = createCDCResult(indexRowKey, indexCell, cdcDataCell.getTimestamp(),
+                cdcEventBytes);
+
+        if (tupleProjector != null) {
+            result.add(indexCell);
+            IndexUtil.addTupleAsOneCell(result, new ResultTuple(cdcRow), tupleProjector, ptr);
+        } else {
+            result.clear();
+        }
+        // Runs for every handled row: do not render the row key unless debug is enabled.
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Processed CDC event with embedded data, skipped data table scan for"
+                    + " row key: {}", Bytes.toStringBinary(indexRowKey));
+        }
+        return true;
+    }
 }
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
index 2b9215b740..f3577611d0 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.Date;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -82,6 +83,7 @@ import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TTLExpression;
 import org.apache.phoenix.schema.TTLExpressionFactory;
 import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PDate;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.schema.types.PSmallint;
 import 
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
@@ -150,6 +152,7 @@ public class CompactionScanner implements InternalScanner {
     private final int familyCount;
     private KeepDeletedCells keepDeletedCells;
     private long compactionTime;
+    private byte[] compactionTimeBytes;
     private final byte[] emptyCF;
     private final byte[] emptyCQ;
     private final byte[] storeColumnFamily;
@@ -163,6 +166,9 @@ public class CompactionScanner implements InternalScanner {
     private long outputCellCount = 0;
     private boolean phoenixLevelOnly = false;
     private boolean isCDCIndex;
+    private final boolean isCdcTtlEnabled;
+    private final PTable table;
+    private final int cdcTtlMutationMaxRetries;
 
     // Only for forcing minor compaction while testing
     private static boolean forceMinorCompaction = false;
@@ -184,6 +190,7 @@ public class CompactionScanner implements InternalScanner {
         this.emptyCF = SchemaUtil.getEmptyColumnFamily(table);
         this.emptyCQ = SchemaUtil.getEmptyColumnQualifier(table);
         compactionTime = EnvironmentEdgeManager.currentTimeMillis();
+        compactionTimeBytes = PDate.INSTANCE.toBytes(new Date(compactionTime));
         columnFamilyName = store.getColumnFamilyName();
         storeColumnFamily = columnFamilyName.getBytes();
         tableName = region.getRegionInfo().getTable().getNameAsString();
@@ -205,7 +212,15 @@ public class CompactionScanner implements InternalScanner {
         emptyCFStore = familyCount == 1 || 
columnFamilyName.equals(Bytes.toString(emptyCF))
                         || localIndex;
 
-        isCDCIndex = table != null ? CDCUtil.isCDCIndex(table) : false;
+        this.table = table;
+        isCDCIndex = CDCUtil.isCDCIndex(table);
+        isCdcTtlEnabled =
+                CDCUtil.hasActiveCDCIndex(table) && major && 
!table.isMultiTenant()
+                        && table.getType() == PTableType.TABLE;
+        cdcTtlMutationMaxRetries = env.getConfiguration().getInt(
+                QueryServices.CDC_TTL_MUTATION_MAX_RETRIES,
+                QueryServicesOptions.DEFAULT_CDC_TTL_MUTATION_MAX_RETRIES);
+
         // Initialize the tracker that computes the TTL for the compacting 
table.
         // The TTL tracker can be
         // simple (one single TTL for the table) when the compacting table is 
not Partitioned
@@ -412,6 +427,11 @@ public class CompactionScanner implements InternalScanner {
         CompiledConditionalTTLExpression ttlExpr =
                 (CompiledConditionalTTLExpression) rowContext.ttlExprForRow;
         if (ttlExpr.isExpired(result, true)) {
+            if (isCdcTtlEnabled && !result.isEmpty()) {
+                CDCCompactionUtil.handleTTLRowExpiration(result, 
"conditional_ttl", tableName,
+                        compactionTime, table, env, region, 
compactionTimeBytes,
+                        cdcTtlMutationMaxRetries);
+            }
             // If the row is expired, purge the row
             result.clear();
         }
@@ -2591,6 +2611,12 @@ public class CompactionScanner implements 
InternalScanner {
             if (major && compactionTime - rowContext.maxTimestamp > 
maxLookbackInMillis + ttl) {
                 // Only do this check for major compaction as for minor 
compactions we don't expire cells.
                 // The row version should not be visible via the max lookback 
window. Nothing to do
+
+                if (isCdcTtlEnabled && !lastRow.isEmpty()) {
+                    CDCCompactionUtil.handleTTLRowExpiration(lastRow, 
"time_based_ttl", tableName,
+                            compactionTime, table, env, region, 
compactionTimeBytes,
+                            cdcTtlMutationMaxRetries);
+                }
                 return;
             }
             retainedCells.addAll(lastRow);
@@ -2683,6 +2709,11 @@ public class CompactionScanner implements 
InternalScanner {
                     // store is not the empty column family store.
                     return false;
                 }
+                if (isCdcTtlEnabled && !lastRowVersion.isEmpty()) {
+                    CDCCompactionUtil.handleTTLRowExpiration(lastRowVersion, 
"max_lookback_ttl",
+                            tableName, compactionTime, table, env, region, 
compactionTimeBytes,
+                            cdcTtlMutationMaxRetries);
+                }
                 return true;
             }
             // If the time gap between two back to back mutations is more than 
ttl then we know
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson3IT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson3IT.java
index 1adad43dc3..8b55334c50 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson3IT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson3IT.java
@@ -21,12 +21,17 @@ package org.apache.phoenix.end2end;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.types.PDouble;
 import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
 import org.apache.phoenix.util.CDCUtil;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.ManualEnvironmentEdge;
 import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.TestUtil;
 import org.bson.BsonArray;
 import org.bson.BsonBinary;
 import org.bson.BsonBoolean;
@@ -39,6 +44,8 @@ import org.bson.RawBsonDocument;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.io.File;
 import java.io.IOException;
@@ -52,12 +59,19 @@ import java.sql.SQLException;
 import java.sql.Timestamp;
 import java.util.Arrays;
 import java.util.Base64;
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 
+import static org.apache.phoenix.query.QueryConstants.CDC_EVENT_TYPE;
+import static org.apache.phoenix.query.QueryConstants.CDC_POST_IMAGE;
+import static org.apache.phoenix.query.QueryConstants.CDC_PRE_IMAGE;
+import static 
org.apache.phoenix.query.QueryConstants.CDC_TTL_DELETE_EVENT_TYPE;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
@@ -65,8 +79,25 @@ import static org.junit.Assert.assertTrue;
  * Tests for BSON.
  */
 @Category(ParallelStatsDisabledTest.class)
+@RunWith(Parameterized.class)
 public class Bson3IT extends ParallelStatsDisabledIT {
 
+    private final boolean columnEncoded;
+
+    public Bson3IT(boolean columnEncoded) {
+        this.columnEncoded = columnEncoded;
+    }
+
+    @Parameterized.Parameters(name =
+            "Bson3IT_columnEncoded={0}")
+    public static synchronized Collection<Object[]> data() {
+        return Arrays.asList(
+                new Object[][]{
+                        {false},
+                        {true}
+                });
+    }
+
   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
   private static String getJsonString(String jsonFilePath) throws IOException {
@@ -86,7 +117,8 @@ public class Bson3IT extends ParallelStatsDisabledIT {
     try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
       String ddl = "CREATE TABLE " + tableName
           + " (PK1 VARCHAR NOT NULL, C1 VARCHAR, COL BSON"
-          + " CONSTRAINT pk PRIMARY KEY(PK1))";
+          + " CONSTRAINT pk PRIMARY KEY(PK1)) "
+          + (this.columnEncoded ? "" : "COLUMN_ENCODED_BYTES=0");
       String cdcDdl = "CREATE CDC " + cdcName + " ON " + tableName;
       conn.createStatement().execute(ddl);
       conn.createStatement().execute(cdcDdl);
@@ -627,7 +659,8 @@ public class Bson3IT extends ParallelStatsDisabledIT {
     try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
       String ddl = "CREATE TABLE " + tableName
               + " (PK1 VARCHAR NOT NULL, C1 VARCHAR, COL BSON"
-              + " CONSTRAINT pk PRIMARY KEY(PK1))";
+              + " CONSTRAINT pk PRIMARY KEY(PK1)) "
+              + (this.columnEncoded ? "" : "COLUMN_ENCODED_BYTES=0");
       String cdcDdl = "CREATE CDC " + cdcName + " ON " + tableName;
       conn.createStatement().execute(ddl);
       conn.createStatement().execute(cdcDdl);
@@ -1073,7 +1106,8 @@ public class Bson3IT extends ParallelStatsDisabledIT {
     try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
       String ddl = "CREATE TABLE " + tableName
               + " (PK1 VARCHAR NOT NULL, C1 VARCHAR, COL BSON"
-              + " CONSTRAINT pk PRIMARY KEY(PK1))";
+              + " CONSTRAINT pk PRIMARY KEY(PK1)) "
+              + (this.columnEncoded ? "" : "COLUMN_ENCODED_BYTES=0");
       String cdcDdl = "CREATE CDC " + cdcName + " ON " + tableName;
       conn.createStatement().execute(ddl);
       conn.createStatement().execute(cdcDdl);
@@ -1443,7 +1477,8 @@ public class Bson3IT extends ParallelStatsDisabledIT {
     try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
       String ddl = "CREATE TABLE " + tableName
               + " (PK1 VARCHAR NOT NULL, C1 VARCHAR, COL BSON"
-              + " CONSTRAINT pk PRIMARY KEY(PK1))";
+              + " CONSTRAINT pk PRIMARY KEY(PK1)) "
+              + (this.columnEncoded ? "" : "COLUMN_ENCODED_BYTES=0");
       String cdcDdl = "CREATE CDC " + cdcName + " ON " + tableName;
       conn.createStatement().execute(ddl);
       conn.createStatement().execute(cdcDdl);
@@ -1865,12 +1900,14 @@ public class Bson3IT extends ParallelStatsDisabledIT {
   @Test
   public void testCDCWithCaseSenstitiveTableAndPks() throws Exception {
     Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-    String tableName = "XYZ.\"test.table\"";
-    String cdcName = "XYZ.\"CDC_test.table\"";
-    String cdcNameWithoutSchema = "\"CDC_test.table\"";
+    String nameQuotes = "test.tableTESt-_123" + generateUniqueName();
+    String tableName = "XYZ.\"" + nameQuotes + "\"";
+    String cdcName = "XYZ.\"CDC_" + nameQuotes + "\"";
+    String cdcNameWithoutSchema = "\"CDC_" + nameQuotes + "\"";
     try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
       String ddl = "CREATE TABLE " + tableName +
-              " (\"hk\" VARCHAR NOT NULL, COL BSON CONSTRAINT pk PRIMARY 
KEY(\"hk\"))";
+              " (\"hk\" VARCHAR NOT NULL, COL BSON CONSTRAINT pk PRIMARY 
KEY(\"hk\")) "
+              + (this.columnEncoded ? "" : "COLUMN_ENCODED_BYTES=0");
       conn.createStatement().execute(ddl);
 
       String cdcDdl = "CREATE CDC " + cdcNameWithoutSchema + " ON " + 
tableName;
@@ -1930,6 +1967,244 @@ public class Bson3IT extends ParallelStatsDisabledIT {
               actualDoc);
 
       assertFalse("Should only have one CDC record", rs.next());
+
+      conn.createStatement().execute("DROP TABLE " + tableName + " CASCADE");
+    }
+  }
+
+  /**
+   * Test BSON operations with SQL conditions and TTL functionality.
+   */
+  @Test
+  public void testBsonOpsWithSqlConditionsUpdateSuccessWithTTL() throws 
Exception {
+    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+    String tableName = generateUniqueName();
+    String cdcName = generateUniqueName();
+    final int ttlSeconds = 10;
+    final int maxLookbackAge = 5;
+
+    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+      String ddl = "CREATE TABLE " + tableName
+              + " (PK1 VARCHAR NOT NULL, C1 VARCHAR, COL BSON"
+              + " CONSTRAINT pk PRIMARY KEY(PK1)) TTL="
+              + ttlSeconds + ", \"phoenix.max.lookback.age.seconds\" = " + 
maxLookbackAge
+              + (this.columnEncoded ? "" : ", COLUMN_ENCODED_BYTES=0");
+      String cdcDdl = "CREATE CDC " + cdcName + " ON " + tableName;
+      conn.createStatement().execute(ddl);
+      conn.createStatement().execute(cdcDdl);
+
+      ManualEnvironmentEdge injectEdge = new ManualEnvironmentEdge();
+      long startTime = System.currentTimeMillis() + 1000;
+      startTime = (startTime / 1000) * 1000;
+      EnvironmentEdgeManager.injectEdge(injectEdge);
+      injectEdge.setValue(startTime);
+
+      String sample1 = getJsonString("json/sample_01.json");
+      String sample2 = getJsonString("json/sample_02.json");
+      String sample3 = getJsonString("json/sample_03.json");
+      BsonDocument bsonDocument1 = RawBsonDocument.parse(sample1);
+      BsonDocument bsonDocument2 = RawBsonDocument.parse(sample2);
+      BsonDocument bsonDocument3 = RawBsonDocument.parse(sample3);
+
+      PreparedStatement stmt =
+              conn.prepareStatement("UPSERT INTO " + tableName + " VALUES 
(?,?,?)");
+      stmt.setString(1, "pk0001");
+      stmt.setString(2, "0002");
+      stmt.setObject(3, bsonDocument1);
+      stmt.executeUpdate();
+
+      stmt.setString(1, "pk1010");
+      stmt.setString(2, "1010");
+      stmt.setObject(3, bsonDocument2);
+      stmt.executeUpdate();
+
+      stmt.setString(1, "pk1011");
+      stmt.setString(2, "1011");
+      stmt.setObject(3, bsonDocument3);
+      stmt.executeUpdate();
+
+      conn.commit();
+      injectEdge.incrementValue(1000);
+
+      String conditionExpression =
+              "press = $press AND track[0].shot[2][0].city.standard[5] = 
$softly";
+
+      BsonDocument conditionDoc = new BsonDocument();
+      conditionDoc.put("$EXPR", new BsonString(conditionExpression));
+      conditionDoc.put("$VAL", new BsonDocument()
+              .append("$press", new BsonString("beat"))
+              .append("$softly", new BsonString("softly")));
+
+      BsonDocument updateExp = new BsonDocument()
+              .append("$SET", new BsonDocument()
+                      .append("browserling",
+                              new 
BsonBinary(PDouble.INSTANCE.toBytes(-505169340.54880095)))
+                      .append("track[0].shot[2][0].city.standard[5]", new 
BsonString("soft"))
+                      .append("track[0].shot[2][0].city.problem[2]",
+                              new 
BsonString("track[0].shot[2][0].city.problem[2] + 529.435")))
+              .append("$UNSET", new BsonDocument()
+                      .append("track[0].shot[2][0].city.flame", new 
BsonNull()));
+
+      stmt = conn.prepareStatement("UPSERT INTO " + tableName
+              + " VALUES (?) ON DUPLICATE KEY UPDATE COL = CASE WHEN"
+              + " BSON_CONDITION_EXPRESSION(COL, '" + conditionDoc.toJson() + 
"')"
+              + " THEN BSON_UPDATE_EXPRESSION(COL, '" + updateExp + "') ELSE 
COL END,"
+              + " C1 = ?");
+      stmt.setString(1, "pk0001");
+      stmt.setString(2, "0003");
+      stmt.executeUpdate();
+
+      String query = "SELECT * FROM " + tableName + " WHERE PK1 = 'pk0001'";
+      ResultSet rs = conn.createStatement().executeQuery(query);
+      assertTrue(rs.next());
+      BsonDocument document1 = (BsonDocument) rs.getObject(3);
+
+      updateExp = new BsonDocument()
+              .append("$ADD", new BsonDocument()
+                      .append("new_samples",
+                              new BsonDocument().append("$set",
+                                      new BsonArray(Arrays.asList(
+                                              new 
BsonBinary(Bytes.toBytes("Sample10")),
+                                              new 
BsonBinary(Bytes.toBytes("Sample12")),
+                                              new 
BsonBinary(Bytes.toBytes("Sample13")),
+                                              new 
BsonBinary(Bytes.toBytes("Sample14"))
+                                      )))))
+              .append("$DELETE_FROM_SET", new BsonDocument()
+                      .append("new_samples",
+                              new BsonDocument().append("$set",
+                                      new BsonArray(Arrays.asList(
+                                              new 
BsonBinary(Bytes.toBytes("Sample02")),
+                                              new 
BsonBinary(Bytes.toBytes("Sample03"))
+                                      )))))
+              .append("$SET", new BsonDocument()
+                      .append("newrecord", ((BsonArray) 
(document1.get("track"))).get(0)))
+              .append("$UNSET", new BsonDocument()
+                      .append("rather[3].outline.halfway.so[2][2]", new 
BsonNull()));
+
+      conditionExpression =
+              "field_not_exists(newrecord) AND 
field_exists(rather[3].outline.halfway.so[2][2])";
+
+      conditionDoc = new BsonDocument();
+      conditionDoc.put("$EXPR", new BsonString(conditionExpression));
+      conditionDoc.put("$VAL", new BsonDocument());
+
+      stmt = conn.prepareStatement("UPSERT INTO " + tableName
+              + " VALUES (?) ON DUPLICATE KEY UPDATE COL = CASE WHEN"
+              + " BSON_CONDITION_EXPRESSION(COL, '" + conditionDoc.toJson() + 
"')"
+              + " THEN BSON_UPDATE_EXPRESSION(COL, '" + updateExp + "') ELSE 
COL END");
+
+      stmt.setString(1, "pk1010");
+      stmt.executeUpdate();
+
+      updateExp = new BsonDocument()
+              .append("$SET", new BsonDocument()
+                      .append("result[1].location.state", new 
BsonString("AK")))
+              .append("$UNSET", new BsonDocument()
+                      .append("result[4].emails[1]", new BsonNull()));
+
+      conditionExpression =
+              "result[2].location.coordinates.latitude > $latitude OR "
+                      + "(field_exists(result[1].location) AND 
result[1].location.state != $state" +
+                      " AND field_exists(result[4].emails[1]))";
+
+      conditionDoc = new BsonDocument();
+      conditionDoc.put("$EXPR", new BsonString(conditionExpression));
+      conditionDoc.put("$VAL", new BsonDocument()
+              .append("$latitude", new BsonDouble(0))
+              .append("$state", new BsonString("AK")));
+
+      stmt = conn.prepareStatement("UPSERT INTO " + tableName
+              + " VALUES (?) ON DUPLICATE KEY UPDATE COL = CASE WHEN"
+              + " BSON_CONDITION_EXPRESSION(COL, '" + conditionDoc.toJson() + 
"')"
+              + " THEN BSON_UPDATE_EXPRESSION(COL, '" + updateExp + "') ELSE 
COL END");
+
+      stmt.setString(1, "pk1011");
+      stmt.executeUpdate();
+
+      conn.commit();
+      injectEdge.incrementValue(1000);
+
+      // Capture timestamp before TTL expiration
+      Timestamp beforeTTLTimestamp = new Timestamp(injectEdge.currentTime());
+
+      // Capture last post-images for each row before TTL expiration
+      Map<String, Map<String, Object>> lastPostImages = new HashMap<>();
+
+      String cdcQuery = "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " + 
cdcName +
+              " WHERE PHOENIX_ROW_TIMESTAMP() <= ? ORDER BY 
PHOENIX_ROW_TIMESTAMP() DESC";
+      try (PreparedStatement pst = conn.prepareStatement(cdcQuery)) {
+        pst.setTimestamp(1, beforeTTLTimestamp);
+        try (ResultSet cdcRs = pst.executeQuery()) {
+          while (cdcRs.next()) {
+            String pk = cdcRs.getString(2);
+            if (!lastPostImages.containsKey(pk)) {
+              String cdcVal = cdcRs.getString(3);
+              Map<String, Object> cdcEvent = OBJECT_MAPPER.readValue(cdcVal, 
HashMap.class);
+              if (cdcEvent.containsKey(CDC_POST_IMAGE)) {
+                lastPostImages.put(pk, (Map<String, Object>) 
cdcEvent.get(CDC_POST_IMAGE));
+              }
+            }
+          }
+        }
+      }
+
+      // Verify all rows have post-images captured
+      assertEquals("Should have post-images for all 3 rows", 3, 
lastPostImages.size());
+      assertNotNull("Should have post-image for pk0001", 
lastPostImages.get("pk0001"));
+      assertNotNull("Should have post-image for pk1010", 
lastPostImages.get("pk1010"));
+      assertNotNull("Should have post-image for pk1011", 
lastPostImages.get("pk1011"));
+
+      // Advance time past TTL to expire rows
+      injectEdge.incrementValue((ttlSeconds + maxLookbackAge + 1) * 1000);
+
+      // Flush and major compact to trigger TTL expiration
+      Admin admin = getUtility().getAdmin();
+      admin.flush(TableName.valueOf(tableName));
+
+      TestUtil.majorCompact(getUtility(), TableName.valueOf(tableName));
+
+      // Verify all rows are expired from data table
+      String dataQuery = "SELECT * FROM " + tableName;
+      try (ResultSet dataRs = conn.createStatement().executeQuery(dataQuery)) {
+        assertFalse("All rows should be expired from data table", 
dataRs.next());
+      }
+
+      // Verify TTL_DELETE CDC events were generated for all rows
+      String ttlDeleteQuery = "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " + 
cdcName +
+              " WHERE PHOENIX_ROW_TIMESTAMP() > ?";
+      Map<String, Map<String, Object>> ttlDeleteEvents = new HashMap<>();
+
+      try (PreparedStatement pst = conn.prepareStatement(ttlDeleteQuery)) {
+        pst.setTimestamp(1, beforeTTLTimestamp);
+        try (ResultSet ttlRs = pst.executeQuery()) {
+          while (ttlRs.next()) {
+            String pk = ttlRs.getString(2);
+            String cdcVal = ttlRs.getString(3);
+            Map<String, Object> cdcEvent = OBJECT_MAPPER.readValue(cdcVal, 
HashMap.class);
+
+            // Only process TTL delete events
+            if 
(CDC_TTL_DELETE_EVENT_TYPE.equals(cdcEvent.get(CDC_EVENT_TYPE))) {
+              ttlDeleteEvents.put(pk, (Map<String, Object>) 
cdcEvent.get(CDC_PRE_IMAGE));
+            }
+          }
+        }
+      }
+
+      // Verify TTL delete events for all rows
+      assertEquals("Should have TTL delete events for all 3 rows", 3, 
ttlDeleteEvents.size());
+
+      // Verify pre-image consistency for each row
+      for (String pk : Arrays.asList("pk0001", "pk1010", "pk1011")) {
+        assertTrue("Should have TTL delete event for " + pk, ttlDeleteEvents.containsKey(pk));
+        Map<String, Object> ttlPreImage = ttlDeleteEvents.get(pk);
+        Map<String, Object> lastPostImage = lastPostImages.get(pk);
+        assertNotNull("TTL pre-image should not be null for " + pk, ttlPreImage);
+        assertNotNull("Last post-image should not be null for " + pk, lastPostImage);
+        assertEquals("TTL delete pre-image should match last post-image for " + pk,
+                lastPostImage, ttlPreImage);
+      }
+    } finally {
+      EnvironmentEdgeManager.reset();
     }
   }
 
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
index 1d68ce776c..7fdabc648b 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
@@ -28,6 +28,9 @@ import org.apache.phoenix.query.BaseTest;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 import org.apache.phoenix.util.*;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.schema.PTable;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -46,10 +49,16 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
 
+import static org.apache.phoenix.query.QueryConstants.CDC_EVENT_TYPE;
+import static org.apache.phoenix.query.QueryConstants.CDC_POST_IMAGE;
+import static org.apache.phoenix.query.QueryConstants.CDC_PRE_IMAGE;
+import static 
org.apache.phoenix.query.QueryConstants.CDC_TTL_DELETE_EVENT_TYPE;
+import static org.apache.phoenix.query.QueryConstants.CDC_UPSERT_EVENT_TYPE;
 import static org.junit.Assert.*;
 
 @Category(NeedsOwnMiniClusterTest.class)
@@ -314,6 +323,102 @@ public class TableTTLIT extends BaseTest {
         }
     }
 
+    /**
+     * Verifies that, after a row which was updated once per column over several clock
+     * advances has expired, the most recent CDC event is a ttl_delete event whose pre-image
+     * equals the post-image captured from the last regular CDC event.
+     * <p>
+     * The same updates are applied to a second table that is never flushed or compacted here,
+     * and the two tables are compared at the end via compareRow.
+     */
+    @Test
+    public void testRowSpansMultipleTTLWindowsWithCdc() throws Exception {
+        final int maxLookbackAge = tableLevelMaxLookback != null
+                ? tableLevelMaxLookback : MAX_LOOKBACK_AGE;
+
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String schemaName = generateUniqueName();
+            String tableName = schemaName + "." + generateUniqueName();
+            String noCompactTableName = generateUniqueName();
+            createTable(tableName);
+            createTable(noCompactTableName);
+            conn.createStatement().execute("ALTER TABLE " + tableName
+                    + " SET \"phoenix.max.lookback.age.seconds\" = " + 
maxLookbackAge);
+
+            // Create the CDC object on the compacted table, including both pre- and
+            // post-images in its events
+            String cdcName = generateUniqueName();
+            String cdcSql = "CREATE CDC " + cdcName + " ON " + tableName +
+                    " INCLUDE (PRE, POST)";
+            conn.createStatement().execute(cdcSql);
+            conn.commit();
+
+            String cdcFullName = SchemaUtil.getTableName(null, schemaName + 
"." + cdcName);
+
+            ObjectMapper mapper = new ObjectMapper();
+            // Start one second in the future, truncated to a whole second, and install the
+            // injected clock at that value
+            long startTime = System.currentTimeMillis() + 1000;
+            startTime = (startTime / 1000) * 1000;
+            EnvironmentEdgeManager.injectEdge(injectEdge);
+            injectEdge.setValue(startTime);
+
+            // Track the last post-image from normal CDC events
+            Map<String, Object> lastPostImage = null;
+
+            for (int columnIndex = 1; columnIndex <= MAX_COLUMN_INDEX; 
columnIndex++) {
+                String value = Integer.toString(RAND.nextInt(1000));
+                updateColumn(conn, tableName, "a", columnIndex, value);
+                updateColumn(conn, noCompactTableName, "a", columnIndex, 
value);
+                conn.commit();
+
+                // Capture the post-image of the most recent CDC event (newest first, LIMIT 1)
+                String cdcQuery =
+                        "SELECT PHOENIX_ROW_TIMESTAMP(), \"CDC JSON\" FROM " + 
cdcFullName +
+                                " ORDER BY PHOENIX_ROW_TIMESTAMP() DESC LIMIT 
1";
+                try (ResultSet rs = 
conn.createStatement().executeQuery(cdcQuery)) {
+                    if (rs.next()) {
+                        Map<String, Object> cdcEvent =
+                                mapper.readValue(rs.getString(2), 
HashMap.class);
+                        if (cdcEvent.containsKey(CDC_POST_IMAGE)) {
+                            lastPostImage = (Map<String, Object>) 
cdcEvent.get(CDC_POST_IMAGE);
+                        }
+                    }
+                }
+
+                // Advance the clock by 1000 less than ttl * 1000 between updates
+                // (presumably ttl is in seconds and the clock in milliseconds - confirm)
+                injectEdge.incrementValue(ttl * 1000 - 1000);
+            }
+            assertNotNull("Last post-image should not be null", lastPostImage);
+
+            // Advance time past TTL to expire the row
+            injectEdge.incrementValue((ttl + maxLookbackAge + 1) * 1000);
+
+            flush(TableName.valueOf(tableName));
+            majorCompact(TableName.valueOf(tableName));
+
+            // Verify row is expired from data table
+            String dataQuery = "SELECT * FROM " + tableName + " WHERE id = 
'a'";
+            try (ResultSet rs = 
conn.createStatement().executeQuery(dataQuery)) {
+                assertFalse("Row should be expired from data table", 
rs.next());
+            }
+
+            // Verify TTL_DELETE CDC event was generated and compare pre-image
+            String cdcQuery = "SELECT \"CDC JSON\" FROM " + cdcFullName +
+                    " ORDER BY PHOENIX_ROW_TIMESTAMP() DESC LIMIT 1";
+            try (ResultSet rs = conn.createStatement().executeQuery(cdcQuery)) 
{
+                assertTrue("Should find TTL delete event", rs.next());
+                Map<String, Object> ttlDeleteEvent =
+                        mapper.readValue(rs.getString(1), HashMap.class);
+                LOG.info("TTL delete event: {}", ttlDeleteEvent);
+
+                assertEquals("Should be ttl_delete event", 
CDC_TTL_DELETE_EVENT_TYPE,
+                        ttlDeleteEvent.get(CDC_EVENT_TYPE));
+
+                Map<String, Object> ttlPreImage =
+                        (Map<String, Object>) 
ttlDeleteEvent.get(CDC_PRE_IMAGE);
+                assertNotNull("TTL pre-image should not be null", ttlPreImage);
+
+                assertEquals(
+                        "TTL delete pre-image should match last post-image 
from normal CDC events",
+                        lastPostImage, ttlPreImage);
+
+                // NOTE(review): the query above uses LIMIT 1, so this only confirms the limit;
+                // it cannot detect additional CDC events.
+                assertFalse("No more event should be found", rs.next());
+            }
+
+            compareRow(conn, tableName, noCompactTableName, "a", 
MAX_COLUMN_INDEX);
+            // NOTE(review): nothing in this method reads the clock after this increment.
+            injectEdge.incrementValue(1000);
+        }
+    }
+
     @Test
     public void testMultipleRowsWithUpdatesMoreThanTTLApart() throws Exception 
{
         // for the purpose of this test only considering cases when 
maxlookback is 0
@@ -544,6 +649,156 @@ public class TableTTLIT extends BaseTest {
         }
     }
 
+    /**
+     * Test CDC events for TTL expired rows. This test creates a table with TTL and CDC index,
+     * verifies insert/update CDC events with pre/post images, then triggers major compaction
+     * to expire rows and verifies TTL_DELETE events with pre-image data.
+     * <p>
+     * After compaction the CDC query is expected to return exactly one event, of type
+     * ttl_delete, whose pre-image equals the post-image captured from the update event.
+     */
+    @Test
+    public void testCDCTTLExpiredRows() throws Exception {
+        final int maxLookbackAge = tableLevelMaxLookback != null
+                ? tableLevelMaxLookback : MAX_LOOKBACK_AGE;
+
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String schemaName = generateUniqueName();
+            String tableName = schemaName + "." + generateUniqueName();
+            String cdcName = generateUniqueName();
+            ObjectMapper mapper = new ObjectMapper();
+
+            createTable(tableName);
+            conn.createStatement().execute("ALTER TABLE " + tableName
+                    + " SET \"phoenix.max.lookback.age.seconds\" = " + 
maxLookbackAge);
+
+            String cdcSql = "CREATE CDC " + cdcName + " ON " + tableName +
+                    " INCLUDE (PRE, POST)";
+            conn.createStatement().execute(cdcSql);
+            conn.commit();
+
+            // cdcIndexName is the index backing the CDC object; cdcFullName is the name the
+            // CDC events are queried through
+            String cdcIndexName =
+                    schemaName + "." + CDCUtil.getCDCIndexName(schemaName + 
"." + cdcName);
+            String cdcFullName = SchemaUtil.getTableName(null, schemaName + 
"." + cdcName);
+
+            PTable cdcIndex = ((PhoenixConnection) 
conn).getTableNoCache(cdcIndexName);
+            assertNotNull("CDC index should be created", cdcIndex);
+            assertTrue("CDC index should be CDC type", 
CDCUtil.isCDCIndex(cdcIndex));
+
+            // Setup time injection: start one second ahead, truncated to a whole second
+            long startTime = System.currentTimeMillis() + 1000;
+            startTime = (startTime / 1000) * 1000;
+            EnvironmentEdgeManager.injectEdge(injectEdge);
+            injectEdge.setValue(startTime);
+
+            // Insert initial row
+            updateRow(conn, tableName, "row1");
+            long insertTime = injectEdge.currentTime();
+            injectEdge.incrementValue(1000);
+
+            // Update the row (both column updates go out in a single commit)
+            updateColumn(conn, tableName, "row1", 1, "updated_val1");
+            updateColumn(conn, tableName, "row1", 2, "updated_val2");
+            conn.commit();
+            long updateTime = injectEdge.currentTime();
+            injectEdge.incrementValue(1000);
+
+            // Verify CDC events for insert and update
+            String cdcQuery = "SELECT PHOENIX_ROW_TIMESTAMP(), \"CDC JSON\" 
FROM " + cdcFullName;
+            Map<String, Object> postImage;
+            try (ResultSet rs = conn.createStatement().executeQuery(cdcQuery)) 
{
+                // First event - insert
+                assertTrue("Should have insert CDC event", rs.next());
+                long eventTimestamp = rs.getTimestamp(1).getTime();
+                // The event timestamp only has to be within 2000 ms of the recorded clock value
+                assertTrue("Insert event timestamp should be close to insert 
time",
+                        Math.abs(eventTimestamp - insertTime) < 2000);
+
+                Map<String, Object> cdcEvent = 
mapper.readValue(rs.getString(2), HashMap.class);
+                assertEquals("Should be upsert event", CDC_UPSERT_EVENT_TYPE,
+                        cdcEvent.get(CDC_EVENT_TYPE));
+                assertTrue("Should have post-image", 
cdcEvent.containsKey(CDC_POST_IMAGE));
+
+                postImage = (Map<String, Object>) cdcEvent.get(CDC_POST_IMAGE);
+                assertFalse("post image must contain something", 
postImage.isEmpty());
+
+                // Second event - update
+                assertTrue("Should have update CDC event", rs.next());
+                eventTimestamp = rs.getTimestamp(1).getTime();
+                assertTrue("Update event timestamp should be close to update 
time",
+                        Math.abs(eventTimestamp - updateTime) < 2000);
+
+                cdcEvent = mapper.readValue(rs.getString(2), HashMap.class);
+                assertEquals("Should be upsert event", CDC_UPSERT_EVENT_TYPE,
+                        cdcEvent.get(CDC_EVENT_TYPE));
+                assertTrue("Should have pre-image", 
cdcEvent.containsKey(CDC_PRE_IMAGE));
+                assertTrue("Should have post-image", 
cdcEvent.containsKey(CDC_POST_IMAGE));
+
+                // The update's pre-image must equal the insert's post-image; the update's
+                // post-image is then kept for comparison with the ttl_delete pre-image
+                Map<String, Object> preImage = (Map<String, Object>) 
cdcEvent.get(CDC_PRE_IMAGE);
+                assertEquals("Comparison of last post-image with new 
pre-image", postImage,
+                        preImage);
+                postImage = (Map<String, Object>) cdcEvent.get(CDC_POST_IMAGE);
+                LOG.info("Post-image {}", postImage);
+            }
+
+            // Advance time past TTL to expire the row
+            injectEdge.incrementValue((ttl + maxLookbackAge + 1) * 1000);
+
+            // NOTE(review): the dumpTable calls presumably log table contents for debugging -
+            // confirm; their results are not asserted on.
+            TestUtil.dumpTable(conn, TableName.valueOf(tableName));
+            TestUtil.dumpTable(conn, TableName.valueOf(cdcIndexName));
+            flush(TableName.valueOf(tableName));
+            majorCompact(TableName.valueOf(tableName));
+            TestUtil.dumpTable(conn, TableName.valueOf(tableName));
+            TestUtil.dumpTable(conn, TableName.valueOf(cdcIndexName));
+
+            // Verify row is expired from data table
+            String dataQuery = "SELECT * FROM " + tableName + " WHERE id = 
'row1'";
+            try (ResultSet rs = 
conn.createStatement().executeQuery(dataQuery)) {
+                assertFalse("Row should be expired from data table", 
rs.next());
+            }
+
+            // Verify TTL_DELETE CDC event was generated
+            try (ResultSet rs = conn.createStatement().executeQuery(cdcQuery)) 
{
+                int eventCount = 0;
+                Map<String, Object> ttlDeleteEvent = null;
+
+                while (rs.next()) {
+                    eventCount++;
+                    Map<String, Object> cdcEvent = 
mapper.readValue(rs.getString(2), HashMap.class);
+                    String eventType = (String) cdcEvent.get(CDC_EVENT_TYPE);
+                    assertEquals("Event type must be " + 
CDC_TTL_DELETE_EVENT_TYPE + " but found " +
+                                    eventType,
+                            CDC_TTL_DELETE_EVENT_TYPE, eventType);
+                    // NOTE(review): this condition is always true here, because the
+                    // assertEquals above fails the test for any other event type.
+                    if (CDC_TTL_DELETE_EVENT_TYPE.equals(eventType)) {
+                        ttlDeleteEvent = cdcEvent;
+                    }
+                }
+
+                assertEquals("Should have only 1 event for TTL_DELETE because 
other events are " +
+                        "expired due to major compaction", 1, eventCount);
+                assertNotNull("Should have TTL delete event", ttlDeleteEvent);
+
+                // Verify TTL delete event structure
+                assertEquals("Should be ttl_delete event", 
CDC_TTL_DELETE_EVENT_TYPE,
+                        ttlDeleteEvent.get(CDC_EVENT_TYPE));
+                assertTrue("TTL delete should have pre-image",
+                        ttlDeleteEvent.containsKey(CDC_PRE_IMAGE));
+
+                Map<String, Object> preImage =
+                        (Map<String, Object>) 
ttlDeleteEvent.get(CDC_PRE_IMAGE);
+                assertEquals("Comparison of last post-image with new 
pre-image", postImage,
+                        preImage);
+                LOG.info("TTL delete event verified: {}", ttlDeleteEvent);
+            }
+
+            // Query again with a LIKE filter on the JSON text and check that the first
+            // returned event is a ttl_delete event
+            String cdcScanQuery = "SELECT \"CDC JSON\" FROM " + cdcFullName +
+                    " WHERE \"CDC JSON\" LIKE '%ttl_delete%'";
+            try (ResultSet rs = 
conn.createStatement().executeQuery(cdcScanQuery)) {
+                assertTrue("Should find TTL delete event via scan", rs.next());
+                Map<String, Object> cdcEvent = 
mapper.readValue(rs.getString(1), HashMap.class);
+                assertEquals("Should be ttl_delete event", 
CDC_TTL_DELETE_EVENT_TYPE,
+                        cdcEvent.get(CDC_EVENT_TYPE));
+            }
+
+            LOG.info("CDC TTL test completed successfully for table: {}", 
tableName);
+        }
+    }
 
     private void flush(TableName table) throws IOException {
         Admin admin = getUtility().getAdmin();
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/schema/ConditionalTTLExpressionIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/schema/ConditionalTTLExpressionIT.java
index 6fdc674a04..8e582373f4 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/schema/ConditionalTTLExpressionIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/schema/ConditionalTTLExpressionIT.java
@@ -31,7 +31,6 @@ import static 
org.apache.phoenix.schema.LiteralTTLExpression.TTL_EXPRESSION_FORE
 import static org.apache.phoenix.util.TestUtil.retainSingleQuotes;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -43,6 +42,7 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Timestamp;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Date;
@@ -55,6 +55,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapreduce.CounterGroup;
@@ -66,6 +67,7 @@ import org.apache.phoenix.hbase.index.IndexRegionObserver;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixResultSet;
 import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.query.PhoenixTestBuilder;
 import org.apache.phoenix.query.PhoenixTestBuilder.SchemaBuilder;
 import org.apache.phoenix.query.PhoenixTestBuilder.SchemaBuilder.OtherOptions;
 import org.apache.phoenix.query.PhoenixTestBuilder.SchemaBuilder.TableOptions;
@@ -130,6 +132,8 @@ public class ConditionalTTLExpressionIT extends 
ParallelStatsDisabledIT {
     private Map<Integer, String> dataRowPosToKey = Maps.newHashMap();
     private Map<Integer, String> indexRowPosToKey = Maps.newHashMap();
 
+    // Shared Jackson mapper; testPhoenixRowTimestampWithCdc uses it to parse CDC event JSON.
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
     public ConditionalTTLExpressionIT(boolean columnEncoded,
                                       Integer tableLevelMaxLooback) {
         this.columnEncoded = columnEncoded;
@@ -421,6 +425,193 @@ public class ConditionalTTLExpressionIT extends 
ParallelStatsDisabledIT {
         }
     }
 
+    /**
+     * Tests CDC (Change Data Capture) events for rows expired by a conditional TTL expression
+     * based on PHOENIX_ROW_TIMESTAMP(). The test walks through the following lifecycle:
+     * <ol>
+     * <li>Populates the table and verifies that every initial CDC event is an upsert with an
+     * empty pre-image and a non-empty post-image.</li>
+     * <li>Advances the injected clock by the TTL, major compacts, and verifies that every CDC
+     * event is now a ttl_delete whose pre-image equals the post-image recorded at the same
+     * position in step 1.</li>
+     * <li>Updates one column of an expired row and verifies that only that column is visible
+     * and that the update is reported as an upsert with an empty pre-image.</li>
+     * <li>Expires the row a second time and verifies a ttl_delete event whose pre-image
+     * equals the post-image of that update.</li>
+     * </ol>
+     * Pre/post images are matched by position, so the checks rely on the CDC query returning
+     * events in the same order before and after compaction.
+     */
+    @Test
+    public void testPhoenixRowTimestampWithCdc() throws Exception {
+        int ttl = 50 * 1000;
+        String ttlExpression = String.format(
+                "TO_NUMBER(CURRENT_TIME()) - TO_NUMBER(PHOENIX_ROW_TIMESTAMP()) >= %d", ttl);
+        createTable(ttlExpression);
+        String tableName = schemaBuilder.getEntityTableName();
+        String cdcName = "cdc_" + generateUniqueName();
+        injectEdge();
+        int rowCount = 5;
+        long actual;
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            // Initial Setup - Create CDC index on the table
+            conn.createStatement().execute("CREATE CDC " + cdcName + " ON " + tableName);
+            populateTable(conn, rowCount);
+
+            // Verify initial row count
+            actual = TestUtil.getRowCount(conn, tableName, true);
+            assertEquals("Table should contain all inserted rows", rowCount, actual);
+
+            // Query initial CDC events (inserts)
+            String cdcQuery = "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM "
+                    + PhoenixTestBuilder.DDLDefaults.DEFAULT_SCHEMA_NAME + "." + cdcName;
+
+            List<Map<String, Object>> postImageList = new ArrayList<>();
+            try (ResultSet resultSet = conn.createStatement().executeQuery(cdcQuery)) {
+                while (resultSet.next()) {
+                    // Column 4 is parsed as the CDC event JSON
+                    // (presumably the "CDC JSON" column of SELECT * - confirm)
+                    String cdcVal = resultSet.getString(4);
+                    Map<String, Object> map = OBJECT_MAPPER.readValue(cdcVal, Map.class);
+
+                    // Validate insert events have no pre-image but have post-image
+                    Map<String, Object> preImage =
+                            (Map<String, Object>) map.get(QueryConstants.CDC_PRE_IMAGE);
+                    assertTrue("Insert events should have empty pre-image", preImage.isEmpty());
+
+                    Map<String, Object> postImage =
+                            (Map<String, Object>) map.get(QueryConstants.CDC_POST_IMAGE);
+                    assertFalse("Insert events should have non-empty post-image",
+                            postImage.isEmpty());
+                    postImageList.add(postImage);
+
+                    assertEquals("Initial events should be UPSERT type",
+                            QueryConstants.CDC_UPSERT_EVENT_TYPE,
+                            map.get(QueryConstants.CDC_EVENT_TYPE));
+                }
+            }
+            assertEquals("Post image list size should be " + rowCount + " but it is "
+                    + postImageList.size(), rowCount, postImageList.size());
+
+            // TTL Expiration - Advance time to trigger TTL expiration
+            injectEdge.incrementValue(ttl);
+            doMajorCompaction(tableName);
+
+            // Verify all rows are expired from data table
+            actual = TestUtil.getRowCount(conn, tableName, true);
+            assertEquals("All rows should be expired after TTL", 0, actual);
+
+            // TTL CDC Events - Validate TTL_DELETE events are generated
+            int i = 0;
+            try (ResultSet resultSet = conn.createStatement().executeQuery(cdcQuery)) {
+                while (resultSet.next()) {
+                    String cdcVal = resultSet.getString(4);
+                    Map<String, Object> map = OBJECT_MAPPER.readValue(cdcVal, Map.class);
+
+                    // Validate TTL delete events
+                    assertEquals("TTL expired rows should generate TTL_DELETE events",
+                            QueryConstants.CDC_TTL_DELETE_EVENT_TYPE,
+                            map.get(QueryConstants.CDC_EVENT_TYPE));
+
+                    Map<String, Object> preImage =
+                            (Map<String, Object>) map.get(QueryConstants.CDC_PRE_IMAGE);
+                    assertFalse("TTL_DELETE events should have non-empty pre-image",
+                            preImage.isEmpty());
+
+                    Map<String, Object> postImage =
+                            (Map<String, Object>) map.get(QueryConstants.CDC_POST_IMAGE);
+                    assertTrue("TTL_DELETE events should have empty post-image",
+                            postImage.isEmpty());
+
+                    // Fail with a readable message rather than IndexOutOfBoundsException if
+                    // more events come back than post-images were recorded
+                    assertTrue("Found more TTL_DELETE events than the " + postImageList.size()
+                            + " recorded post-images", i < postImageList.size());
+
+                    // TTL delete pre-image should match previous upsert post-image
+                    assertEquals("TTL_DELETE pre-image should match original insert post-image",
+                            postImageList.get(i), preImage);
+                    i++;
+                }
+            }
+            assertEquals("Num of TTL_DELETE events verified should be " + rowCount
+                    + " but it is " + i, rowCount, i);
+
+            // Update an expired row to bring it back
+            injectEdge.incrementValue(1);
+            long currentTime = injectEdge.currentTime();
+            updateColumn(conn, 1, "VAL4", currentTime);
+
+            // Verify the row
+            actual = TestUtil.getRowCount(conn, tableName, true);
+            assertEquals("Only one row should be resurrected after update", 1, actual);
+
+            // Verify resurrected row has only updated column visible
+            try (ResultSet rs = readRow(conn, 1)) {
+                assertTrue("Resurrected row should exist", rs.next());
+                for (String col : COLUMNS) {
+                    if (!col.equals("VAL4")) {
+                        assertNull("Non-updated columns should be null in resurrected row",
+                                rs.getObject(col));
+                    } else {
+                        assertEquals("Updated column should have new timestamp",
+                                currentTime, rs.getTimestamp("VAL4").getTime());
+                    }
+                }
+            }
+
+            // Advance time beyond max lookback window
+            // NOTE(review): tableLevelMaxLookback is unboxed here; this assumes it is non-null
+            // for every parameter set of this test - confirm.
+            injectEdge.incrementValue(tableLevelMaxLookback * 1000L + 2);
+            doMajorCompaction(tableName);
+            CellCount expectedCellCount = new CellCount();
+            expectedCellCount.insertRow(dataRowPosToKey.get(1), 2);
+            validateTable(conn, tableName, expectedCellCount, dataRowPosToKey.values());
+
+            // Query CDC events at or after the time of the resurrecting update
+            String cdcQuerySince = cdcQuery + " WHERE PHOENIX_ROW_TIMESTAMP() >= ?";
+            List<Map<String, Object>> resurrectedPostImages = new ArrayList<>();
+            try (PreparedStatement ps = conn.prepareStatement(cdcQuerySince)) {
+                ps.setTimestamp(1, new Timestamp(currentTime));
+                try (ResultSet resultSet = ps.executeQuery()) {
+                    while (resultSet.next()) {
+                        String cdcVal = resultSet.getString(4);
+                        Map<String, Object> map = OBJECT_MAPPER.readValue(cdcVal, Map.class);
+
+                        assertEquals("Resurrection event should be UPSERT type",
+                                QueryConstants.CDC_UPSERT_EVENT_TYPE,
+                                map.get(QueryConstants.CDC_EVENT_TYPE));
+
+                        Map<String, Object> preImage =
+                                (Map<String, Object>) map.get(QueryConstants.CDC_PRE_IMAGE);
+                        assertTrue("Resurrection event should have empty pre-image",
+                                preImage.isEmpty());
+
+                        Map<String, Object> postImage =
+                                (Map<String, Object>) map.get(QueryConstants.CDC_POST_IMAGE);
+                        assertFalse("Resurrection event should have non-empty post-image",
+                                postImage.isEmpty());
+                        resurrectedPostImages.add(postImage);
+                    }
+                }
+            }
+            assertEquals("Post image list size should be 1 but it is "
+                    + resurrectedPostImages.size(), 1, resurrectedPostImages.size());
+
+            // Trigger TTL expiration again
+            injectEdge.incrementValue(ttl);
+            doMajorCompaction(tableName);
+
+            // Verify all rows are expired from data table
+            actual = TestUtil.getRowCount(conn, tableName, true);
+            assertEquals("All rows should be expired after TTL", 0, actual);
+
+            expectedCellCount = new CellCount();
+            validateTable(conn, tableName, expectedCellCount, dataRowPosToKey.values());
+
+            // Validate second round of TTL_DELETE events
+            i = 0;
+            try (PreparedStatement ps = conn.prepareStatement(cdcQuerySince)) {
+                ps.setTimestamp(1, new Timestamp(currentTime));
+                try (ResultSet resultSet = ps.executeQuery()) {
+                    while (resultSet.next()) {
+                        String cdcVal = resultSet.getString(4);
+                        Map<String, Object> map = OBJECT_MAPPER.readValue(cdcVal, Map.class);
+
+                        assertEquals("Second TTL expiration should generate TTL_DELETE events",
+                                QueryConstants.CDC_TTL_DELETE_EVENT_TYPE,
+                                map.get(QueryConstants.CDC_EVENT_TYPE));
+
+                        Map<String, Object> preImage =
+                                (Map<String, Object>) map.get(QueryConstants.CDC_PRE_IMAGE);
+                        assertFalse("Second TTL_DELETE should have non-empty pre-image",
+                                preImage.isEmpty());
+
+                        Map<String, Object> postImage =
+                                (Map<String, Object>) map.get(QueryConstants.CDC_POST_IMAGE);
+                        assertTrue("Second TTL_DELETE should have empty post-image",
+                                postImage.isEmpty());
+
+                        // Same guard as in the first round: assert before indexing
+                        assertTrue("Found more TTL_DELETE events than the "
+                                + resurrectedPostImages.size() + " recorded post-images",
+                                i < resurrectedPostImages.size());
+
+                        assertEquals(
+                                "Second TTL_DELETE pre-image should match resurrection post-image",
+                                resurrectedPostImages.get(i), preImage);
+                        i++;
+                    }
+                }
+            }
+            assertEquals("Num of TTL_DELETE events verified should be 1 but it is " + i, 1, i);
+        }
+    }
+
+
     @Test
     public void testDeleteMarkers() throws Exception {
         String ttlCol = "VAL5";

Reply via email to