[ 
https://issues.apache.org/jira/browse/PHOENIX-7015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17807366#comment-17807366
 ] 

ASF GitHub Bot commented on PHOENIX-7015:
-----------------------------------------

haridsv commented on code in PR #1794:
URL: https://github.com/apache/phoenix/pull/1794#discussion_r1452101150


##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -104,115 +94,74 @@ protected Scan prepareDataTableScan(Collection<byte[]> 
dataRowKeys) throws IOExc
     protected boolean getNextCoveredIndexRow(List<Cell> result) throws 
IOException {
         if (indexRowIterator.hasNext()) {
             List<Cell> indexRow = indexRowIterator.next();
-            for (Cell c: indexRow) {
-                if (c.getType() == Cell.Type.Put) {
-                    result.add(c);
-                }
-            }
+            Cell firstCell = indexRow.get(indexRow.size() - 1);
+            byte[] indexRowKey = new ImmutableBytesPtr(firstCell.getRowArray(),
+                    firstCell.getRowOffset(), firstCell.getRowLength())
+                    .copyBytesIfNecessary();
+            ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
+                    indexToDataRowKeyMap.get(indexRowKey));
+            Result dataRow = dataRows.get(dataRowKey);
+            Long indexCellTs = firstCell.getTimestamp();
+            Cell.Type indexCellType = firstCell.getType();
+
+            Map<ImmutableBytesPtr, Cell> preImageObj = new HashMap<>();
+            Map<ImmutableBytesPtr, Cell> changeImageObj = new HashMap<>();
+            List<Cell> resultCells = Arrays.asList(dataRow.rawCells());
+            Collections.sort(resultCells, 
CellComparator.getInstance().reversed());
+
+            boolean isIndexCellDeleteRow = false;
+            boolean isIndexCellDeleteColumn = false;
             try {
-                Result dataRow = null;
-                if (! result.isEmpty()) {
-                    Cell firstCell = result.get(0);
-                    byte[] indexRowKey = new 
ImmutableBytesPtr(firstCell.getRowArray(),
-                            firstCell.getRowOffset(), firstCell.getRowLength())
-                            .copyBytesIfNecessary();
-                    ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
-                            indexToDataRowKeyMap.get(indexRowKey));
-                    dataRow = dataRows.get(dataRowKey);
-                    Long indexRowTs = result.get(0).getTimestamp();
-                    Map<Long, Map<ImmutableBytesPtr, Cell>> changeTimeline = 
dataRowChanges.get(
-                            dataRowKey);
-                    if (changeTimeline == null) {
-                        List<Cell> resultCells = 
Arrays.asList(dataRow.rawCells());
-                        Collections.sort(resultCells, 
CellComparator.getInstance().reversed());
-                        List<Cell> deleteMarkers = new ArrayList<>();
-                        List<List<Cell>> columns = new LinkedList<>();
-                        Cell currentColumnCell = null;
-                        Pair<byte[], byte[]> emptyKV = 
EncodedColumnsUtil.getEmptyKeyValueInfo(
-                                
EncodedColumnsUtil.getQualifierEncodingScheme(scan));
-                        List<Cell> currentColumn = null;
-                        Set<Long> uniqueTimeStamps = new HashSet<>();
-                        // TODO: From CompactionScanner.formColumns(), see if 
this can be refactored.
-                        for (Cell cell : resultCells) {
-                            uniqueTimeStamps.add(cell.getTimestamp());
-                            if (cell.getType() != Cell.Type.Put) {
-                                deleteMarkers.add(cell);
-                            }
-                            if (CellUtil.matchingColumn(cell, 
QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
-                                    emptyKV.getFirst())) {
-                                continue;
-                            }
-                            if (currentColumnCell == null) {
-                                currentColumn = new LinkedList<>();
-                                currentColumnCell = cell;
-                                currentColumn.add(cell);
-                            } else if (!CellUtil.matchingColumn(cell, 
currentColumnCell)) {
-                                columns.add(currentColumn);
-                                currentColumn = new LinkedList<>();
-                                currentColumnCell = cell;
-                                currentColumn.add(cell);
-                            } else {
-                                currentColumn.add(cell);
-                            }
+                for (Cell cell : resultCells) {
+                    if (cell.getType() == Cell.Type.DeleteColumn) {
+                        // DDL is not supported in CDC
+                        if (cell.getTimestamp() == indexCellTs) {
+                            isIndexCellDeleteColumn = true;
+                            break;

Review Comment:
   You can have multiple `DeleteColumn` cells corresponding to different 
columns in the same UPSERT, so breaking out of the loop on the first match will 
miss the others.



##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -104,115 +94,74 @@ protected Scan prepareDataTableScan(Collection<byte[]> 
dataRowKeys) throws IOExc
     protected boolean getNextCoveredIndexRow(List<Cell> result) throws 
IOException {
         if (indexRowIterator.hasNext()) {
             List<Cell> indexRow = indexRowIterator.next();
-            for (Cell c: indexRow) {
-                if (c.getType() == Cell.Type.Put) {
-                    result.add(c);
-                }
-            }
+            Cell firstCell = indexRow.get(indexRow.size() - 1);
+            byte[] indexRowKey = new ImmutableBytesPtr(firstCell.getRowArray(),
+                    firstCell.getRowOffset(), firstCell.getRowLength())
+                    .copyBytesIfNecessary();
+            ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
+                    indexToDataRowKeyMap.get(indexRowKey));
+            Result dataRow = dataRows.get(dataRowKey);
+            Long indexCellTs = firstCell.getTimestamp();

Review Comment:
   nit: `indexCellTs` reads like a plural, but since TS is an acronym, I 
suggest using `indexCellTS` instead.



##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -104,115 +94,74 @@ protected Scan prepareDataTableScan(Collection<byte[]> 
dataRowKeys) throws IOExc
     protected boolean getNextCoveredIndexRow(List<Cell> result) throws 
IOException {
         if (indexRowIterator.hasNext()) {
             List<Cell> indexRow = indexRowIterator.next();
-            for (Cell c: indexRow) {
-                if (c.getType() == Cell.Type.Put) {
-                    result.add(c);
-                }
-            }
+            Cell firstCell = indexRow.get(indexRow.size() - 1);
+            byte[] indexRowKey = new ImmutableBytesPtr(firstCell.getRowArray(),
+                    firstCell.getRowOffset(), firstCell.getRowLength())
+                    .copyBytesIfNecessary();
+            ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
+                    indexToDataRowKeyMap.get(indexRowKey));
+            Result dataRow = dataRows.get(dataRowKey);
+            Long indexCellTs = firstCell.getTimestamp();
+            Cell.Type indexCellType = firstCell.getType();
+
+            Map<ImmutableBytesPtr, Cell> preImageObj = new HashMap<>();
+            Map<ImmutableBytesPtr, Cell> changeImageObj = new HashMap<>();
+            List<Cell> resultCells = Arrays.asList(dataRow.rawCells());
+            Collections.sort(resultCells, 
CellComparator.getInstance().reversed());
+
+            boolean isIndexCellDeleteRow = false;
+            boolean isIndexCellDeleteColumn = false;
             try {
-                Result dataRow = null;
-                if (! result.isEmpty()) {
-                    Cell firstCell = result.get(0);
-                    byte[] indexRowKey = new 
ImmutableBytesPtr(firstCell.getRowArray(),
-                            firstCell.getRowOffset(), firstCell.getRowLength())
-                            .copyBytesIfNecessary();
-                    ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
-                            indexToDataRowKeyMap.get(indexRowKey));
-                    dataRow = dataRows.get(dataRowKey);
-                    Long indexRowTs = result.get(0).getTimestamp();
-                    Map<Long, Map<ImmutableBytesPtr, Cell>> changeTimeline = 
dataRowChanges.get(
-                            dataRowKey);
-                    if (changeTimeline == null) {
-                        List<Cell> resultCells = 
Arrays.asList(dataRow.rawCells());
-                        Collections.sort(resultCells, 
CellComparator.getInstance().reversed());
-                        List<Cell> deleteMarkers = new ArrayList<>();
-                        List<List<Cell>> columns = new LinkedList<>();
-                        Cell currentColumnCell = null;
-                        Pair<byte[], byte[]> emptyKV = 
EncodedColumnsUtil.getEmptyKeyValueInfo(
-                                
EncodedColumnsUtil.getQualifierEncodingScheme(scan));
-                        List<Cell> currentColumn = null;
-                        Set<Long> uniqueTimeStamps = new HashSet<>();
-                        // TODO: From CompactionScanner.formColumns(), see if 
this can be refactored.
-                        for (Cell cell : resultCells) {
-                            uniqueTimeStamps.add(cell.getTimestamp());
-                            if (cell.getType() != Cell.Type.Put) {
-                                deleteMarkers.add(cell);
-                            }
-                            if (CellUtil.matchingColumn(cell, 
QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
-                                    emptyKV.getFirst())) {
-                                continue;
-                            }
-                            if (currentColumnCell == null) {
-                                currentColumn = new LinkedList<>();
-                                currentColumnCell = cell;
-                                currentColumn.add(cell);
-                            } else if (!CellUtil.matchingColumn(cell, 
currentColumnCell)) {
-                                columns.add(currentColumn);
-                                currentColumn = new LinkedList<>();
-                                currentColumnCell = cell;
-                                currentColumn.add(cell);
-                            } else {
-                                currentColumn.add(cell);
-                            }
+                for (Cell cell : resultCells) {
+                    if (cell.getType() == Cell.Type.DeleteColumn) {
+                        // DDL is not supported in CDC

Review Comment:
   Does this comment imply that whenever the type is `DeleteColumn`, it is a 
DDL? That isn't actually right, is it? Even an UPSERT with a NULL value will 
cause a DeleteColumn for the corresponding column. There will be a DeleteFamily 
in the index table for the corresponding timestamp, so it is better to do a 
cross-check.



##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -223,4 +172,120 @@ protected boolean getNextCoveredIndexRow(List<Cell> 
result) throws IOException {
         }
         return false;
     }
+
+    private Result getCDCImage (
+            Map<ImmutableBytesPtr, Cell> preImageObj,
+            Map<ImmutableBytesPtr, Cell> changeImageObj,
+            boolean isIndexCellDeleteRow, Long indexCellTs, Cell firstCell) {
+        Map<String, Object> rowValueMap = new HashMap<>();
+
+        Map<String, Object> preImage = new HashMap<>();
+        if (this.cdcChangeScopeSet.size() == 0 ||
+                (this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.PRE))) {
+            for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell : 
preImageObj.entrySet()) {
+                if (dataColQualNameMap.get(preImageObjCell.getKey()) != null) {
+                    
preImage.put(dataColQualNameMap.get(preImageObjCell.getKey()),
+                            
dataColQualTypeMap.get(preImageObjCell.getKey()).toObject(
+                                    
preImageObjCell.getValue().getValueArray()));
+                }
+            }
+            rowValueMap.put(PRE_IMAGE, preImage);
+        }
+
+        Map<String, Object> changeImage = new HashMap<>();
+        if (this.cdcChangeScopeSet.size() == 0 ||
+                
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.CHANGE))) {
+            for (Map.Entry<ImmutableBytesPtr, Cell> changeImageObjCell : 
changeImageObj.entrySet()) {
+                if (dataColQualNameMap.get(changeImageObjCell.getKey()) != 
null) {
+                    
changeImage.put(dataColQualNameMap.get(changeImageObjCell.getKey()),
+                            
dataColQualTypeMap.get(changeImageObjCell.getKey()).toObject(
+                                    
changeImageObjCell.getValue().getValueArray()));
+                }
+            }
+            rowValueMap.put(CHANGE_IMAGE, changeImage);
+        }
+
+        Map<String, Object> postImage = new HashMap<>();
+        if (this.cdcChangeScopeSet.size() == 0 ||
+                (this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.POST))){
+            if (isIndexCellDeleteRow == false) {
+                for (Map.Entry<ImmutableBytesPtr, Cell> changeImageObjCell : 
changeImageObj.entrySet()) {
+                    if(dataColQualNameMap.get(changeImageObjCell.getKey()) != 
null) {
+                        
postImage.put(dataColQualNameMap.get(changeImageObjCell.getKey()),
+                                
dataColQualTypeMap.get(changeImageObjCell.getKey()).toObject(
+                                        
changeImageObjCell.getValue().getValueArray()));
+                    }
+                }
+                for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell : 
preImageObj.entrySet()) {
+                    if(dataColQualNameMap.get(preImageObjCell.getKey()) != null
+                            && 
postImage.get(dataColQualNameMap.get(preImageObjCell.getKey())) == null) {
+                        
postImage.put(dataColQualNameMap.get(preImageObjCell.getKey()),
+                                
dataColQualTypeMap.get(preImageObjCell.getKey()).toObject(
+                                        
preImageObjCell.getValue().getValueArray()));
+                    }
+                }
+            }
+            rowValueMap.put(POST_IMAGE, postImage);
+        }
+
+
+        if (isIndexCellDeleteRow) {
+            rowValueMap.put(EVENT_TYPE, DELETE_EVENT_TYPE);
+        } else {
+            rowValueMap.put(EVENT_TYPE, UPSERT_EVENT_TYPE);
+        }
+
+        byte[] value =
+                new 
Gson().toJson(rowValueMap).getBytes(StandardCharsets.UTF_8);
+        CellBuilder builder = 
CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
+        Result cdcRow = Result.create(Arrays.asList(builder.
+                setRow(indexToDataRowKeyMap.get(new 
ImmutableBytesPtr(firstCell.getRowArray(),
+                        firstCell.getRowOffset(), firstCell.getRowLength())
+                        .copyBytesIfNecessary())).
+                setFamily(firstCell.getFamilyArray()).
+                setQualifier(scan.getAttribute((CDC_JSON_COL_QUALIFIER))).
+                setTimestamp(indexCellTs).
+                setValue(value).
+                setType(Cell.Type.Put).
+                build()));
+
+        return cdcRow;
+    }
+
+    @Override
+    protected void scanDataTableRows(long startTime) throws IOException {
+        super.scanDataTableRows(startTime);
+        List<List<Cell>> indexRowList = new ArrayList<>();
+        // Creating new Index Rows for Delete Row events
+        for (int rowIndex = 0; rowIndex < indexRows.size(); rowIndex++) {
+            List<Cell> indexRow = indexRows.get(rowIndex);
+            indexRowList.add(indexRow);
+            if (indexRow.size() > 1) {
+                List<Cell> deleteRow = new ArrayList<>();
+                for (int cellIndex = indexRow.size() - 1; cellIndex >= 0; 
cellIndex--) {
+                    Cell cell = indexRow.get(cellIndex);
+                    if (cell.getType() == Cell.Type.DeleteFamily) {
+                        byte[] indexRowKey = new 
ImmutableBytesPtr(cell.getRowArray(),
+                                cell.getRowOffset(), cell.getRowLength())
+                                .copyBytesIfNecessary();
+                        ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
+                                indexToDataRowKeyMap.get(indexRowKey));
+                        Result dataRow = dataRows.get(dataRowKey);
+                        List<Cell> resultCells = 
Arrays.asList(dataRow.rawCells());
+                        for (Cell dataRowCell : resultCells) {

Review Comment:
   You don't need this temp list, right?
   ```suggestion
                           for (Cell dataRowCell : dataRow.rawCells()) {
   ```



##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -104,115 +94,74 @@ protected Scan prepareDataTableScan(Collection<byte[]> 
dataRowKeys) throws IOExc
     protected boolean getNextCoveredIndexRow(List<Cell> result) throws 
IOException {
         if (indexRowIterator.hasNext()) {
             List<Cell> indexRow = indexRowIterator.next();
-            for (Cell c: indexRow) {
-                if (c.getType() == Cell.Type.Put) {
-                    result.add(c);
-                }
-            }
+            Cell firstCell = indexRow.get(indexRow.size() - 1);
+            byte[] indexRowKey = new ImmutableBytesPtr(firstCell.getRowArray(),
+                    firstCell.getRowOffset(), firstCell.getRowLength())
+                    .copyBytesIfNecessary();
+            ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
+                    indexToDataRowKeyMap.get(indexRowKey));
+            Result dataRow = dataRows.get(dataRowKey);
+            Long indexCellTs = firstCell.getTimestamp();

Review Comment:
   When a user-specified TS column is configured for CDC, this won't be the TS 
of that column. However, I am not 100% clear on what needs to be done in this 
case; we should discuss it offline and come to a better understanding of this 
scenario.



##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -73,8 +54,15 @@ public class CDCGlobalIndexRegionScanner extends 
UncoveredGlobalIndexRegionScann
     private Map<ImmutableBytesPtr, String> dataColQualNameMap;
     private Map<ImmutableBytesPtr, PDataType> dataColQualTypeMap;
     // Map<dataRowKey: Map<TS: Map<qualifier: Cell>>>
-    private Map<ImmutableBytesPtr, Map<Long, Map<ImmutableBytesPtr, Cell>>> 
dataRowChanges =
-            new HashMap<>();
+    private Set<PTable.CDCChangeScope> cdcChangeScopeSet;
+
+
+    private final static String EVENT_TYPE = "event_type";
+    private final static String PRE_IMAGE = "pre_image";
+    private final static String POST_IMAGE = "post_image";
+    private final static String CHANGE_IMAGE = "change_image";
+    private final static String UPSERT_EVENT_TYPE = "upsert";
+    private final static String DELETE_EVENT_TYPE = "delete";

Review Comment:
   These can go into `QueryConstants` as public constants so that clients 
can reuse them when decoding the JSON structure.



##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -223,4 +172,120 @@ protected boolean getNextCoveredIndexRow(List<Cell> 
result) throws IOException {
         }
         return false;
     }
+
+    private Result getCDCImage (
+            Map<ImmutableBytesPtr, Cell> preImageObj,
+            Map<ImmutableBytesPtr, Cell> changeImageObj,
+            boolean isIndexCellDeleteRow, Long indexCellTs, Cell firstCell) {
+        Map<String, Object> rowValueMap = new HashMap<>();
+
+        Map<String, Object> preImage = new HashMap<>();
+        if (this.cdcChangeScopeSet.size() == 0 ||
+                (this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.PRE))) {
+            for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell : 
preImageObj.entrySet()) {
+                if (dataColQualNameMap.get(preImageObjCell.getKey()) != null) {
+                    
preImage.put(dataColQualNameMap.get(preImageObjCell.getKey()),
+                            
dataColQualTypeMap.get(preImageObjCell.getKey()).toObject(
+                                    
preImageObjCell.getValue().getValueArray()));
+                }
+            }
+            rowValueMap.put(PRE_IMAGE, preImage);
+        }
+
+        Map<String, Object> changeImage = new HashMap<>();
+        if (this.cdcChangeScopeSet.size() == 0 ||
+                
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.CHANGE))) {
+            for (Map.Entry<ImmutableBytesPtr, Cell> changeImageObjCell : 
changeImageObj.entrySet()) {
+                if (dataColQualNameMap.get(changeImageObjCell.getKey()) != 
null) {
+                    
changeImage.put(dataColQualNameMap.get(changeImageObjCell.getKey()),
+                            
dataColQualTypeMap.get(changeImageObjCell.getKey()).toObject(
+                                    
changeImageObjCell.getValue().getValueArray()));
+                }
+            }
+            rowValueMap.put(CHANGE_IMAGE, changeImage);
+        }
+
+        Map<String, Object> postImage = new HashMap<>();
+        if (this.cdcChangeScopeSet.size() == 0 ||
+                (this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.POST))){
+            if (isIndexCellDeleteRow == false) {
+                for (Map.Entry<ImmutableBytesPtr, Cell> changeImageObjCell : 
changeImageObj.entrySet()) {
+                    if(dataColQualNameMap.get(changeImageObjCell.getKey()) != 
null) {
+                        
postImage.put(dataColQualNameMap.get(changeImageObjCell.getKey()),
+                                
dataColQualTypeMap.get(changeImageObjCell.getKey()).toObject(
+                                        
changeImageObjCell.getValue().getValueArray()));
+                    }
+                }
+                for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell : 
preImageObj.entrySet()) {
+                    if(dataColQualNameMap.get(preImageObjCell.getKey()) != null
+                            && 
postImage.get(dataColQualNameMap.get(preImageObjCell.getKey())) == null) {
+                        
postImage.put(dataColQualNameMap.get(preImageObjCell.getKey()),
+                                
dataColQualTypeMap.get(preImageObjCell.getKey()).toObject(
+                                        
preImageObjCell.getValue().getValueArray()));
+                    }
+                }
+            }
+            rowValueMap.put(POST_IMAGE, postImage);
+        }
+
+
+        if (isIndexCellDeleteRow) {
+            rowValueMap.put(EVENT_TYPE, DELETE_EVENT_TYPE);
+        } else {
+            rowValueMap.put(EVENT_TYPE, UPSERT_EVENT_TYPE);
+        }
+
+        byte[] value =
+                new 
Gson().toJson(rowValueMap).getBytes(StandardCharsets.UTF_8);
+        CellBuilder builder = 
CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
+        Result cdcRow = Result.create(Arrays.asList(builder.
+                setRow(indexToDataRowKeyMap.get(new 
ImmutableBytesPtr(firstCell.getRowArray(),
+                        firstCell.getRowOffset(), firstCell.getRowLength())
+                        .copyBytesIfNecessary())).
+                setFamily(firstCell.getFamilyArray()).
+                setQualifier(scan.getAttribute((CDC_JSON_COL_QUALIFIER))).
+                setTimestamp(indexCellTs).
+                setValue(value).
+                setType(Cell.Type.Put).
+                build()));
+
+        return cdcRow;
+    }
+
+    @Override
+    protected void scanDataTableRows(long startTime) throws IOException {
+        super.scanDataTableRows(startTime);
+        List<List<Cell>> indexRowList = new ArrayList<>();
+        // Creating new Index Rows for Delete Row events
+        for (int rowIndex = 0; rowIndex < indexRows.size(); rowIndex++) {
+            List<Cell> indexRow = indexRows.get(rowIndex);
+            indexRowList.add(indexRow);
+            if (indexRow.size() > 1) {
+                List<Cell> deleteRow = new ArrayList<>();

Review Comment:
   If we initialize this to null and create the list on demand at line 277, it 
will avoid generating a large number of empty lists as garbage. You would of 
course have to change the size check at line 283 to a null check instead.



##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -223,4 +172,120 @@ protected boolean getNextCoveredIndexRow(List<Cell> 
result) throws IOException {
         }
         return false;
     }
+
+    private Result getCDCImage (
+            Map<ImmutableBytesPtr, Cell> preImageObj,
+            Map<ImmutableBytesPtr, Cell> changeImageObj,
+            boolean isIndexCellDeleteRow, Long indexCellTs, Cell firstCell) {
+        Map<String, Object> rowValueMap = new HashMap<>();
+
+        Map<String, Object> preImage = new HashMap<>();
+        if (this.cdcChangeScopeSet.size() == 0 ||
+                (this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.PRE))) {
+            for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell : 
preImageObj.entrySet()) {
+                if (dataColQualNameMap.get(preImageObjCell.getKey()) != null) {
+                    
preImage.put(dataColQualNameMap.get(preImageObjCell.getKey()),
+                            
dataColQualTypeMap.get(preImageObjCell.getKey()).toObject(
+                                    
preImageObjCell.getValue().getValueArray()));
+                }
+            }
+            rowValueMap.put(PRE_IMAGE, preImage);
+        }
+
+        Map<String, Object> changeImage = new HashMap<>();
+        if (this.cdcChangeScopeSet.size() == 0 ||
+                
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.CHANGE))) {
+            for (Map.Entry<ImmutableBytesPtr, Cell> changeImageObjCell : 
changeImageObj.entrySet()) {
+                if (dataColQualNameMap.get(changeImageObjCell.getKey()) != 
null) {
+                    
changeImage.put(dataColQualNameMap.get(changeImageObjCell.getKey()),
+                            
dataColQualTypeMap.get(changeImageObjCell.getKey()).toObject(
+                                    
changeImageObjCell.getValue().getValueArray()));
+                }
+            }
+            rowValueMap.put(CHANGE_IMAGE, changeImage);
+        }
+
+        Map<String, Object> postImage = new HashMap<>();
+        if (this.cdcChangeScopeSet.size() == 0 ||
+                (this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.POST))){
+            if (isIndexCellDeleteRow == false) {
+                for (Map.Entry<ImmutableBytesPtr, Cell> changeImageObjCell : 
changeImageObj.entrySet()) {
+                    if(dataColQualNameMap.get(changeImageObjCell.getKey()) != 
null) {
+                        
postImage.put(dataColQualNameMap.get(changeImageObjCell.getKey()),
+                                
dataColQualTypeMap.get(changeImageObjCell.getKey()).toObject(
+                                        
changeImageObjCell.getValue().getValueArray()));
+                    }
+                }
+                for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell : 
preImageObj.entrySet()) {
+                    if(dataColQualNameMap.get(preImageObjCell.getKey()) != null
+                            && 
postImage.get(dataColQualNameMap.get(preImageObjCell.getKey())) == null) {
+                        
postImage.put(dataColQualNameMap.get(preImageObjCell.getKey()),
+                                
dataColQualTypeMap.get(preImageObjCell.getKey()).toObject(
+                                        
preImageObjCell.getValue().getValueArray()));
+                    }
+                }
+            }
+            rowValueMap.put(POST_IMAGE, postImage);
+        }
+
+
+        if (isIndexCellDeleteRow) {
+            rowValueMap.put(EVENT_TYPE, DELETE_EVENT_TYPE);
+        } else {
+            rowValueMap.put(EVENT_TYPE, UPSERT_EVENT_TYPE);
+        }
+
+        byte[] value =
+                new 
Gson().toJson(rowValueMap).getBytes(StandardCharsets.UTF_8);
+        CellBuilder builder = 
CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
+        Result cdcRow = Result.create(Arrays.asList(builder.
+                setRow(indexToDataRowKeyMap.get(new 
ImmutableBytesPtr(firstCell.getRowArray(),
+                        firstCell.getRowOffset(), firstCell.getRowLength())
+                        .copyBytesIfNecessary())).
+                setFamily(firstCell.getFamilyArray()).
+                setQualifier(scan.getAttribute((CDC_JSON_COL_QUALIFIER))).
+                setTimestamp(indexCellTs).
+                setValue(value).
+                setType(Cell.Type.Put).
+                build()));
+
+        return cdcRow;
+    }
+
+    @Override
+    protected void scanDataTableRows(long startTime) throws IOException {
+        super.scanDataTableRows(startTime);
+        List<List<Cell>> indexRowList = new ArrayList<>();
+        // Creating new Index Rows for Delete Row events
+        for (int rowIndex = 0; rowIndex < indexRows.size(); rowIndex++) {
+            List<Cell> indexRow = indexRows.get(rowIndex);
+            indexRowList.add(indexRow);
+            if (indexRow.size() > 1) {
+                List<Cell> deleteRow = new ArrayList<>();
+                for (int cellIndex = indexRow.size() - 1; cellIndex >= 0; 
cellIndex--) {
+                    Cell cell = indexRow.get(cellIndex);
+                    if (cell.getType() == Cell.Type.DeleteFamily) {
+                        byte[] indexRowKey = new 
ImmutableBytesPtr(cell.getRowArray(),
+                                cell.getRowOffset(), cell.getRowLength())
+                                .copyBytesIfNecessary();
+                        ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
+                                indexToDataRowKeyMap.get(indexRowKey));
+                        Result dataRow = dataRows.get(dataRowKey);
+                        List<Cell> resultCells = 
Arrays.asList(dataRow.rawCells());
+                        for (Cell dataRowCell : resultCells) {
+                            if (dataRowCell.getType() == 
Cell.Type.DeleteFamily && dataRowCell.getTimestamp() == cell.getTimestamp()) {

Review Comment:
   Could you add a comment explaining why we need to do this match against 
the data table? I am guessing this is to distinguish an UPSERT from a DELETE, 
since the former also causes a DeleteFamily in the index table but not in the 
data table. I would like confirmation of my understanding, and a comment will 
be helpful as a future reference anyway.



##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -23,48 +23,29 @@
 import org.apache.hadoop.hbase.CellBuilderFactory;
 import org.apache.hadoop.hbase.CellBuilderType;
 import org.apache.hadoop.hbase.CellComparator;
-import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.execute.TupleProjector;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.index.IndexMaintainer;
-import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.tuple.ResultTuple;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.util.CDCUtil;
-import org.apache.phoenix.util.EncodedColumnsUtil;
-import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.ScanUtil;
-import org.apache.phoenix.util.ServerUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.stream.Collectors;
-
-import static 
org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CDC_JSON_COL_QUALIFIER;
-import static 
org.apache.phoenix.coprocessor.BaseScannerRegionObserver.DATA_COL_QUALIFIER_TO_NAME_MAP;
-import static 
org.apache.phoenix.coprocessor.BaseScannerRegionObserver.DATA_COL_QUALIFIER_TO_TYPE_MAP;
+import java.util.*;

Review Comment:
   Please turn off wildcard imports (`import *`) in IntelliJ IDEA; follow the update in this answer: 
https://stackoverflow.com/a/3348855/95750



##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -104,115 +94,74 @@ protected Scan prepareDataTableScan(Collection<byte[]> 
dataRowKeys) throws IOExc
     protected boolean getNextCoveredIndexRow(List<Cell> result) throws 
IOException {
         if (indexRowIterator.hasNext()) {
             List<Cell> indexRow = indexRowIterator.next();
-            for (Cell c: indexRow) {
-                if (c.getType() == Cell.Type.Put) {
-                    result.add(c);
-                }
-            }
+            Cell firstCell = indexRow.get(indexRow.size() - 1);
+            byte[] indexRowKey = new ImmutableBytesPtr(firstCell.getRowArray(),
+                    firstCell.getRowOffset(), firstCell.getRowLength())
+                    .copyBytesIfNecessary();
+            ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
+                    indexToDataRowKeyMap.get(indexRowKey));
+            Result dataRow = dataRows.get(dataRowKey);
+            Long indexCellTs = firstCell.getTimestamp();
+            Cell.Type indexCellType = firstCell.getType();
+
+            Map<ImmutableBytesPtr, Cell> preImageObj = new HashMap<>();
+            Map<ImmutableBytesPtr, Cell> changeImageObj = new HashMap<>();
+            List<Cell> resultCells = Arrays.asList(dataRow.rawCells());
+            Collections.sort(resultCells, 
CellComparator.getInstance().reversed());

Review Comment:
   Cells are already sorted, so we can just reverse the list without needing a 
comparator.
   ```suggestion
               Collections.reverse(resultCells);
   ```



##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -223,4 +172,120 @@ protected boolean getNextCoveredIndexRow(List<Cell> 
result) throws IOException {
         }
         return false;
     }
+
+    private Result getCDCImage (
+            Map<ImmutableBytesPtr, Cell> preImageObj,
+            Map<ImmutableBytesPtr, Cell> changeImageObj,
+            boolean isIndexCellDeleteRow, Long indexCellTs, Cell firstCell) {
+        Map<String, Object> rowValueMap = new HashMap<>();
+
+        Map<String, Object> preImage = new HashMap<>();
+        if (this.cdcChangeScopeSet.size() == 0 ||
+                (this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.PRE))) {
+            for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell : 
preImageObj.entrySet()) {
+                if (dataColQualNameMap.get(preImageObjCell.getKey()) != null) {
+                    
preImage.put(dataColQualNameMap.get(preImageObjCell.getKey()),
+                            
dataColQualTypeMap.get(preImageObjCell.getKey()).toObject(
+                                    
preImageObjCell.getValue().getValueArray()));
+                }
+            }
+            rowValueMap.put(PRE_IMAGE, preImage);
+        }
+
+        Map<String, Object> changeImage = new HashMap<>();
+        if (this.cdcChangeScopeSet.size() == 0 ||
+                
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.CHANGE))) {
+            for (Map.Entry<ImmutableBytesPtr, Cell> changeImageObjCell : 
changeImageObj.entrySet()) {
+                if (dataColQualNameMap.get(changeImageObjCell.getKey()) != 
null) {
+                    
changeImage.put(dataColQualNameMap.get(changeImageObjCell.getKey()),
+                            
dataColQualTypeMap.get(changeImageObjCell.getKey()).toObject(
+                                    
changeImageObjCell.getValue().getValueArray()));
+                }
+            }
+            rowValueMap.put(CHANGE_IMAGE, changeImage);
+        }
+
+        Map<String, Object> postImage = new HashMap<>();
+        if (this.cdcChangeScopeSet.size() == 0 ||
+                (this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.POST))){
+            if (isIndexCellDeleteRow == false) {
+                for (Map.Entry<ImmutableBytesPtr, Cell> changeImageObjCell : 
changeImageObj.entrySet()) {
+                    if(dataColQualNameMap.get(changeImageObjCell.getKey()) != 
null) {
+                        
postImage.put(dataColQualNameMap.get(changeImageObjCell.getKey()),
+                                
dataColQualTypeMap.get(changeImageObjCell.getKey()).toObject(
+                                        
changeImageObjCell.getValue().getValueArray()));
+                    }
+                }
+                for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell : 
preImageObj.entrySet()) {
+                    if(dataColQualNameMap.get(preImageObjCell.getKey()) != null
+                            && 
postImage.get(dataColQualNameMap.get(preImageObjCell.getKey())) == null) {
+                        
postImage.put(dataColQualNameMap.get(preImageObjCell.getKey()),
+                                
dataColQualTypeMap.get(preImageObjCell.getKey()).toObject(
+                                        
preImageObjCell.getValue().getValueArray()));
+                    }
+                }
+            }
+            rowValueMap.put(POST_IMAGE, postImage);
+        }
+
+
+        if (isIndexCellDeleteRow) {
+            rowValueMap.put(EVENT_TYPE, DELETE_EVENT_TYPE);
+        } else {
+            rowValueMap.put(EVENT_TYPE, UPSERT_EVENT_TYPE);
+        }
+
+        byte[] value =
+                new 
Gson().toJson(rowValueMap).getBytes(StandardCharsets.UTF_8);
+        CellBuilder builder = 
CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
+        Result cdcRow = Result.create(Arrays.asList(builder.
+                setRow(indexToDataRowKeyMap.get(new 
ImmutableBytesPtr(firstCell.getRowArray(),
+                        firstCell.getRowOffset(), firstCell.getRowLength())
+                        .copyBytesIfNecessary())).
+                setFamily(firstCell.getFamilyArray()).
+                setQualifier(scan.getAttribute((CDC_JSON_COL_QUALIFIER))).
+                setTimestamp(indexCellTs).
+                setValue(value).
+                setType(Cell.Type.Put).
+                build()));
+
+        return cdcRow;
+    }
+
+    @Override
+    protected void scanDataTableRows(long startTime) throws IOException {
+        super.scanDataTableRows(startTime);
+        List<List<Cell>> indexRowList = new ArrayList<>();
+        // Creating new Index Rows for Delete Row events
+        for (int rowIndex = 0; rowIndex < indexRows.size(); rowIndex++) {
+            List<Cell> indexRow = indexRows.get(rowIndex);
+            indexRowList.add(indexRow);
+            if (indexRow.size() > 1) {
+                List<Cell> deleteRow = new ArrayList<>();
+                for (int cellIndex = indexRow.size() - 1; cellIndex >= 0; 
cellIndex--) {
+                    Cell cell = indexRow.get(cellIndex);
+                    if (cell.getType() == Cell.Type.DeleteFamily) {
+                        byte[] indexRowKey = new 
ImmutableBytesPtr(cell.getRowArray(),
+                                cell.getRowOffset(), cell.getRowLength())
+                                .copyBytesIfNecessary();
+                        ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
+                                indexToDataRowKeyMap.get(indexRowKey));
+                        Result dataRow = dataRows.get(dataRowKey);
+                        List<Cell> resultCells = 
Arrays.asList(dataRow.rawCells());
+                        for (Cell dataRowCell : resultCells) {
+                            if (dataRowCell.getType() == 
Cell.Type.DeleteFamily && dataRowCell.getTimestamp() == cell.getTimestamp()) {

Review Comment:
   To support a user-specified timestamp column for CDC, we should probably take 
timestamps from the index row key and the corresponding column cell in the data 
table cells. However, this probably needs a bigger discussion, and I suspect it will 
cause fallout changes in the algorithm as well as in the scan attributes needed 
from the client.



##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -104,115 +94,74 @@ protected Scan prepareDataTableScan(Collection<byte[]> 
dataRowKeys) throws IOExc
     protected boolean getNextCoveredIndexRow(List<Cell> result) throws 
IOException {
         if (indexRowIterator.hasNext()) {
             List<Cell> indexRow = indexRowIterator.next();
-            for (Cell c: indexRow) {
-                if (c.getType() == Cell.Type.Put) {
-                    result.add(c);
-                }
-            }
+            Cell firstCell = indexRow.get(indexRow.size() - 1);
+            byte[] indexRowKey = new ImmutableBytesPtr(firstCell.getRowArray(),
+                    firstCell.getRowOffset(), firstCell.getRowLength())
+                    .copyBytesIfNecessary();
+            ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
+                    indexToDataRowKeyMap.get(indexRowKey));
+            Result dataRow = dataRows.get(dataRowKey);
+            Long indexCellTs = firstCell.getTimestamp();
+            Cell.Type indexCellType = firstCell.getType();
+
+            Map<ImmutableBytesPtr, Cell> preImageObj = new HashMap<>();
+            Map<ImmutableBytesPtr, Cell> changeImageObj = new HashMap<>();
+            List<Cell> resultCells = Arrays.asList(dataRow.rawCells());
+            Collections.sort(resultCells, 
CellComparator.getInstance().reversed());
+
+            boolean isIndexCellDeleteRow = false;
+            boolean isIndexCellDeleteColumn = false;
             try {
-                Result dataRow = null;
-                if (! result.isEmpty()) {
-                    Cell firstCell = result.get(0);
-                    byte[] indexRowKey = new 
ImmutableBytesPtr(firstCell.getRowArray(),
-                            firstCell.getRowOffset(), firstCell.getRowLength())
-                            .copyBytesIfNecessary();
-                    ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
-                            indexToDataRowKeyMap.get(indexRowKey));
-                    dataRow = dataRows.get(dataRowKey);
-                    Long indexRowTs = result.get(0).getTimestamp();
-                    Map<Long, Map<ImmutableBytesPtr, Cell>> changeTimeline = 
dataRowChanges.get(
-                            dataRowKey);
-                    if (changeTimeline == null) {
-                        List<Cell> resultCells = 
Arrays.asList(dataRow.rawCells());
-                        Collections.sort(resultCells, 
CellComparator.getInstance().reversed());
-                        List<Cell> deleteMarkers = new ArrayList<>();
-                        List<List<Cell>> columns = new LinkedList<>();
-                        Cell currentColumnCell = null;
-                        Pair<byte[], byte[]> emptyKV = 
EncodedColumnsUtil.getEmptyKeyValueInfo(
-                                
EncodedColumnsUtil.getQualifierEncodingScheme(scan));
-                        List<Cell> currentColumn = null;
-                        Set<Long> uniqueTimeStamps = new HashSet<>();
-                        // TODO: From CompactionScanner.formColumns(), see if 
this can be refactored.
-                        for (Cell cell : resultCells) {
-                            uniqueTimeStamps.add(cell.getTimestamp());
-                            if (cell.getType() != Cell.Type.Put) {
-                                deleteMarkers.add(cell);
-                            }
-                            if (CellUtil.matchingColumn(cell, 
QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
-                                    emptyKV.getFirst())) {
-                                continue;
-                            }
-                            if (currentColumnCell == null) {
-                                currentColumn = new LinkedList<>();
-                                currentColumnCell = cell;
-                                currentColumn.add(cell);
-                            } else if (!CellUtil.matchingColumn(cell, 
currentColumnCell)) {
-                                columns.add(currentColumn);
-                                currentColumn = new LinkedList<>();
-                                currentColumnCell = cell;
-                                currentColumn.add(cell);
-                            } else {
-                                currentColumn.add(cell);
-                            }
+                for (Cell cell : resultCells) {
+                    if (cell.getType() == Cell.Type.DeleteColumn) {
+                        // DDL is not supported in CDC
+                        if (cell.getTimestamp() == indexCellTs) {
+                            isIndexCellDeleteColumn = true;
+                            break;
+                        }
+                    } else if (cell.getType() == Cell.Type.Put) {

Review Comment:
   I think we can skip over the Put cells once we are past the indexCellTs, 
correct?



##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -104,115 +94,74 @@ protected Scan prepareDataTableScan(Collection<byte[]> 
dataRowKeys) throws IOExc
     protected boolean getNextCoveredIndexRow(List<Cell> result) throws 
IOException {
         if (indexRowIterator.hasNext()) {
             List<Cell> indexRow = indexRowIterator.next();
-            for (Cell c: indexRow) {
-                if (c.getType() == Cell.Type.Put) {
-                    result.add(c);
-                }
-            }
+            Cell firstCell = indexRow.get(indexRow.size() - 1);
+            byte[] indexRowKey = new ImmutableBytesPtr(firstCell.getRowArray(),
+                    firstCell.getRowOffset(), firstCell.getRowLength())
+                    .copyBytesIfNecessary();
+            ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
+                    indexToDataRowKeyMap.get(indexRowKey));
+            Result dataRow = dataRows.get(dataRowKey);
+            Long indexCellTs = firstCell.getTimestamp();
+            Cell.Type indexCellType = firstCell.getType();
+
+            Map<ImmutableBytesPtr, Cell> preImageObj = new HashMap<>();
+            Map<ImmutableBytesPtr, Cell> changeImageObj = new HashMap<>();
+            List<Cell> resultCells = Arrays.asList(dataRow.rawCells());
+            Collections.sort(resultCells, 
CellComparator.getInstance().reversed());
+
+            boolean isIndexCellDeleteRow = false;
+            boolean isIndexCellDeleteColumn = false;
             try {
-                Result dataRow = null;
-                if (! result.isEmpty()) {
-                    Cell firstCell = result.get(0);
-                    byte[] indexRowKey = new 
ImmutableBytesPtr(firstCell.getRowArray(),
-                            firstCell.getRowOffset(), firstCell.getRowLength())
-                            .copyBytesIfNecessary();
-                    ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
-                            indexToDataRowKeyMap.get(indexRowKey));
-                    dataRow = dataRows.get(dataRowKey);
-                    Long indexRowTs = result.get(0).getTimestamp();
-                    Map<Long, Map<ImmutableBytesPtr, Cell>> changeTimeline = 
dataRowChanges.get(
-                            dataRowKey);
-                    if (changeTimeline == null) {
-                        List<Cell> resultCells = 
Arrays.asList(dataRow.rawCells());
-                        Collections.sort(resultCells, 
CellComparator.getInstance().reversed());
-                        List<Cell> deleteMarkers = new ArrayList<>();
-                        List<List<Cell>> columns = new LinkedList<>();
-                        Cell currentColumnCell = null;
-                        Pair<byte[], byte[]> emptyKV = 
EncodedColumnsUtil.getEmptyKeyValueInfo(
-                                
EncodedColumnsUtil.getQualifierEncodingScheme(scan));
-                        List<Cell> currentColumn = null;
-                        Set<Long> uniqueTimeStamps = new HashSet<>();
-                        // TODO: From CompactionScanner.formColumns(), see if 
this can be refactored.
-                        for (Cell cell : resultCells) {
-                            uniqueTimeStamps.add(cell.getTimestamp());
-                            if (cell.getType() != Cell.Type.Put) {
-                                deleteMarkers.add(cell);
-                            }
-                            if (CellUtil.matchingColumn(cell, 
QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
-                                    emptyKV.getFirst())) {
-                                continue;
-                            }
-                            if (currentColumnCell == null) {
-                                currentColumn = new LinkedList<>();
-                                currentColumnCell = cell;
-                                currentColumn.add(cell);
-                            } else if (!CellUtil.matchingColumn(cell, 
currentColumnCell)) {
-                                columns.add(currentColumn);
-                                currentColumn = new LinkedList<>();
-                                currentColumnCell = cell;
-                                currentColumn.add(cell);
-                            } else {
-                                currentColumn.add(cell);
-                            }
+                for (Cell cell : resultCells) {
+                    if (cell.getType() == Cell.Type.DeleteColumn) {
+                        // DDL is not supported in CDC
+                        if (cell.getTimestamp() == indexCellTs) {
+                            isIndexCellDeleteColumn = true;
+                            break;
+                        }
+                    } else if (cell.getType() == Cell.Type.Put) {
+                        if (cell.getTimestamp() < indexCellTs) {
+                            preImageObj.put(new ImmutableBytesPtr(
+                                    cell.getQualifierArray(),
+                                    cell.getQualifierOffset(),
+                                    cell.getQualifierLength()), cell);
+                        } else if (cell.getTimestamp() == indexCellTs) {
+                            changeImageObj.put(new ImmutableBytesPtr(
+                                    cell.getQualifierArray(),
+                                    cell.getQualifierOffset(),
+                                    cell.getQualifierLength()), cell);

Review Comment:
   ```suggestion
                           ImmutableBytesPtr colQual = new ImmutableBytesPtr(
                                       cell.getQualifierArray(),
                                       cell.getQualifierOffset(),
                                       cell.getQualifierLength());
                           if (cell.getTimestamp() < indexCellTs) {
                               preImageObj.put(colQual, cell);
                           } else if (cell.getTimestamp() == indexCellTs) {
                               changeImageObj.put(colQual, cell);
   ```



##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -223,4 +172,120 @@ protected boolean getNextCoveredIndexRow(List<Cell> 
result) throws IOException {
         }
         return false;
     }
+
+    private Result getCDCImage (
+            Map<ImmutableBytesPtr, Cell> preImageObj,
+            Map<ImmutableBytesPtr, Cell> changeImageObj,
+            boolean isIndexCellDeleteRow, Long indexCellTs, Cell firstCell) {
+        Map<String, Object> rowValueMap = new HashMap<>();
+
+        Map<String, Object> preImage = new HashMap<>();
+        if (this.cdcChangeScopeSet.size() == 0 ||
+                (this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.PRE))) {
+            for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell : 
preImageObj.entrySet()) {
+                if (dataColQualNameMap.get(preImageObjCell.getKey()) != null) {
+                    
preImage.put(dataColQualNameMap.get(preImageObjCell.getKey()),
+                            
dataColQualTypeMap.get(preImageObjCell.getKey()).toObject(
+                                    
preImageObjCell.getValue().getValueArray()));
+                }
+            }
+            rowValueMap.put(PRE_IMAGE, preImage);
+        }
+
+        Map<String, Object> changeImage = new HashMap<>();
+        if (this.cdcChangeScopeSet.size() == 0 ||
+                
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.CHANGE))) {
+            for (Map.Entry<ImmutableBytesPtr, Cell> changeImageObjCell : 
changeImageObj.entrySet()) {
+                if (dataColQualNameMap.get(changeImageObjCell.getKey()) != 
null) {
+                    
changeImage.put(dataColQualNameMap.get(changeImageObjCell.getKey()),
+                            
dataColQualTypeMap.get(changeImageObjCell.getKey()).toObject(
+                                    
changeImageObjCell.getValue().getValueArray()));
+                }
+            }
+            rowValueMap.put(CHANGE_IMAGE, changeImage);
+        }
+
+        Map<String, Object> postImage = new HashMap<>();
+        if (this.cdcChangeScopeSet.size() == 0 ||
+                (this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.POST))){
+            if (isIndexCellDeleteRow == false) {
+                for (Map.Entry<ImmutableBytesPtr, Cell> changeImageObjCell : 
changeImageObj.entrySet()) {
+                    if(dataColQualNameMap.get(changeImageObjCell.getKey()) != 
null) {
+                        
postImage.put(dataColQualNameMap.get(changeImageObjCell.getKey()),
+                                
dataColQualTypeMap.get(changeImageObjCell.getKey()).toObject(
+                                        
changeImageObjCell.getValue().getValueArray()));
+                    }
+                }
+                for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell : 
preImageObj.entrySet()) {
+                    if(dataColQualNameMap.get(preImageObjCell.getKey()) != null
+                            && 
postImage.get(dataColQualNameMap.get(preImageObjCell.getKey())) == null) {
+                        
postImage.put(dataColQualNameMap.get(preImageObjCell.getKey()),
+                                
dataColQualTypeMap.get(preImageObjCell.getKey()).toObject(
+                                        
preImageObjCell.getValue().getValueArray()));
+                    }
+                }

Review Comment:
   If you flip these two loops, you can simply clone the `preImage` map and overwrite 
its entries with those from the `changeImage` map, which will be both simpler and faster.



##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -104,115 +94,74 @@ protected Scan prepareDataTableScan(Collection<byte[]> 
dataRowKeys) throws IOExc
     protected boolean getNextCoveredIndexRow(List<Cell> result) throws 
IOException {
         if (indexRowIterator.hasNext()) {
             List<Cell> indexRow = indexRowIterator.next();
-            for (Cell c: indexRow) {
-                if (c.getType() == Cell.Type.Put) {
-                    result.add(c);
-                }
-            }
+            Cell firstCell = indexRow.get(indexRow.size() - 1);
+            byte[] indexRowKey = new ImmutableBytesPtr(firstCell.getRowArray(),
+                    firstCell.getRowOffset(), firstCell.getRowLength())
+                    .copyBytesIfNecessary();
+            ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
+                    indexToDataRowKeyMap.get(indexRowKey));
+            Result dataRow = dataRows.get(dataRowKey);
+            Long indexCellTs = firstCell.getTimestamp();
+            Cell.Type indexCellType = firstCell.getType();
+
+            Map<ImmutableBytesPtr, Cell> preImageObj = new HashMap<>();
+            Map<ImmutableBytesPtr, Cell> changeImageObj = new HashMap<>();
+            List<Cell> resultCells = Arrays.asList(dataRow.rawCells());
+            Collections.sort(resultCells, 
CellComparator.getInstance().reversed());
+
+            boolean isIndexCellDeleteRow = false;
+            boolean isIndexCellDeleteColumn = false;
             try {
-                Result dataRow = null;
-                if (! result.isEmpty()) {
-                    Cell firstCell = result.get(0);
-                    byte[] indexRowKey = new 
ImmutableBytesPtr(firstCell.getRowArray(),
-                            firstCell.getRowOffset(), firstCell.getRowLength())
-                            .copyBytesIfNecessary();
-                    ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
-                            indexToDataRowKeyMap.get(indexRowKey));
-                    dataRow = dataRows.get(dataRowKey);
-                    Long indexRowTs = result.get(0).getTimestamp();
-                    Map<Long, Map<ImmutableBytesPtr, Cell>> changeTimeline = 
dataRowChanges.get(
-                            dataRowKey);
-                    if (changeTimeline == null) {
-                        List<Cell> resultCells = 
Arrays.asList(dataRow.rawCells());
-                        Collections.sort(resultCells, 
CellComparator.getInstance().reversed());
-                        List<Cell> deleteMarkers = new ArrayList<>();
-                        List<List<Cell>> columns = new LinkedList<>();
-                        Cell currentColumnCell = null;
-                        Pair<byte[], byte[]> emptyKV = 
EncodedColumnsUtil.getEmptyKeyValueInfo(
-                                
EncodedColumnsUtil.getQualifierEncodingScheme(scan));
-                        List<Cell> currentColumn = null;
-                        Set<Long> uniqueTimeStamps = new HashSet<>();
-                        // TODO: From CompactionScanner.formColumns(), see if 
this can be refactored.
-                        for (Cell cell : resultCells) {
-                            uniqueTimeStamps.add(cell.getTimestamp());
-                            if (cell.getType() != Cell.Type.Put) {
-                                deleteMarkers.add(cell);
-                            }
-                            if (CellUtil.matchingColumn(cell, 
QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
-                                    emptyKV.getFirst())) {
-                                continue;
-                            }
-                            if (currentColumnCell == null) {
-                                currentColumn = new LinkedList<>();
-                                currentColumnCell = cell;
-                                currentColumn.add(cell);
-                            } else if (!CellUtil.matchingColumn(cell, 
currentColumnCell)) {
-                                columns.add(currentColumn);
-                                currentColumn = new LinkedList<>();
-                                currentColumnCell = cell;
-                                currentColumn.add(cell);
-                            } else {
-                                currentColumn.add(cell);
-                            }
+                for (Cell cell : resultCells) {
+                    if (cell.getType() == Cell.Type.DeleteColumn) {
+                        // DDL is not supported in CDC

Review Comment:
   Also, multiple columns can be set to NULL, either in the same UPSERT statement 
or across multiple UPSERT statements, so we would have to detect these changes and 
surface them as is.





> Extend UncoveredGlobalIndexRegionScanner for CDC region scanner usecase
> -----------------------------------------------------------------------
>
>                 Key: PHOENIX-7015
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-7015
>             Project: Phoenix
>          Issue Type: Sub-task
>            Reporter: Viraj Jasani
>            Priority: Major
>
> For the CDC region scanner use case, extend UncoveredGlobalIndexRegionScanner to 
> CDCUncoveredGlobalIndexRegionScanner. The new region scanner for CDC performs a 
> raw scan of the index table and retrieves data table rows from the index rows.
> Using the time range, it can form a JSON blob representing changes to the row, 
> including pre- and/or post-row images.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to