haridsv commented on code in PR #2209:
URL: https://github.com/apache/phoenix/pull/2209#discussion_r2185482029


##########
phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -259,4 +310,44 @@ private Object getColumnValue(byte[] cellValue, int 
offset, int length, PDataTyp
         }
         return CDCUtil.getColumnEncodedValue(value, dataType);
     }
+
+    /**
+     * Handles CDC events that already contain pre-image data, avoiding data 
table scan.
+     * Supports both the new CDC_IMAGE_CQ column and traditional CDC JSON 
column.
+     *
+     * @param indexRow    The CDC index row cells
+     * @param indexRowKey The CDC index row key
+     * @param indexCell   The primary index cell
+     * @param result      The result list to populate
+     * @return true if event was processed successfully
+     */
+    private boolean handlePreImageCDCEvent(List<Cell> indexRow, byte[] 
indexRowKey,
+                                           Cell indexCell, List<Cell> result) {
+        Cell cdcDataCell = null;
+        for (Cell cell : indexRow) {
+            if (Bytes.equals(cell.getQualifierArray(), 
cell.getQualifierOffset(),
+                    cell.getQualifierLength(),
+                    QueryConstants.CDC_IMAGE_CQ_BYTES, 0,
+                    QueryConstants.CDC_IMAGE_CQ_BYTES.length)) {
+                cdcDataCell = cell;
+                break;
+            }
+        }
+        if (cdcDataCell == null) {
+            return false;
+        }
+        byte[] cdcEventBytes = CellUtil.cloneValue(cdcDataCell);

Review Comment:
   Could we use `ImmutableBytesPtr.cloneCellValueIfNecessary` here instead of 
`CellUtil.cloneValue`, to take advantage of the optimization it already does?



##########
phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCCompactionUtil.java:
##########
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.CheckAndMutate;
+import org.apache.hadoop.hbase.client.CheckAndMutateResult;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.expression.function.PartitionIdFunction;
+import org.apache.phoenix.hbase.index.ValueGetter;
+import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.types.PDate;
+import org.apache.phoenix.util.CDCUtil;
+import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.JacksonUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.phoenix.query.QueryConstants.NAME_SEPARATOR;
+
+/**
+ * Utility class for CDC (Change Data Capture) operations during compaction.
+ * This class contains utilities for handling TTL row expiration events and 
generating
+ * CDC events with pre-image data that are written directly to CDC index 
tables.
+ */
+public final class CDCCompactionUtil {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CDCCompactionUtil.class);
+
+    private CDCCompactionUtil() {
+        // empty
+    }
+
+    /**
+     * Finds the column name for a given cell in the data table.
+     *
+     * @param dataTable The data table
+     * @param cell      The cell
+     * @return The column name or null if not found
+     */
+    static String findColumnName(PTable dataTable, Cell cell) {
+        try {
+            byte[] family = CellUtil.cloneFamily(cell);
+            byte[] qualifier = CellUtil.cloneQualifier(cell);
+            byte[] defaultCf = dataTable.getDefaultFamilyName() != null
+                    ? dataTable.getDefaultFamilyName().getBytes()
+                    : QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES;
+            for (PColumn column : dataTable.getColumns()) {
+                if (column.getFamilyName() != null
+                        && Bytes.equals(family, 
column.getFamilyName().getBytes())
+                        && Bytes.equals(qualifier, 
column.getColumnQualifierBytes())) {
+                    if (Bytes.equals(defaultCf, 
column.getFamilyName().getBytes())) {
+                        return column.getName().getString();
+                    } else {
+                        return column.getFamilyName().getString() + 
NAME_SEPARATOR
+                                + column.getName().getString();
+                    }
+                }
+            }
+        } catch (Exception e) {
+            LOGGER.error("Error finding column name for cell: {}", 
CellUtil.toString(cell, true),
+                    e);
+        }
+        return null;
+    }
+
+    /**
+     * Creates a CDC event map for TTL delete with pre-image data.
+     *
+     * @param expiredRowPut The expired row data
+     * @param dataTable     The data table
+     * @param preImage      Pre-image map
+     * @return CDC event map
+     */
+    static Map<String, Object> createTTLDeleteCDCEvent(Put expiredRowPut, 
PTable dataTable,
+                                                       Map<String, Object> 
preImage)
+            throws Exception {
+        Map<String, Object> cdcEvent = new HashMap<>();
+        cdcEvent.put(QueryConstants.CDC_EVENT_TYPE, 
QueryConstants.CDC_TTL_DELETE_EVENT_TYPE);
+        for (List<Cell> familyCells : 
expiredRowPut.getFamilyCellMap().values()) {
+            for (Cell cell : familyCells) {
+                String columnName = findColumnName(dataTable, cell);
+                if (columnName != null) {
+                    PColumn column = dataTable.getColumnForColumnQualifier(
+                            CellUtil.cloneFamily(cell), 
CellUtil.cloneQualifier(cell));
+                    Object value = 
column.getDataType().toObject(cell.getValueArray(),
+                            cell.getValueOffset(),
+                            cell.getValueLength());
+                    Object encodedValue =
+                            CDCUtil.getColumnEncodedValue(value, 
column.getDataType());
+                    preImage.put(columnName, encodedValue);
+                }
+            }
+        }
+        cdcEvent.put(QueryConstants.CDC_PRE_IMAGE, preImage);
+        cdcEvent.put(QueryConstants.CDC_POST_IMAGE, Collections.emptyMap());
+        return cdcEvent;
+    }

Review Comment:
   Could this build a `CDCTableInfo` and use it with `CDCChangeBuilder` 
instead, to avoid duplicating some of the logic?



##########
phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCCompactionUtil.java:
##########
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.CheckAndMutate;
+import org.apache.hadoop.hbase.client.CheckAndMutateResult;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.expression.function.PartitionIdFunction;
+import org.apache.phoenix.hbase.index.ValueGetter;
+import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.types.PDate;
+import org.apache.phoenix.util.CDCUtil;
+import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.JacksonUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.phoenix.query.QueryConstants.NAME_SEPARATOR;
+
+/**
+ * Utility class for CDC (Change Data Capture) operations during compaction.
+ * This class contains utilities for handling TTL row expiration events and 
generating
+ * CDC events with pre-image data that are written directly to CDC index 
tables.
+ */
+public final class CDCCompactionUtil {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CDCCompactionUtil.class);
+
+    private CDCCompactionUtil() {
+        // empty
+    }
+
+    /**
+     * Finds the column name for a given cell in the data table.
+     *
+     * @param dataTable The data table
+     * @param cell      The cell
+     * @return The column name or null if not found
+     */
+    static String findColumnName(PTable dataTable, Cell cell) {
+        try {
+            byte[] family = CellUtil.cloneFamily(cell);
+            byte[] qualifier = CellUtil.cloneQualifier(cell);
+            byte[] defaultCf = dataTable.getDefaultFamilyName() != null
+                    ? dataTable.getDefaultFamilyName().getBytes()
+                    : QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES;
+            for (PColumn column : dataTable.getColumns()) {
+                if (column.getFamilyName() != null
+                        && Bytes.equals(family, 
column.getFamilyName().getBytes())
+                        && Bytes.equals(qualifier, 
column.getColumnQualifierBytes())) {
+                    if (Bytes.equals(defaultCf, 
column.getFamilyName().getBytes())) {
+                        return column.getName().getString();
+                    } else {
+                        return column.getFamilyName().getString() + 
NAME_SEPARATOR
+                                + column.getName().getString();
+                    }
+                }
+            }
+        } catch (Exception e) {
+            LOGGER.error("Error finding column name for cell: {}", 
CellUtil.toString(cell, true),
+                    e);
+        }
+        return null;
+    }
+
+    /**
+     * Creates a CDC event map for TTL delete with pre-image data.
+     *
+     * @param expiredRowPut The expired row data
+     * @param dataTable     The data table
+     * @param preImage      Pre-image map
+     * @return CDC event map
+     */
+    static Map<String, Object> createTTLDeleteCDCEvent(Put expiredRowPut, 
PTable dataTable,
+                                                       Map<String, Object> 
preImage)
+            throws Exception {
+        Map<String, Object> cdcEvent = new HashMap<>();
+        cdcEvent.put(QueryConstants.CDC_EVENT_TYPE, 
QueryConstants.CDC_TTL_DELETE_EVENT_TYPE);
+        for (List<Cell> familyCells : 
expiredRowPut.getFamilyCellMap().values()) {
+            for (Cell cell : familyCells) {
+                String columnName = findColumnName(dataTable, cell);
+                if (columnName != null) {
+                    PColumn column = dataTable.getColumnForColumnQualifier(
+                            CellUtil.cloneFamily(cell), 
CellUtil.cloneQualifier(cell));
+                    Object value = 
column.getDataType().toObject(cell.getValueArray(),
+                            cell.getValueOffset(),
+                            cell.getValueLength());
+                    Object encodedValue =
+                            CDCUtil.getColumnEncodedValue(value, 
column.getDataType());
+                    preImage.put(columnName, encodedValue);
+                }
+            }
+        }
+        cdcEvent.put(QueryConstants.CDC_PRE_IMAGE, preImage);
+        cdcEvent.put(QueryConstants.CDC_POST_IMAGE, Collections.emptyMap());
+        return cdcEvent;
+    }
+
+    /**
+     * Builds CDC index Put mutation.
+     *
+     * @param cdcIndex            The CDC index table
+     * @param expiredRowPut       The expired row data as a Put
+     * @param eventTimestamp      The timestamp for the CDC event
+     * @param cdcEventBytes       The CDC event data to store
+     * @param dataTable           The data table
+     * @param env                 The region coprocessor environment
+     * @param region              The HBase region
+     * @param compactionTimeBytes The compaction time as bytes
+     * @return The CDC index Put mutation
+     */
+    static Put buildCDCIndexPut(PTable cdcIndex, Put expiredRowPut, long 
eventTimestamp,
+                                byte[] cdcEventBytes, PTable dataTable,
+                                RegionCoprocessorEnvironment env, Region 
region,
+                                byte[] compactionTimeBytes) throws Exception {
+
+        try (PhoenixConnection serverConnection = 
QueryUtil.getConnectionOnServer(new Properties(),
+                env.getConfiguration()).unwrap(PhoenixConnection.class)) {
+
+            IndexMaintainer cdcIndexMaintainer =
+                    cdcIndex.getIndexMaintainer(dataTable, serverConnection);
+
+            ValueGetter dataRowVG = new 
IndexUtil.SimpleValueGetter(expiredRowPut);
+            ImmutableBytesPtr rowKeyPtr = new 
ImmutableBytesPtr(expiredRowPut.getRow());
+
+            Put cdcIndexPut = cdcIndexMaintainer.buildUpdateMutation(
+                    GenericKeyValueBuilder.INSTANCE,
+                    dataRowVG,
+                    rowKeyPtr,
+                    eventTimestamp,
+                    null,
+                    null,
+                    false,
+                    region.getRegionInfo().getEncodedNameAsBytes());
+
+            byte[] rowKey = cdcIndexPut.getRow().clone();
+            System.arraycopy(compactionTimeBytes, 0, rowKey,
+                    PartitionIdFunction.PARTITION_ID_LENGTH, 
PDate.INSTANCE.getByteSize());
+            Put newCdcIndexPut = new Put(rowKey, eventTimestamp);
+
+            newCdcIndexPut.addColumn(
+                    
cdcIndexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+                    cdcIndexMaintainer.getEmptyKeyValueQualifier(), 
eventTimestamp,
+                    QueryConstants.UNVERIFIED_BYTES);
+
+            // Add CDC event data
+            newCdcIndexPut.addColumn(
+                    QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
+                    QueryConstants.CDC_IMAGE_CQ_BYTES,
+                    eventTimestamp,
+                    cdcEventBytes);
+
+            return newCdcIndexPut;
+        }
+    }
+
+    /**
+     * Generates and applies a CDC index mutation for TTL expired row with 
retries if required.
+     *
+     * @param cdcIndex                 The CDC index table
+     * @param dataTable                The data table
+     * @param expiredRowPut            The expired row data as a Put
+     * @param eventTimestamp           The timestamp for the CDC event
+     * @param tableName                The table name for logging
+     * @param env                      The region coprocessor environment
+     * @param region                   The HBase region
+     * @param compactionTimeBytes      The compaction time as bytes
+     * @param cdcTtlMutationMaxRetries Maximum retry attempts for CDC mutations
+     */
+    static void generateCDCIndexMutation(PTable cdcIndex, PTable dataTable,
+                                         Put expiredRowPut,
+                                         long eventTimestamp, String tableName,
+                                         RegionCoprocessorEnvironment env, 
Region region,
+                                         byte[] compactionTimeBytes,
+                                         int cdcTtlMutationMaxRetries)
+            throws Exception {
+        Map<String, Object> cdcEvent =
+                createTTLDeleteCDCEvent(expiredRowPut, dataTable, new 
HashMap<>());
+        byte[] cdcEventBytes =
+                
JacksonUtil.getObjectWriter(HashMap.class).writeValueAsBytes(cdcEvent);
+        Put cdcIndexPut =
+                buildCDCIndexPut(cdcIndex, expiredRowPut, eventTimestamp, 
cdcEventBytes,
+                        dataTable, env, region, compactionTimeBytes);
+
+        Exception lastException = null;
+        for (int retryCount = 0; retryCount < cdcTtlMutationMaxRetries; 
retryCount++) {
+            try (Table cdcIndexTable = 
env.getConnection().getTable(TableName.valueOf(
+                    cdcIndex.getPhysicalName().getBytes()))) {
+                CheckAndMutate checkAndMutate =
+                        CheckAndMutate.newBuilder(cdcIndexPut.getRow())
+                                
.ifNotExists(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
+                                        QueryConstants.CDC_IMAGE_CQ_BYTES)
+                                .build(cdcIndexPut);
+                CheckAndMutateResult result = 
cdcIndexTable.checkAndMutate(checkAndMutate);
+
+                if (result.isSuccess()) {
+                    // Successfully inserted new CDC event - Single CF case
+                    lastException = null;
+                    break;
+                } else {
+                    // Row already exists, need to retrieve existing pre-image 
and merge
+                    // Likely to happen for multi CF case
+                    Get get = new Get(cdcIndexPut.getRow());
+                    get.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
+                            QueryConstants.CDC_IMAGE_CQ_BYTES);
+                    Result existingResult = cdcIndexTable.get(get);
+
+                    if (!existingResult.isEmpty()) {
+                        Cell existingCell = existingResult.getColumnLatestCell(
+                                QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
+                                QueryConstants.CDC_IMAGE_CQ_BYTES);
+
+                        if (existingCell != null) {
+                            byte[] existingCdcBytes = 
CellUtil.cloneValue(existingCell);
+                            Map<String, Object> existingCdcEvent =
+                                    JacksonUtil.getObjectReader(HashMap.class)
+                                            .readValue(existingCdcBytes);
+                            Map<String, Object> existingPreImage =
+                                    (Map<String, Object>) 
existingCdcEvent.getOrDefault(
+                                            QueryConstants.CDC_PRE_IMAGE, new 
HashMap<>());
+
+                            // Create new TTL delete event with merged 
pre-image
+                            Map<String, Object> mergedCdcEvent =
+                                    createTTLDeleteCDCEvent(expiredRowPut, 
dataTable,
+                                            existingPreImage);
+                            byte[] mergedCdcEventBytes =
+                                    JacksonUtil.getObjectWriter(HashMap.class)
+                                            .writeValueAsBytes(mergedCdcEvent);
+
+                            Put mergedCdcIndexPut = buildCDCIndexPut(cdcIndex, 
expiredRowPut,
+                                    eventTimestamp, mergedCdcEventBytes, 
dataTable, env, region,
+                                    compactionTimeBytes);

Review Comment:
   This logic to incrementally merge would be quite inefficient, right? What 
if we used a separate CF in the index, one for each CF in the data table, and 
merged them on read?



##########
phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCCompactionUtil.java:
##########
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.CheckAndMutate;
+import org.apache.hadoop.hbase.client.CheckAndMutateResult;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.expression.function.PartitionIdFunction;
+import org.apache.phoenix.hbase.index.ValueGetter;
+import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.types.PDate;
+import org.apache.phoenix.util.CDCUtil;
+import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.JacksonUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.phoenix.query.QueryConstants.NAME_SEPARATOR;
+
+/**
+ * Utility class for CDC (Change Data Capture) operations during compaction.
+ * This class contains utilities for handling TTL row expiration events and 
generating
+ * CDC events with pre-image data that are written directly to CDC index 
tables.
+ */
+public final class CDCCompactionUtil {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CDCCompactionUtil.class);
+
+    private CDCCompactionUtil() {
+        // empty
+    }
+
+    /**
+     * Finds the column name for a given cell in the data table.
+     *
+     * @param dataTable The data table
+     * @param cell      The cell
+     * @return The column name or null if not found
+     */
+    static String findColumnName(PTable dataTable, Cell cell) {
+        try {
+            byte[] family = CellUtil.cloneFamily(cell);
+            byte[] qualifier = CellUtil.cloneQualifier(cell);
+            byte[] defaultCf = dataTable.getDefaultFamilyName() != null
+                    ? dataTable.getDefaultFamilyName().getBytes()
+                    : QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES;
+            for (PColumn column : dataTable.getColumns()) {
+                if (column.getFamilyName() != null
+                        && Bytes.equals(family, 
column.getFamilyName().getBytes())
+                        && Bytes.equals(qualifier, 
column.getColumnQualifierBytes())) {
+                    if (Bytes.equals(defaultCf, 
column.getFamilyName().getBytes())) {
+                        return column.getName().getString();
+                    } else {
+                        return column.getFamilyName().getString() + 
NAME_SEPARATOR
+                                + column.getName().getString();
+                    }
+                }
+            }
+        } catch (Exception e) {
+            LOGGER.error("Error finding column name for cell: {}", 
CellUtil.toString(cell, true),
+                    e);
+        }
+        return null;
+    }
+
+    /**
+     * Creates a CDC event map for TTL delete with pre-image data.
+     *
+     * @param expiredRowPut The expired row data
+     * @param dataTable     The data table
+     * @param preImage      Pre-image map
+     * @return CDC event map
+     */
+    static Map<String, Object> createTTLDeleteCDCEvent(Put expiredRowPut, 
PTable dataTable,
+                                                       Map<String, Object> 
preImage)
+            throws Exception {
+        Map<String, Object> cdcEvent = new HashMap<>();
+        cdcEvent.put(QueryConstants.CDC_EVENT_TYPE, 
QueryConstants.CDC_TTL_DELETE_EVENT_TYPE);
+        for (List<Cell> familyCells : 
expiredRowPut.getFamilyCellMap().values()) {
+            for (Cell cell : familyCells) {
+                String columnName = findColumnName(dataTable, cell);
+                if (columnName != null) {
+                    PColumn column = dataTable.getColumnForColumnQualifier(
+                            CellUtil.cloneFamily(cell), 
CellUtil.cloneQualifier(cell));
+                    Object value = 
column.getDataType().toObject(cell.getValueArray(),
+                            cell.getValueOffset(),
+                            cell.getValueLength());
+                    Object encodedValue =
+                            CDCUtil.getColumnEncodedValue(value, 
column.getDataType());
+                    preImage.put(columnName, encodedValue);
+                }
+            }
+        }
+        cdcEvent.put(QueryConstants.CDC_PRE_IMAGE, preImage);
+        cdcEvent.put(QueryConstants.CDC_POST_IMAGE, Collections.emptyMap());

Review Comment:
   What about CDC scope? The pre-image and an empty post-image are added here 
unconditionally.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@phoenix.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to