[ 
https://issues.apache.org/jira/browse/PHOENIX-7015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17814396#comment-17814396
 ] 

ASF GitHub Bot commented on PHOENIX-7015:
-----------------------------------------

haridsv commented on code in PR #1813:
URL: https://github.com/apache/phoenix/pull/1813#discussion_r1477029813


##########
phoenix-core/src/main/java/org/apache/phoenix/util/CDCUtil.java:
##########
@@ -110,4 +109,25 @@ public static Scan initForRawScan(Scan scan) {
         }
         return scan;
     }
+
+    public static int compareCellFamilyAndQualifier(byte[] columnFamily1,
+                                                     byte[] columnQual1,
+                                                     byte[] columnFamily2,
+                                                     byte[] columnQual2) {
+        int familyNameComparison = CDCUtil.compare(columnFamily1, columnFamily2);
+        if (familyNameComparison != 0) {
+            return familyNameComparison;
+        }
+        return CDCUtil.compare(columnQual1, columnQual2);

Review Comment:
   Why not use `Arrays.compare()`? I see it used in Protobuf too, so it should 
be efficient.
   ```suggestion
           return Arrays.compare(columnQual1, 0, columnQual1.length, columnQual2, 0, columnQual2.length);
   ```
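
One point worth checking before adopting the suggestion: `Arrays.compare(byte[], ...)` needs Java 9 or later and compares bytes as signed values, while HBase orders families and qualifiers as unsigned bytes. Whether that matters depends on what `CDCUtil.compare` does, which is not shown in this diff. The sketch below is only an illustration under those assumptions; the class name, helper name, and sample bytes are hypothetical and not part of the PR.

```java
import java.util.Arrays;

public class FamilyQualifierCompareSketch {
    // Hypothetical stand-in for compareCellFamilyAndQualifier:
    // compare the family first, then fall back to the qualifier.
    static int compareFamilyAndQualifier(byte[] cf1, byte[] cq1,
                                         byte[] cf2, byte[] cq2) {
        int familyComparison = Arrays.compareUnsigned(cf1, cf2);
        if (familyComparison != 0) {
            return familyComparison;
        }
        return Arrays.compareUnsigned(cq1, cq2);
    }

    public static void main(String[] args) {
        byte[] low = {0x01};
        byte[] high = {(byte) 0x80};
        // Signed comparison: 0x80 is -128, so "high" sorts before "low".
        System.out.println(Arrays.compare(low, high) > 0);          // true
        // Unsigned comparison: 0x80 is 128, so "low" sorts before "high".
        System.out.println(Arrays.compareUnsigned(low, high) < 0);  // true
    }
}
```

If the existing `CDCUtil.compare` is unsigned, `Arrays.compareUnsigned` would be the closer drop-in replacement; if qualifiers never use bytes above 0x7F, the two behave the same.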



##########
phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java:
##########
@@ -1332,100 +1326,8 @@ public static void setScanAttributesForClient(Scan scan, PTable table,
             scan.setAttribute(CDC_INCLUDE_SCOPES,
                     
context.getEncodedCdcIncludeScopes().getBytes(StandardCharsets.UTF_8));

Review Comment:
   We should also put this into CDCTableInfo and let protobuf take care of 
serializing it.



##########
phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java:
##########
@@ -308,6 +308,14 @@ enum JoinType {INNER, LEFT_OUTER}
 
     String CDC_JSON_COL_NAME = "CDC JSON";
 
+    String EVENT_TYPE = "event_type";
+    String PRE_IMAGE = "pre_image";
+    String POST_IMAGE = "post_image";
+    String CHANGE_IMAGE = "change_image";
+    String UPSERT_EVENT_TYPE = "upsert";
+    String DELETE_EVENT_TYPE = "delete";

Review Comment:
   I think we should put a CDC_ prefix on all of these keys.



##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -223,4 +237,85 @@ protected boolean getNextCoveredIndexRow(List<Cell> result) throws IOException {
         }
         return false;
     }
+
+    private Result getCDCImage(
+            Map<String, Map<String, Object>> preImageObj,
+            Map<String, Map<String, Object>> changeImageObj,
+            boolean isIndexCellDeleteRow, Long indexCellTS, Cell firstCell) {
+        Map<String, Object> rowValueMap = new HashMap<>();
+
+        if (this.cdcChangeScopeSet.size() == 0
+                || (this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.PRE))) {
+            rowValueMap.put(PRE_IMAGE, preImageObj);
+        }
+
+        if (this.cdcChangeScopeSet.size() == 0
+                || (this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.CHANGE))) {
+            rowValueMap.put(CHANGE_IMAGE, changeImageObj);
+        }
+
+        Map<String, Map<String, Object>> postImageObj = new HashMap<>();
+        if (this.cdcChangeScopeSet.size() == 0
+                || (this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.POST))) {
+            if (!isIndexCellDeleteRow) {
+                for (Map.Entry<String, Map<String, Object>> preImageObjFamily
+                        : preImageObj.entrySet()) {
+                    String columnFamily = preImageObjFamily.getKey();
+                    postImageObj.put(columnFamily, new HashMap<>());
+                    for (Map.Entry<String, Object> preImageColQual :
+                            preImageObjFamily.getValue().entrySet()) {
+                        postImageObj.get(columnFamily).put(preImageColQual.getKey(),
+                                preImageColQual.getValue());
+                    }
+                }
+                for (Map.Entry<String, Map<String, Object>> changeImageObjFamily
+                        : changeImageObj.entrySet()) {
+                    String columnFamily = changeImageObjFamily.getKey();
+                    if (!postImageObj.containsKey(columnFamily)) {
+                        postImageObj.put(columnFamily, new HashMap<>());
+                    }
+                    for (Map.Entry<String, Object> changeImageColQual :
+                            changeImageObjFamily.getValue().entrySet()) {
+                        postImageObj.get(columnFamily).put(changeImageColQual.getKey(),
+                                changeImageColQual.getValue());

Review Comment:
   Again, clone the original map and use `putAll` rather than copying entry by 
entry.
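
A minimal sketch of that idea, assuming the images stay as nested `Map<String, Map<String, Object>>` values keyed by column family. The class and method names are hypothetical. The inner maps are copied so that the post image does not share mutable state with the pre image, and the change image is then layered on top with `putAll`.

```java
import java.util.HashMap;
import java.util.Map;

public class PostImageMergeSketch {
    // Hypothetical helper: post image = copy of pre image, overlaid with changes.
    static Map<String, Map<String, Object>> buildPostImage(
            Map<String, Map<String, Object>> preImage,
            Map<String, Map<String, Object>> changeImage) {
        Map<String, Map<String, Object>> postImage = new HashMap<>();
        // Copy each family's column map so later writes do not touch preImage.
        for (Map.Entry<String, Map<String, Object>> family : preImage.entrySet()) {
            postImage.put(family.getKey(), new HashMap<>(family.getValue()));
        }
        // Overlay the changed columns; create the family map if it is new.
        for (Map.Entry<String, Map<String, Object>> family : changeImage.entrySet()) {
            postImage.computeIfAbsent(family.getKey(), k -> new HashMap<>())
                    .putAll(family.getValue());
        }
        return postImage;
    }

    public static void main(String[] args) {
        Map<String, Map<String, Object>> pre = new HashMap<>();
        pre.put("0", new HashMap<>(Map.of("V1", 1, "V2", 2)));
        Map<String, Map<String, Object>> change = new HashMap<>();
        change.put("0", new HashMap<>(Map.of("V2", 20)));
        System.out.println(buildPostImage(pre, change).get("0").get("V2"));
    }
}
```

This replaces both nested loops in the hunk with one copy pass and one merge pass, and leaves `preImageObj` unchanged.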



##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -90,128 +88,144 @@ public CDCGlobalIndexRegionScanner(final RegionScanner 
innerScanner,
         super(innerScanner, region, scan, env, dataTableScan, tupleProjector, 
indexMaintainer,
                 viewConstants, ptr, pageSizeMs, queryLimit);
         CDCUtil.initForRawScan(dataTableScan);
-        dataColQualNameMap = ScanUtil.deserializeColumnQualifierToNameMap(
-                scan.getAttribute(DATA_COL_QUALIFIER_TO_NAME_MAP));
-        dataColQualTypeMap = ScanUtil.deserializeColumnQualifierToTypeMap(
-                scan.getAttribute(DATA_COL_QUALIFIER_TO_TYPE_MAP));
+        cdcDataTableInfo = 
CDCTableInfo.createFromProto(CDCInfoProtos.CDCTableDef
+                .parseFrom(scan.getAttribute(CDC_DATA_TABLE_DEF)));
+        Charset utf8Charset = StandardCharsets.UTF_8;
+        String cdcChangeScopeStr = utf8Charset.decode(
+                
ByteBuffer.wrap(scan.getAttribute(CDC_INCLUDE_SCOPES))).toString();
+        try {
+            cdcChangeScopeSet = 
CDCUtil.makeChangeScopeEnumsFromString(cdcChangeScopeStr);
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     @Override
     protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys) throws 
IOException {
+        //TODO: Get Timerange from the start row and end row of the index scan 
object
+        // and set it in the datatable scan object.
+//        if (scan.getStartRow().length == 8) {
+//            startTimeRange = PLong.INSTANCE.getCodec().decodeLong(
+//              scan.getStartRow(), 0, SortOrder.getDefault());
+//        }
+//        if (scan.getStopRow().length == 8) {
+//            stopTimeRange = PLong.INSTANCE.getCodec().decodeLong(
+//              scan.getStopRow(), 0, SortOrder.getDefault());
+//        }
         return CDCUtil.initForRawScan(prepareDataTableScan(dataRowKeys, true));
     }
 
     protected boolean getNextCoveredIndexRow(List<Cell> result) throws 
IOException {
         if (indexRowIterator.hasNext()) {
             List<Cell> indexRow = indexRowIterator.next();
-            for (Cell c: indexRow) {
-                if (c.getType() == Cell.Type.Put) {
-                    result.add(c);
-                }
-            }
+            Cell firstCell = indexRow.get(indexRow.size() - 1);
+            byte[] indexRowKey = new ImmutableBytesPtr(firstCell.getRowArray(),
+                    firstCell.getRowOffset(), firstCell.getRowLength())
+                    .copyBytesIfNecessary();
+            ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
+                    indexToDataRowKeyMap.get(indexRowKey));
+            Result dataRow = dataRows.get(dataRowKey);
+            Long indexCellTS = firstCell.getTimestamp();
+            Map<String, Map<String, Object>> preImageObj = new HashMap<>();
+            Map<String, Map<String, Object>> changeImageObj = new HashMap<>();
+            Long lowerBoundForPreImage = 0L;
+            boolean isIndexCellDeleteRow = false;
+            byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
             try {
-                Result dataRow = null;
-                if (! result.isEmpty()) {
-                    Cell firstCell = result.get(0);
-                    byte[] indexRowKey = new 
ImmutableBytesPtr(firstCell.getRowArray(),
-                            firstCell.getRowOffset(), firstCell.getRowLength())
-                            .copyBytesIfNecessary();
-                    ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
-                            indexToDataRowKeyMap.get(indexRowKey));
-                    dataRow = dataRows.get(dataRowKey);
-                    Long indexRowTs = result.get(0).getTimestamp();
-                    Map<Long, Map<ImmutableBytesPtr, Cell>> changeTimeline = 
dataRowChanges.get(
-                            dataRowKey);
-                    if (changeTimeline == null) {
-                        List<Cell> resultCells = 
Arrays.asList(dataRow.rawCells());
-                        Collections.sort(resultCells, 
CellComparator.getInstance().reversed());
-                        List<Cell> deleteMarkers = new ArrayList<>();
-                        List<List<Cell>> columns = new LinkedList<>();
-                        Cell currentColumnCell = null;
-                        Pair<byte[], byte[]> emptyKV = 
EncodedColumnsUtil.getEmptyKeyValueInfo(
-                                
EncodedColumnsUtil.getQualifierEncodingScheme(scan));
-                        List<Cell> currentColumn = null;
-                        Set<Long> uniqueTimeStamps = new HashSet<>();
-                        // TODO: From CompactionScanner.formColumns(), see if 
this can be refactored.
-                        for (Cell cell : resultCells) {
-                            uniqueTimeStamps.add(cell.getTimestamp());
-                            if (cell.getType() != Cell.Type.Put) {
-                                deleteMarkers.add(cell);
-                            }
-                            if (CellUtil.matchingColumn(cell, 
QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
-                                    emptyKV.getFirst())) {
-                                continue;
+                int columnListIndex = 0;
+                List<CDCTableInfo.CDCColumnInfo> cdcColumnInfoList =
+                        this.cdcDataTableInfo.getColumnInfoList();
+                for (Cell cell : dataRow.rawCells()) {
+                    if (cell.getType() == Cell.Type.DeleteFamily) {
+                        if (columnListIndex > 0) {
+                            continue;
+                        }
+                        if (indexCellTS == cell.getTimestamp()) {
+                            isIndexCellDeleteRow = true;
+                        } else if (indexCellTS > cell.getTimestamp()) {
+                            lowerBoundForPreImage = cell.getTimestamp();
+                        }
+                    } else if (cell.getType() == Cell.Type.DeleteColumn
+                            || cell.getType() == Cell.Type.Put) {
+                        if (!Arrays.equals(cell.getQualifierArray(), emptyCQ)
+                                && CDCUtil.compareCellFamilyAndQualifier(
+                                        cell.getFamilyArray(), 
cell.getQualifierArray(),
+                                        
cdcColumnInfoList.get(columnListIndex).getColumnFamily(),
+                                        cdcColumnInfoList.get(columnListIndex)
+                                                .getColumnQualifier()) > 0) {
+                            while (columnListIndex < cdcColumnInfoList.size()
+                                    && CDCUtil.compareCellFamilyAndQualifier(
+                                    cell.getFamilyArray(), 
cell.getQualifierArray(),
+                                    
cdcColumnInfoList.get(columnListIndex).getColumnFamily(),
+                                    cdcColumnInfoList.get(columnListIndex)
+                                            .getColumnQualifier()) > 0) {
+                                columnListIndex += 1;
                             }
-                            if (currentColumnCell == null) {
-                                currentColumn = new LinkedList<>();
-                                currentColumnCell = cell;
-                                currentColumn.add(cell);
-                            } else if (!CellUtil.matchingColumn(cell, 
currentColumnCell)) {
-                                columns.add(currentColumn);
-                                currentColumn = new LinkedList<>();
-                                currentColumnCell = cell;
-                                currentColumn.add(cell);
-                            } else {
-                                currentColumn.add(cell);
+                            if (columnListIndex >= cdcColumnInfoList.size()) {
+                                break;
                             }
                         }
-                        if (currentColumn != null) {
-                            columns.add(currentColumn);
+                        if (CDCUtil.compareCellFamilyAndQualifier(
+                                cell.getFamilyArray(), 
cell.getQualifierArray(),
+                                
cdcColumnInfoList.get(columnListIndex).getColumnFamily(),
+                                cdcColumnInfoList.get(columnListIndex)
+                                        .getColumnQualifier()) < 0) {
+                            continue;
                         }
-                        List<Long> sortedTimestamps = 
uniqueTimeStamps.stream().sorted().collect(
-                                Collectors.toList());
-                        // FIXME: Does this need to be Concurrent?
-                        Map<ImmutableBytesPtr, Cell> rollingRow = new 
HashMap<>();
-                        int[] columnPointers = new int[columns.size()];
-                        changeTimeline = new TreeMap<>();
-                        dataRowChanges.put(dataRowKey, changeTimeline);
-                        for (Long ts : sortedTimestamps) {
-                            for (int i = 0; i < columns.size(); ++i) {
-                                Cell cell = 
columns.get(i).get(columnPointers[i]);
-                                if (cell.getTimestamp() == ts) {
-                                    rollingRow.put(new ImmutableBytesPtr(
-                                                    cell.getQualifierArray(),
-                                                    cell.getQualifierOffset(),
-                                                    cell.getQualifierLength()),
-                                            cell);
-                                    ++columnPointers[i];
+                        if (CDCUtil.compareCellFamilyAndQualifier(
+                                cell.getFamilyArray(), 
cell.getQualifierArray(),
+                                
cdcColumnInfoList.get(columnListIndex).getColumnFamily(),
+                                cdcColumnInfoList.get(columnListIndex)
+                                        .getColumnQualifier()) == 0) {
+                            String columnFamily = StandardCharsets.UTF_8
+                                    
.decode(ByteBuffer.wrap(cdcColumnInfoList.get(columnListIndex)
+                                            .getColumnFamily())).toString();
+                            String columnQualifier = 
cdcColumnInfoList.get(columnListIndex)
+                                    .getColumnName();
+                            if (Arrays.equals(
+                                    
cdcColumnInfoList.get(columnListIndex).getColumnFamily(),
+                                    
cdcDataTableInfo.getDefaultColumnFamily())) {
+                                columnFamily = DEFAULT_COLUMN_FAMILY_STR;
+                            }
+                            if (cell.getTimestamp() < indexCellTS
+                                    && cell.getTimestamp() > 
lowerBoundForPreImage) {
+                                if (!preImageObj.containsKey(columnFamily)) {
+                                    preImageObj.put(columnFamily, new 
HashMap<>());
+                                }
+                                if 
(preImageObj.get(columnFamily).containsKey(columnQualifier)) {
+                                    continue;
                                 }
+                                
preImageObj.get(columnFamily).put(columnQualifier,
+                                        this.getColumnValue(cell, 
cdcColumnInfoList
+                                                
.get(columnListIndex).getColumnType()));
+                            } else if (cell.getTimestamp() == indexCellTS) {
+                                if (!changeImageObj.containsKey(columnFamily)) 
{
+                                    changeImageObj.put(columnFamily, new 
HashMap<>());
+                                }
+                                
changeImageObj.get(columnFamily).put(columnQualifier,
+                                        this.getColumnValue(cell, 
cdcColumnInfoList
+                                                
.get(columnListIndex).getColumnType()));
                             }
-                            Map<ImmutableBytesPtr, Cell> rowOfCells = new 
HashMap();
-                            rowOfCells.putAll(rollingRow);
-                            changeTimeline.put(ts, rowOfCells);
                         }
                     }
-
-                    Map<ImmutableBytesPtr, Cell> mapOfCells = 
changeTimeline.get(indexRowTs);
-                    if (mapOfCells != null) {
-                        Map <String, Object> rowValueMap = new 
HashMap<>(mapOfCells.size());
-                        for (Map.Entry<ImmutableBytesPtr, Cell> entry: 
mapOfCells.entrySet()) {
-                            String colName = 
dataColQualNameMap.get(entry.getKey());
-                            Object colVal = 
dataColQualTypeMap.get(entry.getKey()).toObject(
-                                    entry.getValue().getValueArray());
-                            rowValueMap.put(colName, colVal);
-                        }
-                        byte[] value =
-                                new 
Gson().toJson(rowValueMap).getBytes(StandardCharsets.UTF_8);
-                        CellBuilder builder = 
CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
-                        ImmutableBytesPtr family = new 
ImmutableBytesPtr(firstCell.getFamilyArray(),
-                                firstCell.getFamilyOffset(), 
firstCell.getFamilyLength());
-                        dataRow = Result.create(Arrays.asList(builder.
-                                setRow(dataRowKey.copyBytesIfNecessary()).
-                                setFamily(family.copyBytesIfNecessary()).
-                                
setQualifier(scan.getAttribute((CDC_JSON_COL_QUALIFIER))).
-                                setTimestamp(firstCell.getTimestamp()).
-                                setValue(value).
-                                setType(Cell.Type.Put).
-                                build()));
-                    }
                 }
-                if (dataRow != null && tupleProjector != null) {
-                    IndexUtil.addTupleAsOneCell(result, new 
ResultTuple(dataRow),
+                Result cdcRow = getCDCImage(
+                        preImageObj, changeImageObj, isIndexCellDeleteRow, 
indexCellTS, firstCell);
+                if (cdcRow != null && tupleProjector != null) {
+                    if (firstCell.getType() == Cell.Type.DeleteFamily) {
+                        
result.add(CellBuilderFactory.create(CellBuilderType.DEEP_COPY)

Review Comment:
   Do we actually need to deep copy?



##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -90,128 +88,144 @@ public CDCGlobalIndexRegionScanner(final RegionScanner 
innerScanner,
         super(innerScanner, region, scan, env, dataTableScan, tupleProjector, 
indexMaintainer,
                 viewConstants, ptr, pageSizeMs, queryLimit);
         CDCUtil.initForRawScan(dataTableScan);
-        dataColQualNameMap = ScanUtil.deserializeColumnQualifierToNameMap(
-                scan.getAttribute(DATA_COL_QUALIFIER_TO_NAME_MAP));
-        dataColQualTypeMap = ScanUtil.deserializeColumnQualifierToTypeMap(
-                scan.getAttribute(DATA_COL_QUALIFIER_TO_TYPE_MAP));
+        cdcDataTableInfo = 
CDCTableInfo.createFromProto(CDCInfoProtos.CDCTableDef
+                .parseFrom(scan.getAttribute(CDC_DATA_TABLE_DEF)));
+        Charset utf8Charset = StandardCharsets.UTF_8;
+        String cdcChangeScopeStr = utf8Charset.decode(
+                
ByteBuffer.wrap(scan.getAttribute(CDC_INCLUDE_SCOPES))).toString();
+        try {
+            cdcChangeScopeSet = 
CDCUtil.makeChangeScopeEnumsFromString(cdcChangeScopeStr);
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     @Override
     protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys) throws 
IOException {
+        //TODO: Get Timerange from the start row and end row of the index scan 
object
+        // and set it in the datatable scan object.
+//        if (scan.getStartRow().length == 8) {
+//            startTimeRange = PLong.INSTANCE.getCodec().decodeLong(
+//              scan.getStartRow(), 0, SortOrder.getDefault());
+//        }
+//        if (scan.getStopRow().length == 8) {
+//            stopTimeRange = PLong.INSTANCE.getCodec().decodeLong(
+//              scan.getStopRow(), 0, SortOrder.getDefault());
+//        }
         return CDCUtil.initForRawScan(prepareDataTableScan(dataRowKeys, true));
     }
 
     protected boolean getNextCoveredIndexRow(List<Cell> result) throws 
IOException {
         if (indexRowIterator.hasNext()) {
             List<Cell> indexRow = indexRowIterator.next();
-            for (Cell c: indexRow) {
-                if (c.getType() == Cell.Type.Put) {
-                    result.add(c);
-                }
-            }
+            Cell firstCell = indexRow.get(indexRow.size() - 1);
+            byte[] indexRowKey = new ImmutableBytesPtr(firstCell.getRowArray(),
+                    firstCell.getRowOffset(), firstCell.getRowLength())
+                    .copyBytesIfNecessary();
+            ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
+                    indexToDataRowKeyMap.get(indexRowKey));
+            Result dataRow = dataRows.get(dataRowKey);
+            Long indexCellTS = firstCell.getTimestamp();
+            Map<String, Map<String, Object>> preImageObj = new HashMap<>();
+            Map<String, Map<String, Object>> changeImageObj = new HashMap<>();
+            Long lowerBoundForPreImage = 0L;
+            boolean isIndexCellDeleteRow = false;
+            byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
             try {
-                Result dataRow = null;
-                if (! result.isEmpty()) {
-                    Cell firstCell = result.get(0);
-                    byte[] indexRowKey = new 
ImmutableBytesPtr(firstCell.getRowArray(),
-                            firstCell.getRowOffset(), firstCell.getRowLength())
-                            .copyBytesIfNecessary();
-                    ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
-                            indexToDataRowKeyMap.get(indexRowKey));
-                    dataRow = dataRows.get(dataRowKey);
-                    Long indexRowTs = result.get(0).getTimestamp();
-                    Map<Long, Map<ImmutableBytesPtr, Cell>> changeTimeline = 
dataRowChanges.get(
-                            dataRowKey);
-                    if (changeTimeline == null) {
-                        List<Cell> resultCells = 
Arrays.asList(dataRow.rawCells());
-                        Collections.sort(resultCells, 
CellComparator.getInstance().reversed());
-                        List<Cell> deleteMarkers = new ArrayList<>();
-                        List<List<Cell>> columns = new LinkedList<>();
-                        Cell currentColumnCell = null;
-                        Pair<byte[], byte[]> emptyKV = 
EncodedColumnsUtil.getEmptyKeyValueInfo(
-                                
EncodedColumnsUtil.getQualifierEncodingScheme(scan));
-                        List<Cell> currentColumn = null;
-                        Set<Long> uniqueTimeStamps = new HashSet<>();
-                        // TODO: From CompactionScanner.formColumns(), see if 
this can be refactored.
-                        for (Cell cell : resultCells) {
-                            uniqueTimeStamps.add(cell.getTimestamp());
-                            if (cell.getType() != Cell.Type.Put) {
-                                deleteMarkers.add(cell);
-                            }
-                            if (CellUtil.matchingColumn(cell, 
QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
-                                    emptyKV.getFirst())) {
-                                continue;
+                int columnListIndex = 0;
+                List<CDCTableInfo.CDCColumnInfo> cdcColumnInfoList =
+                        this.cdcDataTableInfo.getColumnInfoList();
+                for (Cell cell : dataRow.rawCells()) {
+                    if (cell.getType() == Cell.Type.DeleteFamily) {
+                        if (columnListIndex > 0) {
+                            continue;
+                        }
+                        if (indexCellTS == cell.getTimestamp()) {
+                            isIndexCellDeleteRow = true;
+                        } else if (indexCellTS > cell.getTimestamp()) {
+                            lowerBoundForPreImage = cell.getTimestamp();
+                        }
+                    } else if (cell.getType() == Cell.Type.DeleteColumn
+                            || cell.getType() == Cell.Type.Put) {
+                        if (!Arrays.equals(cell.getQualifierArray(), emptyCQ)
+                                && CDCUtil.compareCellFamilyAndQualifier(
+                                        cell.getFamilyArray(), 
cell.getQualifierArray(),
+                                        
cdcColumnInfoList.get(columnListIndex).getColumnFamily(),
+                                        cdcColumnInfoList.get(columnListIndex)
+                                                .getColumnQualifier()) > 0) {

Review Comment:
   Is this check really needed here? Won't the same check in the while loop 
below take care of it?
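
To illustrate the question: a `while` loop whose condition already contains the comparison does nothing when the cell is not past the current column, so a guarding `if` with the same comparison adds no behaviour. The sketch below is a simplified, hypothetical model with a single byte array per column instead of family plus qualifier. It also leaves out the empty-qualifier test from the original `if`, which would still need handling if the guard were removed.

```java
import java.util.Arrays;
import java.util.List;

public class ColumnCursorSketch {
    // Hypothetical helper: move the cursor forward until the column at
    // `index` is no longer smaller than the cell's column, or the list ends.
    static int advance(List<byte[]> sortedColumns, int index, byte[] cellColumn) {
        while (index < sortedColumns.size()
                && Arrays.compareUnsigned(cellColumn, sortedColumns.get(index)) > 0) {
            index++;
        }
        return index;
    }

    public static void main(String[] args) {
        List<byte[]> columns = Arrays.asList(
                new byte[]{1}, new byte[]{3}, new byte[]{5});
        // Cell column 3 is past column 1, so the cursor stops at index 1.
        System.out.println(advance(columns, 0, new byte[]{3}));
        // Cell column 0 is not past column 1, so the loop body never runs.
        System.out.println(advance(columns, 0, new byte[]{0}));
    }
}
```

Because the bounds test comes first in the loop condition, this form also avoids calling `get(columnListIndex)` on an index that has run off the end of the list.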



##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -90,128 +88,144 @@ public CDCGlobalIndexRegionScanner(final RegionScanner 
innerScanner,
         super(innerScanner, region, scan, env, dataTableScan, tupleProjector, 
indexMaintainer,
                 viewConstants, ptr, pageSizeMs, queryLimit);
         CDCUtil.initForRawScan(dataTableScan);
-        dataColQualNameMap = ScanUtil.deserializeColumnQualifierToNameMap(
-                scan.getAttribute(DATA_COL_QUALIFIER_TO_NAME_MAP));
-        dataColQualTypeMap = ScanUtil.deserializeColumnQualifierToTypeMap(
-                scan.getAttribute(DATA_COL_QUALIFIER_TO_TYPE_MAP));
+        cdcDataTableInfo = 
CDCTableInfo.createFromProto(CDCInfoProtos.CDCTableDef
+                .parseFrom(scan.getAttribute(CDC_DATA_TABLE_DEF)));
+        Charset utf8Charset = StandardCharsets.UTF_8;
+        String cdcChangeScopeStr = utf8Charset.decode(
+                
ByteBuffer.wrap(scan.getAttribute(CDC_INCLUDE_SCOPES))).toString();
+        try {
+            cdcChangeScopeSet = 
CDCUtil.makeChangeScopeEnumsFromString(cdcChangeScopeStr);
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     @Override
     protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys) throws 
IOException {
+        //TODO: Get Timerange from the start row and end row of the index scan 
object
+        // and set it in the datatable scan object.
+//        if (scan.getStartRow().length == 8) {
+//            startTimeRange = PLong.INSTANCE.getCodec().decodeLong(
+//              scan.getStartRow(), 0, SortOrder.getDefault());
+//        }
+//        if (scan.getStopRow().length == 8) {
+//            stopTimeRange = PLong.INSTANCE.getCodec().decodeLong(
+//              scan.getStopRow(), 0, SortOrder.getDefault());
+//        }
         return CDCUtil.initForRawScan(prepareDataTableScan(dataRowKeys, true));
     }
 
     protected boolean getNextCoveredIndexRow(List<Cell> result) throws 
IOException {
         if (indexRowIterator.hasNext()) {
             List<Cell> indexRow = indexRowIterator.next();
-            for (Cell c: indexRow) {
-                if (c.getType() == Cell.Type.Put) {
-                    result.add(c);
-                }
-            }
+            Cell firstCell = indexRow.get(indexRow.size() - 1);
+            byte[] indexRowKey = new ImmutableBytesPtr(firstCell.getRowArray(),
+                    firstCell.getRowOffset(), firstCell.getRowLength())
+                    .copyBytesIfNecessary();
+            ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
+                    indexToDataRowKeyMap.get(indexRowKey));
+            Result dataRow = dataRows.get(dataRowKey);
+            Long indexCellTS = firstCell.getTimestamp();
+            Map<String, Map<String, Object>> preImageObj = new HashMap<>();
+            Map<String, Map<String, Object>> changeImageObj = new HashMap<>();
+            Long lowerBoundForPreImage = 0L;
+            boolean isIndexCellDeleteRow = false;
+            byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
             try {
-                Result dataRow = null;
-                if (! result.isEmpty()) {
-                    Cell firstCell = result.get(0);
-                    byte[] indexRowKey = new 
ImmutableBytesPtr(firstCell.getRowArray(),
-                            firstCell.getRowOffset(), firstCell.getRowLength())
-                            .copyBytesIfNecessary();
-                    ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
-                            indexToDataRowKeyMap.get(indexRowKey));
-                    dataRow = dataRows.get(dataRowKey);
-                    Long indexRowTs = result.get(0).getTimestamp();
-                    Map<Long, Map<ImmutableBytesPtr, Cell>> changeTimeline = 
dataRowChanges.get(
-                            dataRowKey);
-                    if (changeTimeline == null) {
-                        List<Cell> resultCells = 
Arrays.asList(dataRow.rawCells());
-                        Collections.sort(resultCells, 
CellComparator.getInstance().reversed());
-                        List<Cell> deleteMarkers = new ArrayList<>();
-                        List<List<Cell>> columns = new LinkedList<>();
-                        Cell currentColumnCell = null;
-                        Pair<byte[], byte[]> emptyKV = 
EncodedColumnsUtil.getEmptyKeyValueInfo(
-                                
EncodedColumnsUtil.getQualifierEncodingScheme(scan));
-                        List<Cell> currentColumn = null;
-                        Set<Long> uniqueTimeStamps = new HashSet<>();
-                        // TODO: From CompactionScanner.formColumns(), see if 
this can be refactored.
-                        for (Cell cell : resultCells) {
-                            uniqueTimeStamps.add(cell.getTimestamp());
-                            if (cell.getType() != Cell.Type.Put) {
-                                deleteMarkers.add(cell);
-                            }
-                            if (CellUtil.matchingColumn(cell, 
QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
-                                    emptyKV.getFirst())) {
-                                continue;
+                int columnListIndex = 0;
+                List<CDCTableInfo.CDCColumnInfo> cdcColumnInfoList =
+                        this.cdcDataTableInfo.getColumnInfoList();
+                for (Cell cell : dataRow.rawCells()) {
+                    if (cell.getType() == Cell.Type.DeleteFamily) {
+                        if (columnListIndex > 0) {
+                            continue;
+                        }
+                        if (indexCellTS == cell.getTimestamp()) {
+                            isIndexCellDeleteRow = true;
+                        } else if (indexCellTS > cell.getTimestamp()) {
+                            lowerBoundForPreImage = cell.getTimestamp();
+                        }
+                    } else if (cell.getType() == Cell.Type.DeleteColumn
+                            || cell.getType() == Cell.Type.Put) {
+                        if (!Arrays.equals(cell.getQualifierArray(), emptyCQ)
+                                && CDCUtil.compareCellFamilyAndQualifier(
+                                        cell.getFamilyArray(), 
cell.getQualifierArray(),
+                                        
cdcColumnInfoList.get(columnListIndex).getColumnFamily(),
+                                        cdcColumnInfoList.get(columnListIndex)
+                                                .getColumnQualifier()) > 0) {
+                            while (columnListIndex < cdcColumnInfoList.size()
+                                    && CDCUtil.compareCellFamilyAndQualifier(
+                                    cell.getFamilyArray(), 
cell.getQualifierArray(),
+                                    
cdcColumnInfoList.get(columnListIndex).getColumnFamily(),
+                                    cdcColumnInfoList.get(columnListIndex)
+                                            .getColumnQualifier()) > 0) {
+                                columnListIndex += 1;
                             }
-                            if (currentColumnCell == null) {
-                                currentColumn = new LinkedList<>();
-                                currentColumnCell = cell;
-                                currentColumn.add(cell);
-                            } else if (!CellUtil.matchingColumn(cell, 
currentColumnCell)) {
-                                columns.add(currentColumn);
-                                currentColumn = new LinkedList<>();
-                                currentColumnCell = cell;
-                                currentColumn.add(cell);
-                            } else {
-                                currentColumn.add(cell);
+                            if (columnListIndex >= cdcColumnInfoList.size()) {
+                                break;
                             }
                         }
-                        if (currentColumn != null) {
-                            columns.add(currentColumn);
+                        if (CDCUtil.compareCellFamilyAndQualifier(
+                                cell.getFamilyArray(), 
cell.getQualifierArray(),
+                                
cdcColumnInfoList.get(columnListIndex).getColumnFamily(),
+                                cdcColumnInfoList.get(columnListIndex)
+                                        .getColumnQualifier()) < 0) {

Review Comment:
   This check is identical to the last comparison made in the `while` loop 
above, correct? If you save that result, you can avoid repeating the call. In 
fact, if you convert the above `while` loop into a `while (true)`, I think you 
can merge this check and the one at line 164 into the same loop. You would have 
to use loop labels to break out of multiple loops.
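
   A rough sketch of the merged loop, to illustrate the idea only. It is not 
the PR's code: `ColumnMergeSketch`, `matchColumns`, and the local `compare` 
helper are hypothetical stand-ins, and cells and columns are reduced to bare 
qualifiers that are assumed to be sorted already. Each cell/column pair is 
compared once, and the result drives all three outcomes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the scanner logic; not part of Phoenix.
final class ColumnMergeSketch {
    // Lexicographic comparison on unsigned bytes (stand-in for CDCUtil.compare).
    static int compare(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int d = (a[i] & 0xff) - (b[i] & 0xff);
            if (d != 0) {
                return d;
            }
        }
        return a.length - b.length;
    }

    // Returns, for every cell that matches a tracked column, that column's index.
    // Both inputs are assumed to be sorted in the same order.
    static List<Integer> matchColumns(List<byte[]> cellQualifiers,
                                      List<byte[]> columnQualifiers) {
        List<Integer> matched = new ArrayList<>();
        int columnIndex = 0;
        cells:
        for (byte[] cellQualifier : cellQualifiers) {
            while (true) {
                if (columnIndex >= columnQualifiers.size()) {
                    break cells; // no tracked columns left, stop scanning cells
                }
                // One comparison per step; its result is reused below.
                int cmp = compare(cellQualifier, columnQualifiers.get(columnIndex));
                if (cmp > 0) {
                    columnIndex++; // cell is past this column, try the next one
                    continue;
                }
                if (cmp == 0) {
                    matched.add(columnIndex); // cell belongs to this column
                }
                continue cells; // cmp <= 0: done with this cell
            }
        }
        return matched;
    }
}
```

   The labeled `break cells` replaces the separate bounds check after the 
loop, and `continue cells` covers both the "cell sorts before the column" and 
the "cell matches" cases.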



##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -90,128 +88,144 @@ public CDCGlobalIndexRegionScanner(final RegionScanner 
innerScanner,
         super(innerScanner, region, scan, env, dataTableScan, tupleProjector, 
indexMaintainer,
                 viewConstants, ptr, pageSizeMs, queryLimit);
         CDCUtil.initForRawScan(dataTableScan);
-        dataColQualNameMap = ScanUtil.deserializeColumnQualifierToNameMap(
-                scan.getAttribute(DATA_COL_QUALIFIER_TO_NAME_MAP));
-        dataColQualTypeMap = ScanUtil.deserializeColumnQualifierToTypeMap(
-                scan.getAttribute(DATA_COL_QUALIFIER_TO_TYPE_MAP));
+        cdcDataTableInfo = 
CDCTableInfo.createFromProto(CDCInfoProtos.CDCTableDef
+                .parseFrom(scan.getAttribute(CDC_DATA_TABLE_DEF)));
+        Charset utf8Charset = StandardCharsets.UTF_8;
+        String cdcChangeScopeStr = utf8Charset.decode(
+                
ByteBuffer.wrap(scan.getAttribute(CDC_INCLUDE_SCOPES))).toString();
+        try {
+            cdcChangeScopeSet = 
CDCUtil.makeChangeScopeEnumsFromString(cdcChangeScopeStr);
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     @Override
     protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys) throws 
IOException {
+        //TODO: Get Timerange from the start row and end row of the index scan 
object
+        // and set it in the datatable scan object.
+//        if (scan.getStartRow().length == 8) {
+//            startTimeRange = PLong.INSTANCE.getCodec().decodeLong(
+//              scan.getStartRow(), 0, SortOrder.getDefault());
+//        }
+//        if (scan.getStopRow().length == 8) {
+//            stopTimeRange = PLong.INSTANCE.getCodec().decodeLong(
+//              scan.getStopRow(), 0, SortOrder.getDefault());
+//        }
         return CDCUtil.initForRawScan(prepareDataTableScan(dataRowKeys, true));
     }
 
     protected boolean getNextCoveredIndexRow(List<Cell> result) throws 
IOException {
         if (indexRowIterator.hasNext()) {
             List<Cell> indexRow = indexRowIterator.next();
-            for (Cell c: indexRow) {
-                if (c.getType() == Cell.Type.Put) {
-                    result.add(c);
-                }
-            }
+            Cell firstCell = indexRow.get(indexRow.size() - 1);
+            byte[] indexRowKey = new ImmutableBytesPtr(firstCell.getRowArray(),
+                    firstCell.getRowOffset(), firstCell.getRowLength())
+                    .copyBytesIfNecessary();
+            ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
+                    indexToDataRowKeyMap.get(indexRowKey));
+            Result dataRow = dataRows.get(dataRowKey);
+            Long indexCellTS = firstCell.getTimestamp();
+            Map<String, Map<String, Object>> preImageObj = new HashMap<>();
+            Map<String, Map<String, Object>> changeImageObj = new HashMap<>();
+            Long lowerBoundForPreImage = 0L;
+            boolean isIndexCellDeleteRow = false;
+            byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
             try {
-                Result dataRow = null;
-                if (! result.isEmpty()) {
-                    Cell firstCell = result.get(0);
-                    byte[] indexRowKey = new 
ImmutableBytesPtr(firstCell.getRowArray(),
-                            firstCell.getRowOffset(), firstCell.getRowLength())
-                            .copyBytesIfNecessary();
-                    ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
-                            indexToDataRowKeyMap.get(indexRowKey));
-                    dataRow = dataRows.get(dataRowKey);
-                    Long indexRowTs = result.get(0).getTimestamp();
-                    Map<Long, Map<ImmutableBytesPtr, Cell>> changeTimeline = 
dataRowChanges.get(
-                            dataRowKey);
-                    if (changeTimeline == null) {
-                        List<Cell> resultCells = 
Arrays.asList(dataRow.rawCells());
-                        Collections.sort(resultCells, 
CellComparator.getInstance().reversed());
-                        List<Cell> deleteMarkers = new ArrayList<>();
-                        List<List<Cell>> columns = new LinkedList<>();
-                        Cell currentColumnCell = null;
-                        Pair<byte[], byte[]> emptyKV = 
EncodedColumnsUtil.getEmptyKeyValueInfo(
-                                
EncodedColumnsUtil.getQualifierEncodingScheme(scan));
-                        List<Cell> currentColumn = null;
-                        Set<Long> uniqueTimeStamps = new HashSet<>();
-                        // TODO: From CompactionScanner.formColumns(), see if 
this can be refactored.
-                        for (Cell cell : resultCells) {
-                            uniqueTimeStamps.add(cell.getTimestamp());
-                            if (cell.getType() != Cell.Type.Put) {
-                                deleteMarkers.add(cell);
-                            }
-                            if (CellUtil.matchingColumn(cell, 
QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
-                                    emptyKV.getFirst())) {
-                                continue;
+                int columnListIndex = 0;
+                List<CDCTableInfo.CDCColumnInfo> cdcColumnInfoList =
+                        this.cdcDataTableInfo.getColumnInfoList();
+                for (Cell cell : dataRow.rawCells()) {
+                    if (cell.getType() == Cell.Type.DeleteFamily) {
+                        if (columnListIndex > 0) {
+                            continue;
+                        }
+                        if (indexCellTS == cell.getTimestamp()) {
+                            isIndexCellDeleteRow = true;
+                        } else if (indexCellTS > cell.getTimestamp()) {
+                            lowerBoundForPreImage = cell.getTimestamp();
+                        }
+                    } else if (cell.getType() == Cell.Type.DeleteColumn
+                            || cell.getType() == Cell.Type.Put) {
+                        if (!Arrays.equals(cell.getQualifierArray(), emptyCQ)
+                                && CDCUtil.compareCellFamilyAndQualifier(
+                                        cell.getFamilyArray(), 
cell.getQualifierArray(),
+                                        
cdcColumnInfoList.get(columnListIndex).getColumnFamily(),
+                                        cdcColumnInfoList.get(columnListIndex)
+                                                .getColumnQualifier()) > 0) {
+                            while (columnListIndex < cdcColumnInfoList.size()
+                                    && CDCUtil.compareCellFamilyAndQualifier(
+                                    cell.getFamilyArray(), 
cell.getQualifierArray(),
+                                    
cdcColumnInfoList.get(columnListIndex).getColumnFamily(),
+                                    cdcColumnInfoList.get(columnListIndex)
+                                            .getColumnQualifier()) > 0) {
+                                columnListIndex += 1;
                             }
-                            if (currentColumnCell == null) {
-                                currentColumn = new LinkedList<>();
-                                currentColumnCell = cell;
-                                currentColumn.add(cell);
-                            } else if (!CellUtil.matchingColumn(cell, 
currentColumnCell)) {
-                                columns.add(currentColumn);
-                                currentColumn = new LinkedList<>();
-                                currentColumnCell = cell;
-                                currentColumn.add(cell);
-                            } else {
-                                currentColumn.add(cell);
+                            if (columnListIndex >= cdcColumnInfoList.size()) {
+                                break;
                             }
                         }
-                        if (currentColumn != null) {
-                            columns.add(currentColumn);
+                        if (CDCUtil.compareCellFamilyAndQualifier(
+                                cell.getFamilyArray(), 
cell.getQualifierArray(),
+                                
cdcColumnInfoList.get(columnListIndex).getColumnFamily(),
+                                cdcColumnInfoList.get(columnListIndex)
+                                        .getColumnQualifier()) < 0) {
+                            continue;
                         }
-                        List<Long> sortedTimestamps = 
uniqueTimeStamps.stream().sorted().collect(
-                                Collectors.toList());
-                        // FIXME: Does this need to be Concurrent?
-                        Map<ImmutableBytesPtr, Cell> rollingRow = new 
HashMap<>();
-                        int[] columnPointers = new int[columns.size()];
-                        changeTimeline = new TreeMap<>();
-                        dataRowChanges.put(dataRowKey, changeTimeline);
-                        for (Long ts : sortedTimestamps) {
-                            for (int i = 0; i < columns.size(); ++i) {
-                                Cell cell = 
columns.get(i).get(columnPointers[i]);
-                                if (cell.getTimestamp() == ts) {
-                                    rollingRow.put(new ImmutableBytesPtr(
-                                                    cell.getQualifierArray(),
-                                                    cell.getQualifierOffset(),
-                                                    cell.getQualifierLength()),
-                                            cell);
-                                    ++columnPointers[i];
+                        if (CDCUtil.compareCellFamilyAndQualifier(
+                                cell.getFamilyArray(), 
cell.getQualifierArray(),
+                                
cdcColumnInfoList.get(columnListIndex).getColumnFamily(),
+                                cdcColumnInfoList.get(columnListIndex)
+                                        .getColumnQualifier()) == 0) {
+                            String columnFamily = StandardCharsets.UTF_8
+                                    
.decode(ByteBuffer.wrap(cdcColumnInfoList.get(columnListIndex)
+                                            .getColumnFamily())).toString();
+                            String columnQualifier = 
cdcColumnInfoList.get(columnListIndex)
+                                    .getColumnName();
+                            if (Arrays.equals(
+                                    
cdcColumnInfoList.get(columnListIndex).getColumnFamily(),
+                                    
cdcDataTableInfo.getDefaultColumnFamily())) {
+                                columnFamily = DEFAULT_COLUMN_FAMILY_STR;
+                            }
+                            if (cell.getTimestamp() < indexCellTS
+                                    && cell.getTimestamp() > 
lowerBoundForPreImage) {
+                                if (!preImageObj.containsKey(columnFamily)) {
+                                    preImageObj.put(columnFamily, new 
HashMap<>());
+                                }
+                                if 
(preImageObj.get(columnFamily).containsKey(columnQualifier)) {
+                                    continue;
                                 }
+                                
preImageObj.get(columnFamily).put(columnQualifier,
+                                        this.getColumnValue(cell, 
cdcColumnInfoList
+                                                
.get(columnListIndex).getColumnType()));
+                            } else if (cell.getTimestamp() == indexCellTS) {
+                                if (!changeImageObj.containsKey(columnFamily)) 
{
+                                    changeImageObj.put(columnFamily, new 
HashMap<>());
+                                }
+                                
changeImageObj.get(columnFamily).put(columnQualifier,
+                                        this.getColumnValue(cell, 
cdcColumnInfoList
+                                                
.get(columnListIndex).getColumnType()));

Review Comment:
   Also, we could avoid building the change image if only PRE is selected.
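
   One way to express that guard, as a sketch only. The `Scope` enum and 
`needsChangeImage` are hypothetical names, not the PR's types; the real code 
would test `cdcChangeScopeSet` instead. An empty selection is treated 
conservatively here, so the change image is still built in that case.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical stand-in for the change-scope handling; not part of Phoenix.
final class ChangeScopeSketch {
    // Assumed scope names for illustration.
    enum Scope { PRE, POST, CHANGE }

    // The change image can be skipped only when PRE is the sole selected scope.
    static boolean needsChangeImage(Set<Scope> selected) {
        return !selected.equals(EnumSet.of(Scope.PRE));
    }
}
```

   The result could be computed once in the constructor and checked before 
populating `changeImageObj`.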



##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -90,128 +88,144 @@ public CDCGlobalIndexRegionScanner(final RegionScanner 
innerScanner,
         super(innerScanner, region, scan, env, dataTableScan, tupleProjector, 
indexMaintainer,
                 viewConstants, ptr, pageSizeMs, queryLimit);
         CDCUtil.initForRawScan(dataTableScan);
-        dataColQualNameMap = ScanUtil.deserializeColumnQualifierToNameMap(
-                scan.getAttribute(DATA_COL_QUALIFIER_TO_NAME_MAP));
-        dataColQualTypeMap = ScanUtil.deserializeColumnQualifierToTypeMap(
-                scan.getAttribute(DATA_COL_QUALIFIER_TO_TYPE_MAP));
+        cdcDataTableInfo = 
CDCTableInfo.createFromProto(CDCInfoProtos.CDCTableDef
+                .parseFrom(scan.getAttribute(CDC_DATA_TABLE_DEF)));
+        Charset utf8Charset = StandardCharsets.UTF_8;
+        String cdcChangeScopeStr = utf8Charset.decode(
+                
ByteBuffer.wrap(scan.getAttribute(CDC_INCLUDE_SCOPES))).toString();
+        try {
+            cdcChangeScopeSet = 
CDCUtil.makeChangeScopeEnumsFromString(cdcChangeScopeStr);
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     @Override
     protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys) throws 
IOException {
+        //TODO: Get Timerange from the start row and end row of the index scan 
object
+        // and set it in the datatable scan object.
+//        if (scan.getStartRow().length == 8) {
+//            startTimeRange = PLong.INSTANCE.getCodec().decodeLong(
+//              scan.getStartRow(), 0, SortOrder.getDefault());
+//        }
+//        if (scan.getStopRow().length == 8) {
+//            stopTimeRange = PLong.INSTANCE.getCodec().decodeLong(
+//              scan.getStopRow(), 0, SortOrder.getDefault());
+//        }
         return CDCUtil.initForRawScan(prepareDataTableScan(dataRowKeys, true));
     }
 
     protected boolean getNextCoveredIndexRow(List<Cell> result) throws 
IOException {
         if (indexRowIterator.hasNext()) {
             List<Cell> indexRow = indexRowIterator.next();
-            for (Cell c: indexRow) {
-                if (c.getType() == Cell.Type.Put) {
-                    result.add(c);
-                }
-            }
+            Cell firstCell = indexRow.get(indexRow.size() - 1);
+            byte[] indexRowKey = new ImmutableBytesPtr(firstCell.getRowArray(),
+                    firstCell.getRowOffset(), firstCell.getRowLength())
+                    .copyBytesIfNecessary();
+            ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
+                    indexToDataRowKeyMap.get(indexRowKey));
+            Result dataRow = dataRows.get(dataRowKey);
+            Long indexCellTS = firstCell.getTimestamp();
+            Map<String, Map<String, Object>> preImageObj = new HashMap<>();
+            Map<String, Map<String, Object>> changeImageObj = new HashMap<>();
+            Long lowerBoundForPreImage = 0L;
+            boolean isIndexCellDeleteRow = false;
+            byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
             try {
-                Result dataRow = null;
-                if (! result.isEmpty()) {
-                    Cell firstCell = result.get(0);
-                    byte[] indexRowKey = new 
ImmutableBytesPtr(firstCell.getRowArray(),
-                            firstCell.getRowOffset(), firstCell.getRowLength())
-                            .copyBytesIfNecessary();
-                    ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
-                            indexToDataRowKeyMap.get(indexRowKey));
-                    dataRow = dataRows.get(dataRowKey);
-                    Long indexRowTs = result.get(0).getTimestamp();
-                    Map<Long, Map<ImmutableBytesPtr, Cell>> changeTimeline = 
dataRowChanges.get(
-                            dataRowKey);
-                    if (changeTimeline == null) {
-                        List<Cell> resultCells = 
Arrays.asList(dataRow.rawCells());
-                        Collections.sort(resultCells, 
CellComparator.getInstance().reversed());
-                        List<Cell> deleteMarkers = new ArrayList<>();
-                        List<List<Cell>> columns = new LinkedList<>();
-                        Cell currentColumnCell = null;
-                        Pair<byte[], byte[]> emptyKV = 
EncodedColumnsUtil.getEmptyKeyValueInfo(
-                                
EncodedColumnsUtil.getQualifierEncodingScheme(scan));
-                        List<Cell> currentColumn = null;
-                        Set<Long> uniqueTimeStamps = new HashSet<>();
-                        // TODO: From CompactionScanner.formColumns(), see if 
this can be refactored.
-                        for (Cell cell : resultCells) {
-                            uniqueTimeStamps.add(cell.getTimestamp());
-                            if (cell.getType() != Cell.Type.Put) {
-                                deleteMarkers.add(cell);
-                            }
-                            if (CellUtil.matchingColumn(cell, 
QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
-                                    emptyKV.getFirst())) {
-                                continue;
+                int columnListIndex = 0;
+                List<CDCTableInfo.CDCColumnInfo> cdcColumnInfoList =
+                        this.cdcDataTableInfo.getColumnInfoList();
+                for (Cell cell : dataRow.rawCells()) {
+                    if (cell.getType() == Cell.Type.DeleteFamily) {
+                        if (columnListIndex > 0) {
+                            continue;
+                        }
+                        if (indexCellTS == cell.getTimestamp()) {
+                            isIndexCellDeleteRow = true;
+                        } else if (indexCellTS > cell.getTimestamp()) {
+                            lowerBoundForPreImage = cell.getTimestamp();
+                        }
+                    } else if (cell.getType() == Cell.Type.DeleteColumn
+                            || cell.getType() == Cell.Type.Put) {
+                        if (!Arrays.equals(cell.getQualifierArray(), emptyCQ)
+                                && CDCUtil.compareCellFamilyAndQualifier(
+                                        cell.getFamilyArray(), 
cell.getQualifierArray(),
+                                        
cdcColumnInfoList.get(columnListIndex).getColumnFamily(),
+                                        cdcColumnInfoList.get(columnListIndex)
+                                                .getColumnQualifier()) > 0) {
+                            while (columnListIndex < cdcColumnInfoList.size()
+                                    && CDCUtil.compareCellFamilyAndQualifier(
+                                    cell.getFamilyArray(), 
cell.getQualifierArray(),
+                                    
cdcColumnInfoList.get(columnListIndex).getColumnFamily(),
+                                    cdcColumnInfoList.get(columnListIndex)
+                                            .getColumnQualifier()) > 0) {
+                                columnListIndex += 1;
                             }
-                            if (currentColumnCell == null) {
-                                currentColumn = new LinkedList<>();
-                                currentColumnCell = cell;
-                                currentColumn.add(cell);
-                            } else if (!CellUtil.matchingColumn(cell, 
currentColumnCell)) {
-                                columns.add(currentColumn);
-                                currentColumn = new LinkedList<>();
-                                currentColumnCell = cell;
-                                currentColumn.add(cell);
-                            } else {
-                                currentColumn.add(cell);
+                            if (columnListIndex >= cdcColumnInfoList.size()) {
+                                break;
                             }
                         }
-                        if (currentColumn != null) {
-                            columns.add(currentColumn);
+                        if (CDCUtil.compareCellFamilyAndQualifier(
+                                cell.getFamilyArray(), 
cell.getQualifierArray(),
+                                
cdcColumnInfoList.get(columnListIndex).getColumnFamily(),
+                                cdcColumnInfoList.get(columnListIndex)
+                                        .getColumnQualifier()) < 0) {
+                            continue;
                         }
-                        List<Long> sortedTimestamps = 
uniqueTimeStamps.stream().sorted().collect(
-                                Collectors.toList());
-                        // FIXME: Does this need to be Concurrent?
-                        Map<ImmutableBytesPtr, Cell> rollingRow = new 
HashMap<>();
-                        int[] columnPointers = new int[columns.size()];
-                        changeTimeline = new TreeMap<>();
-                        dataRowChanges.put(dataRowKey, changeTimeline);
-                        for (Long ts : sortedTimestamps) {
-                            for (int i = 0; i < columns.size(); ++i) {
-                                Cell cell = 
columns.get(i).get(columnPointers[i]);
-                                if (cell.getTimestamp() == ts) {
-                                    rollingRow.put(new ImmutableBytesPtr(
-                                                    cell.getQualifierArray(),
-                                                    cell.getQualifierOffset(),
-                                                    cell.getQualifierLength()),
-                                            cell);
-                                    ++columnPointers[i];
+                        if (CDCUtil.compareCellFamilyAndQualifier(
+                                cell.getFamilyArray(), 
cell.getQualifierArray(),
+                                
cdcColumnInfoList.get(columnListIndex).getColumnFamily(),
+                                cdcColumnInfoList.get(columnListIndex)
+                                        .getColumnQualifier()) == 0) {
+                            String columnFamily = StandardCharsets.UTF_8
+                                    
.decode(ByteBuffer.wrap(cdcColumnInfoList.get(columnListIndex)
+                                            .getColumnFamily())).toString();
+                            String columnQualifier = 
cdcColumnInfoList.get(columnListIndex)
+                                    .getColumnName();
+                            if (Arrays.equals(
+                                    
cdcColumnInfoList.get(columnListIndex).getColumnFamily(),
+                                    
cdcDataTableInfo.getDefaultColumnFamily())) {
+                                columnFamily = DEFAULT_COLUMN_FAMILY_STR;
+                            }
+                            if (cell.getTimestamp() < indexCellTS
+                                    && cell.getTimestamp() > 
lowerBoundForPreImage) {
+                                if (!preImageObj.containsKey(columnFamily)) {
+                                    preImageObj.put(columnFamily, new 
HashMap<>());
+                                }
+                                if 
(preImageObj.get(columnFamily).containsKey(columnQualifier)) {
+                                    continue;
                                 }
+                                
preImageObj.get(columnFamily).put(columnQualifier,
+                                        this.getColumnValue(cell, 
cdcColumnInfoList
+                                                
.get(columnListIndex).getColumnType()));
+                            } else if (cell.getTimestamp() == indexCellTS) {
+                                if (!changeImageObj.containsKey(columnFamily)) 
{
+                                    changeImageObj.put(columnFamily, new 
HashMap<>());
+                                }
+                                
changeImageObj.get(columnFamily).put(columnQualifier,
+                                        this.getColumnValue(cell, 
cdcColumnInfoList
+                                                
.get(columnListIndex).getColumnType()));

Review Comment:
   This too repeats lines 198 to 200, and can be deduplicated if you choose the 
map conditionally (i.e., for < indexCellTS use the preImage map and for == 
indexCellTS use the changeImage map).
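
   Roughly what I have in mind, as a sketch under assumptions. `ImageMapSketch` 
and `record` are hypothetical names, and the parameters stand in for the 
locals in `getNextCoveredIndexRow`. The pre-image keeps the first value seen 
for a column, while the change image overwrites, as in the current code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper illustrating the deduplication; not part of Phoenix.
final class ImageMapSketch {
    static void record(long cellTs, long indexCellTs, long lowerBoundForPreImage,
                       String columnFamily, String columnName, Object value,
                       Map<String, Map<String, Object>> preImage,
                       Map<String, Map<String, Object>> changeImage) {
        Map<String, Map<String, Object>> target;
        boolean keepFirst;
        if (cellTs < indexCellTs && cellTs > lowerBoundForPreImage) {
            target = preImage;    // older version: contributes to the pre-image
            keepFirst = true;     // only the first value seen per column is kept
        } else if (cellTs == indexCellTs) {
            target = changeImage; // same timestamp as the index cell: the change
            keepFirst = false;
        } else {
            return;               // outside both windows, nothing to record
        }
        // Single copy of the "create the family map if missing, then put" logic.
        Map<String, Object> columns =
                target.computeIfAbsent(columnFamily, k -> new HashMap<>());
        if (!keepFirst || !columns.containsKey(columnName)) {
            columns.put(columnName, value);
        }
    }
}
```

   With this shape the `getColumnValue` call and the family-map 
initialization appear once instead of once per branch.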



##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -90,128 +88,144 @@ public CDCGlobalIndexRegionScanner(final RegionScanner 
innerScanner,
         super(innerScanner, region, scan, env, dataTableScan, tupleProjector, 
indexMaintainer,
                 viewConstants, ptr, pageSizeMs, queryLimit);
         CDCUtil.initForRawScan(dataTableScan);
-        dataColQualNameMap = ScanUtil.deserializeColumnQualifierToNameMap(
-                scan.getAttribute(DATA_COL_QUALIFIER_TO_NAME_MAP));
-        dataColQualTypeMap = ScanUtil.deserializeColumnQualifierToTypeMap(
-                scan.getAttribute(DATA_COL_QUALIFIER_TO_TYPE_MAP));
+        cdcDataTableInfo = 
CDCTableInfo.createFromProto(CDCInfoProtos.CDCTableDef
+                .parseFrom(scan.getAttribute(CDC_DATA_TABLE_DEF)));
+        Charset utf8Charset = StandardCharsets.UTF_8;
+        String cdcChangeScopeStr = utf8Charset.decode(
+                
ByteBuffer.wrap(scan.getAttribute(CDC_INCLUDE_SCOPES))).toString();
+        try {
+            cdcChangeScopeSet = 
CDCUtil.makeChangeScopeEnumsFromString(cdcChangeScopeStr);
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     @Override
     protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys) throws 
IOException {
+        //TODO: Get Timerange from the start row and end row of the index scan 
object
+        // and set it in the datatable scan object.
+//        if (scan.getStartRow().length == 8) {
+//            startTimeRange = PLong.INSTANCE.getCodec().decodeLong(
+//              scan.getStartRow(), 0, SortOrder.getDefault());
+//        }
+//        if (scan.getStopRow().length == 8) {
+//            stopTimeRange = PLong.INSTANCE.getCodec().decodeLong(
+//              scan.getStopRow(), 0, SortOrder.getDefault());
+//        }
         return CDCUtil.initForRawScan(prepareDataTableScan(dataRowKeys, true));
     }
 
     protected boolean getNextCoveredIndexRow(List<Cell> result) throws 
IOException {
         if (indexRowIterator.hasNext()) {
             List<Cell> indexRow = indexRowIterator.next();
-            for (Cell c: indexRow) {
-                if (c.getType() == Cell.Type.Put) {
-                    result.add(c);
-                }
-            }
+            Cell firstCell = indexRow.get(indexRow.size() - 1);
+            byte[] indexRowKey = new ImmutableBytesPtr(firstCell.getRowArray(),
+                    firstCell.getRowOffset(), firstCell.getRowLength())
+                    .copyBytesIfNecessary();
+            ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
+                    indexToDataRowKeyMap.get(indexRowKey));
+            Result dataRow = dataRows.get(dataRowKey);
+            Long indexCellTS = firstCell.getTimestamp();
+            Map<String, Map<String, Object>> preImageObj = new HashMap<>();
+            Map<String, Map<String, Object>> changeImageObj = new HashMap<>();
+            Long lowerBoundForPreImage = 0L;
+            boolean isIndexCellDeleteRow = false;
+            byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
             try {
-                Result dataRow = null;
-                if (! result.isEmpty()) {
-                    Cell firstCell = result.get(0);
-                    byte[] indexRowKey = new 
ImmutableBytesPtr(firstCell.getRowArray(),
-                            firstCell.getRowOffset(), firstCell.getRowLength())
-                            .copyBytesIfNecessary();
-                    ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
-                            indexToDataRowKeyMap.get(indexRowKey));
-                    dataRow = dataRows.get(dataRowKey);
-                    Long indexRowTs = result.get(0).getTimestamp();
-                    Map<Long, Map<ImmutableBytesPtr, Cell>> changeTimeline = 
dataRowChanges.get(
-                            dataRowKey);
-                    if (changeTimeline == null) {
-                        List<Cell> resultCells = 
Arrays.asList(dataRow.rawCells());
-                        Collections.sort(resultCells, 
CellComparator.getInstance().reversed());
-                        List<Cell> deleteMarkers = new ArrayList<>();
-                        List<List<Cell>> columns = new LinkedList<>();
-                        Cell currentColumnCell = null;
-                        Pair<byte[], byte[]> emptyKV = 
EncodedColumnsUtil.getEmptyKeyValueInfo(
-                                
EncodedColumnsUtil.getQualifierEncodingScheme(scan));
-                        List<Cell> currentColumn = null;
-                        Set<Long> uniqueTimeStamps = new HashSet<>();
-                        // TODO: From CompactionScanner.formColumns(), see if 
this can be refactored.
-                        for (Cell cell : resultCells) {
-                            uniqueTimeStamps.add(cell.getTimestamp());
-                            if (cell.getType() != Cell.Type.Put) {
-                                deleteMarkers.add(cell);
-                            }
-                            if (CellUtil.matchingColumn(cell, 
QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
-                                    emptyKV.getFirst())) {
-                                continue;
+                int columnListIndex = 0;
+                List<CDCTableInfo.CDCColumnInfo> cdcColumnInfoList =
+                        this.cdcDataTableInfo.getColumnInfoList();
+                for (Cell cell : dataRow.rawCells()) {
+                    if (cell.getType() == Cell.Type.DeleteFamily) {
+                        if (columnListIndex > 0) {
+                            continue;
+                        }
+                        if (indexCellTS == cell.getTimestamp()) {
+                            isIndexCellDeleteRow = true;
+                        } else if (indexCellTS > cell.getTimestamp()) {
+                            lowerBoundForPreImage = cell.getTimestamp();
+                        }
+                    } else if (cell.getType() == Cell.Type.DeleteColumn
+                            || cell.getType() == Cell.Type.Put) {
+                        if (!Arrays.equals(cell.getQualifierArray(), emptyCQ)
+                                && CDCUtil.compareCellFamilyAndQualifier(
+                                        cell.getFamilyArray(), 
cell.getQualifierArray(),
+                                        
cdcColumnInfoList.get(columnListIndex).getColumnFamily(),
+                                        cdcColumnInfoList.get(columnListIndex)
+                                                .getColumnQualifier()) > 0) {
+                            while (columnListIndex < cdcColumnInfoList.size()
+                                    && CDCUtil.compareCellFamilyAndQualifier(
+                                    cell.getFamilyArray(), 
cell.getQualifierArray(),
+                                    
cdcColumnInfoList.get(columnListIndex).getColumnFamily(),
+                                    cdcColumnInfoList.get(columnListIndex)
+                                            .getColumnQualifier()) > 0) {
+                                columnListIndex += 1;
                             }
-                            if (currentColumnCell == null) {
-                                currentColumn = new LinkedList<>();
-                                currentColumnCell = cell;
-                                currentColumn.add(cell);
-                            } else if (!CellUtil.matchingColumn(cell, 
currentColumnCell)) {
-                                columns.add(currentColumn);
-                                currentColumn = new LinkedList<>();
-                                currentColumnCell = cell;
-                                currentColumn.add(cell);
-                            } else {
-                                currentColumn.add(cell);
+                            if (columnListIndex >= cdcColumnInfoList.size()) {
+                                break;
                             }
                         }
-                        if (currentColumn != null) {
-                            columns.add(currentColumn);
+                        if (CDCUtil.compareCellFamilyAndQualifier(
+                                cell.getFamilyArray(), 
cell.getQualifierArray(),
+                                
cdcColumnInfoList.get(columnListIndex).getColumnFamily(),
+                                cdcColumnInfoList.get(columnListIndex)
+                                        .getColumnQualifier()) < 0) {
+                            continue;
                         }
-                        List<Long> sortedTimestamps = 
uniqueTimeStamps.stream().sorted().collect(
-                                Collectors.toList());
-                        // FIXME: Does this need to be Concurrent?
-                        Map<ImmutableBytesPtr, Cell> rollingRow = new 
HashMap<>();
-                        int[] columnPointers = new int[columns.size()];
-                        changeTimeline = new TreeMap<>();
-                        dataRowChanges.put(dataRowKey, changeTimeline);
-                        for (Long ts : sortedTimestamps) {
-                            for (int i = 0; i < columns.size(); ++i) {
-                                Cell cell = 
columns.get(i).get(columnPointers[i]);
-                                if (cell.getTimestamp() == ts) {
-                                    rollingRow.put(new ImmutableBytesPtr(
-                                                    cell.getQualifierArray(),
-                                                    cell.getQualifierOffset(),
-                                                    cell.getQualifierLength()),
-                                            cell);
-                                    ++columnPointers[i];
+                        if (CDCUtil.compareCellFamilyAndQualifier(
+                                cell.getFamilyArray(), 
cell.getQualifierArray(),
+                                
cdcColumnInfoList.get(columnListIndex).getColumnFamily(),
+                                cdcColumnInfoList.get(columnListIndex)
+                                        .getColumnQualifier()) == 0) {
+                            String columnFamily = StandardCharsets.UTF_8
+                                    
.decode(ByteBuffer.wrap(cdcColumnInfoList.get(columnListIndex)
+                                            .getColumnFamily())).toString();
+                            String columnQualifier = 
cdcColumnInfoList.get(columnListIndex)
+                                    .getColumnName();
+                            if (Arrays.equals(
+                                    
cdcColumnInfoList.get(columnListIndex).getColumnFamily(),
+                                    
cdcDataTableInfo.getDefaultColumnFamily())) {
+                                columnFamily = DEFAULT_COLUMN_FAMILY_STR;
+                            }
+                            if (cell.getTimestamp() < indexCellTS
+                                    && cell.getTimestamp() > 
lowerBoundForPreImage) {
+                                if (!preImageObj.containsKey(columnFamily)) {
+                                    preImageObj.put(columnFamily, new 
HashMap<>());
+                                }
+                                if 
(preImageObj.get(columnFamily).containsKey(columnQualifier)) {
+                                    continue;
                                 }
+                                
preImageObj.get(columnFamily).put(columnQualifier,
+                                        this.getColumnValue(cell, 
cdcColumnInfoList
+                                                
.get(columnListIndex).getColumnType()));
+                            } else if (cell.getTimestamp() == indexCellTS) {
+                                if (!changeImageObj.containsKey(columnFamily)) 
{
+                                    changeImageObj.put(columnFamily, new 
HashMap<>());
+                                }

Review Comment:
   This is a repetition of lines 192 to 194.
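
   One possible way to fold the repeated "create the family map if missing, then
   put the column unless it is already there" steps into a single helper is
   sketched below. `ImageMaps` and `putIfColumnAbsent` are hypothetical names
   used only for illustration and are not part of this PR. The sketch keeps
   `containsKey` rather than `Map.putIfAbsent` so that a column already recorded
   with a null value is not overwritten.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the PR: shared by the pre-image and
// change-image branches so the map bookkeeping is written once.
final class ImageMaps {
    private ImageMaps() { }

    /**
     * Records value under image[columnFamily][columnQualifier] unless that
     * column is already present. Returns true if the value was stored.
     */
    static boolean putIfColumnAbsent(Map<String, Map<String, Object>> image,
                                     String columnFamily,
                                     String columnQualifier,
                                     Object value) {
        // Creates the per-family map on first use.
        Map<String, Object> family =
                image.computeIfAbsent(columnFamily, k -> new HashMap<>());
        if (family.containsKey(columnQualifier)) {
            return false; // keep the first value seen for this column
        }
        family.put(columnQualifier, value);
        return true;
    }
}
```

   With such a helper, each branch would reduce to a single call, and the
   `continue` in the pre-image branch would become a check on the return value.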



##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -90,128 +84,148 @@ public CDCGlobalIndexRegionScanner(final RegionScanner 
innerScanner,
         super(innerScanner, region, scan, env, dataTableScan, tupleProjector, 
indexMaintainer,
                 viewConstants, ptr, pageSizeMs, queryLimit);
         CDCUtil.initForRawScan(dataTableScan);
-        dataColQualNameMap = ScanUtil.deserializeColumnQualifierToNameMap(
-                scan.getAttribute(DATA_COL_QUALIFIER_TO_NAME_MAP));
-        dataColQualTypeMap = ScanUtil.deserializeColumnQualifierToTypeMap(
-                scan.getAttribute(DATA_COL_QUALIFIER_TO_TYPE_MAP));
+        cdcDataTableInfo = 
CDCTableInfo.createFromProto(CDCInfoProtos.CDCTableDef
+                .parseFrom(scan.getAttribute(CDC_DATA_TABLE_DEF)));
+        Charset utf8Charset = StandardCharsets.UTF_8;
+        String cdcChangeScopeStr = utf8Charset.decode(
+                
ByteBuffer.wrap(scan.getAttribute(CDC_INCLUDE_SCOPES))).toString();
+        try {
+            cdcChangeScopeSet = 
CDCUtil.makeChangeScopeEnumsFromString(cdcChangeScopeStr);
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     @Override
     protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys) throws 
IOException {
+        //TODO: Get Timerange from the start row and end row of the index scan 
object
+        // and set it in the datatable scan object.
+//        if (scan.getStartRow().length == 8) {
+//            startTimeRange = PLong.INSTANCE.getCodec().decodeLong(
+//              scan.getStartRow(), 0, SortOrder.getDefault());
+//        }
+//        if (scan.getStopRow().length == 8) {
+//            stopTimeRange = PLong.INSTANCE.getCodec().decodeLong(
+//              scan.getStopRow(), 0, SortOrder.getDefault());
+//        }
         return CDCUtil.initForRawScan(prepareDataTableScan(dataRowKeys, true));
     }
 
     protected boolean getNextCoveredIndexRow(List<Cell> result) throws 
IOException {
         if (indexRowIterator.hasNext()) {
             List<Cell> indexRow = indexRowIterator.next();
-            for (Cell c: indexRow) {
-                if (c.getType() == Cell.Type.Put) {
-                    result.add(c);
-                }
-            }
+            Cell firstCell = indexRow.get(indexRow.size() - 1);
+            byte[] indexRowKey = new ImmutableBytesPtr(firstCell.getRowArray(),
+                    firstCell.getRowOffset(), firstCell.getRowLength())
+                    .copyBytesIfNecessary();

Review Comment:
   This makes me wonder whether making `indexToDataRowKeyMap` a
   `Map<ImmutableBytesPtr, ImmutableBytesPtr>` as well would avoid some copies
   and be more efficient. This is something we should explore offline.
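
   To illustrate the general idea only: a map keyed by an offset/length wrapper
   with content-based `equals`/`hashCode` can be probed with a slice of a larger
   backing array without copying the slice first. The sketch below uses
   `java.nio.ByteBuffer` as a stand-in for `ImmutableBytesPtr`, and
   `ByteKeyLookup` is a hypothetical name. Whether the same change actually
   saves copies in the scanner is an assumption that would need to be checked.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Stand-in sketch: ByteBuffer plays the role of an offset/length byte wrapper.
final class ByteKeyLookup {
    private ByteKeyLookup() { }

    /**
     * Looks up the entry for backing[offset, offset + length) without copying
     * those bytes into a new array. ByteBuffer.equals/hashCode compare the
     * remaining bytes, so a wrapped slice matches a key with equal content.
     */
    static ByteBuffer lookup(Map<ByteBuffer, ByteBuffer> indexToData,
                             byte[] backing, int offset, int length) {
        return indexToData.get(ByteBuffer.wrap(backing, offset, length));
    }

    public static void main(String[] args) {
        Map<ByteBuffer, ByteBuffer> indexToData = new HashMap<>();
        indexToData.put(ByteBuffer.wrap(new byte[] {1, 2}),
                ByteBuffer.wrap(new byte[] {9}));

        // The row key sits in the middle of a larger (cell-like) buffer.
        byte[] cellBuffer = {7, 1, 2, 7};
        ByteBuffer dataKey = lookup(indexToData, cellBuffer, 1, 2);
        System.out.println("found: " + (dataKey != null));
    }
}
```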



##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -90,128 +88,144 @@ public CDCGlobalIndexRegionScanner(final RegionScanner 
innerScanner,
         super(innerScanner, region, scan, env, dataTableScan, tupleProjector, 
indexMaintainer,
                 viewConstants, ptr, pageSizeMs, queryLimit);
         CDCUtil.initForRawScan(dataTableScan);
-        dataColQualNameMap = ScanUtil.deserializeColumnQualifierToNameMap(
-                scan.getAttribute(DATA_COL_QUALIFIER_TO_NAME_MAP));
-        dataColQualTypeMap = ScanUtil.deserializeColumnQualifierToTypeMap(
-                scan.getAttribute(DATA_COL_QUALIFIER_TO_TYPE_MAP));
+        cdcDataTableInfo = 
CDCTableInfo.createFromProto(CDCInfoProtos.CDCTableDef
+                .parseFrom(scan.getAttribute(CDC_DATA_TABLE_DEF)));
+        Charset utf8Charset = StandardCharsets.UTF_8;
+        String cdcChangeScopeStr = utf8Charset.decode(
+                
ByteBuffer.wrap(scan.getAttribute(CDC_INCLUDE_SCOPES))).toString();
+        try {
+            cdcChangeScopeSet = 
CDCUtil.makeChangeScopeEnumsFromString(cdcChangeScopeStr);
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     @Override
     protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys) throws 
IOException {
+        //TODO: Get Timerange from the start row and end row of the index scan 
object
+        // and set it in the datatable scan object.
+//        if (scan.getStartRow().length == 8) {
+//            startTimeRange = PLong.INSTANCE.getCodec().decodeLong(
+//              scan.getStartRow(), 0, SortOrder.getDefault());
+//        }
+//        if (scan.getStopRow().length == 8) {
+//            stopTimeRange = PLong.INSTANCE.getCodec().decodeLong(
+//              scan.getStopRow(), 0, SortOrder.getDefault());
+//        }
         return CDCUtil.initForRawScan(prepareDataTableScan(dataRowKeys, true));
     }
 
     protected boolean getNextCoveredIndexRow(List<Cell> result) throws 
IOException {
         if (indexRowIterator.hasNext()) {
             List<Cell> indexRow = indexRowIterator.next();
-            for (Cell c: indexRow) {
-                if (c.getType() == Cell.Type.Put) {
-                    result.add(c);
-                }
-            }
+            Cell firstCell = indexRow.get(indexRow.size() - 1);
+            byte[] indexRowKey = new ImmutableBytesPtr(firstCell.getRowArray(),
+                    firstCell.getRowOffset(), firstCell.getRowLength())
+                    .copyBytesIfNecessary();
+            ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
+                    indexToDataRowKeyMap.get(indexRowKey));
+            Result dataRow = dataRows.get(dataRowKey);
+            Long indexCellTS = firstCell.getTimestamp();
+            Map<String, Map<String, Object>> preImageObj = new HashMap<>();
+            Map<String, Map<String, Object>> changeImageObj = new HashMap<>();
+            Long lowerBoundForPreImage = 0L;
+            boolean isIndexCellDeleteRow = false;
+            byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
             try {
-                Result dataRow = null;
-                if (! result.isEmpty()) {
-                    Cell firstCell = result.get(0);
-                    byte[] indexRowKey = new 
ImmutableBytesPtr(firstCell.getRowArray(),
-                            firstCell.getRowOffset(), firstCell.getRowLength())
-                            .copyBytesIfNecessary();
-                    ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
-                            indexToDataRowKeyMap.get(indexRowKey));
-                    dataRow = dataRows.get(dataRowKey);
-                    Long indexRowTs = result.get(0).getTimestamp();
-                    Map<Long, Map<ImmutableBytesPtr, Cell>> changeTimeline = 
dataRowChanges.get(
-                            dataRowKey);
-                    if (changeTimeline == null) {
-                        List<Cell> resultCells = 
Arrays.asList(dataRow.rawCells());
-                        Collections.sort(resultCells, 
CellComparator.getInstance().reversed());
-                        List<Cell> deleteMarkers = new ArrayList<>();
-                        List<List<Cell>> columns = new LinkedList<>();
-                        Cell currentColumnCell = null;
-                        Pair<byte[], byte[]> emptyKV = 
EncodedColumnsUtil.getEmptyKeyValueInfo(
-                                
EncodedColumnsUtil.getQualifierEncodingScheme(scan));
-                        List<Cell> currentColumn = null;
-                        Set<Long> uniqueTimeStamps = new HashSet<>();
-                        // TODO: From CompactionScanner.formColumns(), see if 
this can be refactored.
-                        for (Cell cell : resultCells) {
-                            uniqueTimeStamps.add(cell.getTimestamp());
-                            if (cell.getType() != Cell.Type.Put) {
-                                deleteMarkers.add(cell);
-                            }
-                            if (CellUtil.matchingColumn(cell, 
QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
-                                    emptyKV.getFirst())) {
-                                continue;
+                int columnListIndex = 0;
+                List<CDCTableInfo.CDCColumnInfo> cdcColumnInfoList =
+                        this.cdcDataTableInfo.getColumnInfoList();
+                for (Cell cell : dataRow.rawCells()) {
+                    if (cell.getType() == Cell.Type.DeleteFamily) {
+                        if (columnListIndex > 0) {
+                            continue;
+                        }
+                        if (indexCellTS == cell.getTimestamp()) {
+                            isIndexCellDeleteRow = true;
+                        } else if (indexCellTS > cell.getTimestamp()) {
+                            lowerBoundForPreImage = cell.getTimestamp();
+                        }
+                    } else if (cell.getType() == Cell.Type.DeleteColumn
+                            || cell.getType() == Cell.Type.Put) {
+                        if (!Arrays.equals(cell.getQualifierArray(), emptyCQ)
+                                && CDCUtil.compareCellFamilyAndQualifier(
+                                        cell.getFamilyArray(), 
cell.getQualifierArray(),
+                                        
cdcColumnInfoList.get(columnListIndex).getColumnFamily(),
+                                        cdcColumnInfoList.get(columnListIndex)
+                                                .getColumnQualifier()) > 0) {
+                            while (columnListIndex < cdcColumnInfoList.size()
+                                    && CDCUtil.compareCellFamilyAndQualifier(
+                                    cell.getFamilyArray(), 
cell.getQualifierArray(),
+                                    
cdcColumnInfoList.get(columnListIndex).getColumnFamily(),
+                                    cdcColumnInfoList.get(columnListIndex)
+                                            .getColumnQualifier()) > 0) {
+                                columnListIndex += 1;
                             }
-                            if (currentColumnCell == null) {
-                                currentColumn = new LinkedList<>();
-                                currentColumnCell = cell;
-                                currentColumn.add(cell);
-                            } else if (!CellUtil.matchingColumn(cell, 
currentColumnCell)) {
-                                columns.add(currentColumn);
-                                currentColumn = new LinkedList<>();
-                                currentColumnCell = cell;
-                                currentColumn.add(cell);
-                            } else {
-                                currentColumn.add(cell);
+                            if (columnListIndex >= cdcColumnInfoList.size()) {
+                                break;
                             }
                         }
-                        if (currentColumn != null) {
-                            columns.add(currentColumn);
+                        if (CDCUtil.compareCellFamilyAndQualifier(
+                                cell.getFamilyArray(), 
cell.getQualifierArray(),
+                                
cdcColumnInfoList.get(columnListIndex).getColumnFamily(),
+                                cdcColumnInfoList.get(columnListIndex)
+                                        .getColumnQualifier()) < 0) {
+                            continue;
                         }
-                        List<Long> sortedTimestamps = 
uniqueTimeStamps.stream().sorted().collect(
-                                Collectors.toList());
-                        // FIXME: Does this need to be Concurrent?
-                        Map<ImmutableBytesPtr, Cell> rollingRow = new 
HashMap<>();
-                        int[] columnPointers = new int[columns.size()];
-                        changeTimeline = new TreeMap<>();
-                        dataRowChanges.put(dataRowKey, changeTimeline);
-                        for (Long ts : sortedTimestamps) {
-                            for (int i = 0; i < columns.size(); ++i) {
-                                Cell cell = 
columns.get(i).get(columnPointers[i]);
-                                if (cell.getTimestamp() == ts) {
-                                    rollingRow.put(new ImmutableBytesPtr(
-                                                    cell.getQualifierArray(),
-                                                    cell.getQualifierOffset(),
-                                                    cell.getQualifierLength()),
-                                            cell);
-                                    ++columnPointers[i];
+                        if (CDCUtil.compareCellFamilyAndQualifier(
+                                cell.getFamilyArray(), 
cell.getQualifierArray(),
+                                
cdcColumnInfoList.get(columnListIndex).getColumnFamily(),
+                                cdcColumnInfoList.get(columnListIndex)
+                                        .getColumnQualifier()) == 0) {

Review Comment:
   This is another repetition of the same call. I think it can be avoided if the
   logic above assigns the return value to a local variable.
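
   A rough sketch of that shape, in plain Java with no HBase types: the
   comparison is evaluated once per step and the result is reused for the
   "greater", "less" and "equal" decisions. `ColumnCursor`, `advance`, and the
   `byte[][] {family, qualifier}` column representation are hypothetical. The
   sketch also assumes the column list is sorted in unsigned byte order and
   relies on `Arrays.compareUnsigned` (Java 9+).

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the column-matching walk with a cached comparison.
final class ColumnCursor {
    private ColumnCursor() { }

    /** Compares by family first, then by qualifier (unsigned byte order). */
    static int compareFamilyAndQualifier(byte[] family1, byte[] qualifier1,
                                         byte[] family2, byte[] qualifier2) {
        int familyComparison = Arrays.compareUnsigned(family1, family2);
        return familyComparison != 0
                ? familyComparison
                : Arrays.compareUnsigned(qualifier1, qualifier2);
    }

    /**
     * Skips columns that sort before the given cell column, starting at
     * startIndex. Returns the index of the matching column, or
     * -(nextIndex + 1) when the cell has no matching column, where nextIndex
     * is where the walk should resume (columns.size() if the list is exhausted).
     */
    static int advance(List<byte[][]> columns, int startIndex,
                       byte[] family, byte[] qualifier) {
        int i = startIndex;
        while (i < columns.size()) {
            byte[][] column = columns.get(i); // {family, qualifier}
            // Computed once, then reused for all three outcomes.
            int cmp = compareFamilyAndQualifier(family, qualifier,
                    column[0], column[1]);
            if (cmp == 0) {
                return i;          // matching column found
            }
            if (cmp < 0) {
                return -(i + 1);   // cell sorts before this column: skip the cell
            }
            i++;                   // cell sorts after this column: move on
        }
        return -(i + 1);           // ran past the last column
    }
}
```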



##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -223,4 +237,85 @@ protected boolean getNextCoveredIndexRow(List<Cell> 
result) throws IOException {
         }
         return false;
     }
+
+    private Result getCDCImage(
+            Map<String, Map<String, Object>> preImageObj,
+            Map<String, Map<String, Object>> changeImageObj,
+            boolean isIndexCellDeleteRow, Long indexCellTS, Cell firstCell) {
+        Map<String, Object> rowValueMap = new HashMap<>();
+
+        if (this.cdcChangeScopeSet.size() == 0
+                || 
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.PRE))) {
+            rowValueMap.put(PRE_IMAGE, preImageObj);
+        }
+
+        if (this.cdcChangeScopeSet.size() == 0
+                || 
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.CHANGE))) {
+            rowValueMap.put(CHANGE_IMAGE, changeImageObj);
+        }
+
+        Map<String, Map<String, Object>> postImageObj = new HashMap<>();
+        if (this.cdcChangeScopeSet.size() == 0
+                || 
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.POST))) {
+            if (!isIndexCellDeleteRow) {
+                for (Map.Entry<String, Map<String, Object>> preImageObjFamily
+                        : preImageObj.entrySet()) {
+                    String columnFamily = preImageObjFamily.getKey();
+                    postImageObj.put(columnFamily, new HashMap<>());
+                    for (Map.Entry<String, Object> preImageColQual :
+                            preImageObjFamily.getValue().entrySet()) {
+                        
postImageObj.get(columnFamily).put(preImageColQual.getKey(),
+                                preImageColQual.getValue());

Review Comment:
   We should be able to just clone the original map.
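
   One caveat worth noting, as a sketch rather than a prescription: a plain
   `new HashMap<>(preImageObj)` copies only the outer map, so the per-family
   maps would be shared and later post-image updates would also show up in the
   pre-image. Copying one level deeper avoids that. `ImageCopy` and `copyImage`
   are hypothetical names, not part of this PR.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: copies the family -> (column -> value) structure.
final class ImageCopy {
    private ImageCopy() { }

    /**
     * Returns a copy whose outer map and per-family maps are independent of
     * the source. Column values themselves are shared, not cloned.
     */
    static Map<String, Map<String, Object>> copyImage(
            Map<String, Map<String, Object>> source) {
        Map<String, Map<String, Object>> copy = new HashMap<>();
        for (Map.Entry<String, Map<String, Object>> family : source.entrySet()) {
            // HashMap's copy constructor duplicates the inner entries.
            copy.put(family.getKey(), new HashMap<>(family.getValue()));
        }
        return copy;
    }
}
```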



##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -90,128 +88,144 @@ public CDCGlobalIndexRegionScanner(final RegionScanner 
innerScanner,
         super(innerScanner, region, scan, env, dataTableScan, tupleProjector, 
indexMaintainer,
                 viewConstants, ptr, pageSizeMs, queryLimit);
         CDCUtil.initForRawScan(dataTableScan);
-        dataColQualNameMap = ScanUtil.deserializeColumnQualifierToNameMap(
-                scan.getAttribute(DATA_COL_QUALIFIER_TO_NAME_MAP));
-        dataColQualTypeMap = ScanUtil.deserializeColumnQualifierToTypeMap(
-                scan.getAttribute(DATA_COL_QUALIFIER_TO_TYPE_MAP));
+        cdcDataTableInfo = 
CDCTableInfo.createFromProto(CDCInfoProtos.CDCTableDef
+                .parseFrom(scan.getAttribute(CDC_DATA_TABLE_DEF)));
+        Charset utf8Charset = StandardCharsets.UTF_8;
+        String cdcChangeScopeStr = utf8Charset.decode(
+                
ByteBuffer.wrap(scan.getAttribute(CDC_INCLUDE_SCOPES))).toString();
+        try {
+            cdcChangeScopeSet = 
CDCUtil.makeChangeScopeEnumsFromString(cdcChangeScopeStr);
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     @Override
     protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys) throws 
IOException {
+        //TODO: Get Timerange from the start row and end row of the index scan 
object
+        // and set it in the datatable scan object.
+//        if (scan.getStartRow().length == 8) {
+//            startTimeRange = PLong.INSTANCE.getCodec().decodeLong(
+//              scan.getStartRow(), 0, SortOrder.getDefault());
+//        }
+//        if (scan.getStopRow().length == 8) {
+//            stopTimeRange = PLong.INSTANCE.getCodec().decodeLong(
+//              scan.getStopRow(), 0, SortOrder.getDefault());
+//        }
         return CDCUtil.initForRawScan(prepareDataTableScan(dataRowKeys, true));
     }
 
     protected boolean getNextCoveredIndexRow(List<Cell> result) throws 
IOException {
         if (indexRowIterator.hasNext()) {
             List<Cell> indexRow = indexRowIterator.next();
-            for (Cell c: indexRow) {
-                if (c.getType() == Cell.Type.Put) {
-                    result.add(c);
-                }
-            }
+            Cell firstCell = indexRow.get(indexRow.size() - 1);
+            byte[] indexRowKey = new ImmutableBytesPtr(firstCell.getRowArray(),
+                    firstCell.getRowOffset(), firstCell.getRowLength())
+                    .copyBytesIfNecessary();
+            ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
+                    indexToDataRowKeyMap.get(indexRowKey));
+            Result dataRow = dataRows.get(dataRowKey);
+            Long indexCellTS = firstCell.getTimestamp();
+            Map<String, Map<String, Object>> preImageObj = new HashMap<>();
+            Map<String, Map<String, Object>> changeImageObj = new HashMap<>();
+            Long lowerBoundForPreImage = 0L;
+            boolean isIndexCellDeleteRow = false;
+            byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
             try {
-                Result dataRow = null;
-                if (! result.isEmpty()) {
-                    Cell firstCell = result.get(0);
-                    byte[] indexRowKey = new 
ImmutableBytesPtr(firstCell.getRowArray(),
-                            firstCell.getRowOffset(), firstCell.getRowLength())
-                            .copyBytesIfNecessary();
-                    ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
-                            indexToDataRowKeyMap.get(indexRowKey));
-                    dataRow = dataRows.get(dataRowKey);
-                    Long indexRowTs = result.get(0).getTimestamp();
-                    Map<Long, Map<ImmutableBytesPtr, Cell>> changeTimeline = 
dataRowChanges.get(
-                            dataRowKey);
-                    if (changeTimeline == null) {
-                        List<Cell> resultCells = 
Arrays.asList(dataRow.rawCells());
-                        Collections.sort(resultCells, 
CellComparator.getInstance().reversed());
-                        List<Cell> deleteMarkers = new ArrayList<>();
-                        List<List<Cell>> columns = new LinkedList<>();
-                        Cell currentColumnCell = null;
-                        Pair<byte[], byte[]> emptyKV = 
EncodedColumnsUtil.getEmptyKeyValueInfo(
-                                
EncodedColumnsUtil.getQualifierEncodingScheme(scan));
-                        List<Cell> currentColumn = null;
-                        Set<Long> uniqueTimeStamps = new HashSet<>();
-                        // TODO: From CompactionScanner.formColumns(), see if 
this can be refactored.
-                        for (Cell cell : resultCells) {
-                            uniqueTimeStamps.add(cell.getTimestamp());
-                            if (cell.getType() != Cell.Type.Put) {
-                                deleteMarkers.add(cell);
-                            }
-                            if (CellUtil.matchingColumn(cell, 
QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
-                                    emptyKV.getFirst())) {
-                                continue;
+                int columnListIndex = 0;
+                List<CDCTableInfo.CDCColumnInfo> cdcColumnInfoList =
+                        this.cdcDataTableInfo.getColumnInfoList();
+                for (Cell cell : dataRow.rawCells()) {
+                    if (cell.getType() == Cell.Type.DeleteFamily) {

Review Comment:
   Don't we also need to match the CF here?





> Extend UncoveredGlobalIndexRegionScanner for CDC region scanner usecase
> -----------------------------------------------------------------------
>
>                 Key: PHOENIX-7015
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-7015
>             Project: Phoenix
>          Issue Type: Sub-task
>            Reporter: Viraj Jasani
>            Priority: Major
>
> For the CDC region scanner use case, extend UncoveredGlobalIndexRegionScanner to 
> CDCUncoveredGlobalIndexRegionScanner. The new region scanner for CDC performs a 
> raw scan of the index table and retrieves data table rows from the index rows.
> Using the time range, it can form a JSON blob to represent changes to the row, 
> including pre and/or post row images.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
