[ 
https://issues.apache.org/jira/browse/PHOENIX-7015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17808030#comment-17808030
 ] 

ASF GitHub Bot commented on PHOENIX-7015:
-----------------------------------------

TheNamesRai commented on code in PR #1794:
URL: https://github.com/apache/phoenix/pull/1794#discussion_r1456962157


##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -223,4 +172,120 @@ protected boolean getNextCoveredIndexRow(List<Cell> 
result) throws IOException {
         }
         return false;
     }
+
+    private Result getCDCImage (
+            Map<ImmutableBytesPtr, Cell> preImageObj,
+            Map<ImmutableBytesPtr, Cell> changeImageObj,
+            boolean isIndexCellDeleteRow, Long indexCellTs, Cell firstCell) {
+        Map<String, Object> rowValueMap = new HashMap<>();
+
+        Map<String, Object> preImage = new HashMap<>();
+        if (this.cdcChangeScopeSet.size() == 0 ||
+                (this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.PRE))) {
+            for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell : 
preImageObj.entrySet()) {
+                if (dataColQualNameMap.get(preImageObjCell.getKey()) != null) {
+                    
preImage.put(dataColQualNameMap.get(preImageObjCell.getKey()),
+                            
dataColQualTypeMap.get(preImageObjCell.getKey()).toObject(
+                                    
preImageObjCell.getValue().getValueArray()));
+                }
+            }
+            rowValueMap.put(PRE_IMAGE, preImage);
+        }
+
+        Map<String, Object> changeImage = new HashMap<>();
+        if (this.cdcChangeScopeSet.size() == 0 ||
+                
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.CHANGE))) {
+            for (Map.Entry<ImmutableBytesPtr, Cell> changeImageObjCell : 
changeImageObj.entrySet()) {
+                if (dataColQualNameMap.get(changeImageObjCell.getKey()) != 
null) {
+                    
changeImage.put(dataColQualNameMap.get(changeImageObjCell.getKey()),
+                            
dataColQualTypeMap.get(changeImageObjCell.getKey()).toObject(
+                                    
changeImageObjCell.getValue().getValueArray()));
+                }
+            }
+            rowValueMap.put(CHANGE_IMAGE, changeImage);
+        }
+
+        Map<String, Object> postImage = new HashMap<>();
+        if (this.cdcChangeScopeSet.size() == 0 ||
+                (this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.POST))){
+            if (isIndexCellDeleteRow == false) {
+                for (Map.Entry<ImmutableBytesPtr, Cell> changeImageObjCell : 
changeImageObj.entrySet()) {
+                    if(dataColQualNameMap.get(changeImageObjCell.getKey()) != 
null) {
+                        
postImage.put(dataColQualNameMap.get(changeImageObjCell.getKey()),
+                                
dataColQualTypeMap.get(changeImageObjCell.getKey()).toObject(
+                                        
changeImageObjCell.getValue().getValueArray()));
+                    }
+                }
+                for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell : 
preImageObj.entrySet()) {
+                    if(dataColQualNameMap.get(preImageObjCell.getKey()) != null
+                            && 
postImage.get(dataColQualNameMap.get(preImageObjCell.getKey())) == null) {
+                        
postImage.put(dataColQualNameMap.get(preImageObjCell.getKey()),
+                                
dataColQualTypeMap.get(preImageObjCell.getKey()).toObject(
+                                        
preImageObjCell.getValue().getValueArray()));
+                    }
+                }
+            }
+            rowValueMap.put(POST_IMAGE, postImage);
+        }
+
+
+        if (isIndexCellDeleteRow) {
+            rowValueMap.put(EVENT_TYPE, DELETE_EVENT_TYPE);
+        } else {
+            rowValueMap.put(EVENT_TYPE, UPSERT_EVENT_TYPE);
+        }
+
+        byte[] value =
+                new 
Gson().toJson(rowValueMap).getBytes(StandardCharsets.UTF_8);
+        CellBuilder builder = 
CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
+        Result cdcRow = Result.create(Arrays.asList(builder.
+                setRow(indexToDataRowKeyMap.get(new 
ImmutableBytesPtr(firstCell.getRowArray(),
+                        firstCell.getRowOffset(), firstCell.getRowLength())
+                        .copyBytesIfNecessary())).
+                setFamily(firstCell.getFamilyArray()).
+                setQualifier(scan.getAttribute((CDC_JSON_COL_QUALIFIER))).
+                setTimestamp(indexCellTs).
+                setValue(value).
+                setType(Cell.Type.Put).
+                build()));
+
+        return cdcRow;
+    }
+
+    @Override
+    protected void scanDataTableRows(long startTime) throws IOException {
+        super.scanDataTableRows(startTime);
+        List<List<Cell>> indexRowList = new ArrayList<>();
+        // Creating new Index Rows for Delete Row events
+        for (int rowIndex = 0; rowIndex < indexRows.size(); rowIndex++) {
+            List<Cell> indexRow = indexRows.get(rowIndex);
+            indexRowList.add(indexRow);
+            if (indexRow.size() > 1) {
+                List<Cell> deleteRow = new ArrayList<>();
+                for (int cellIndex = indexRow.size() - 1; cellIndex >= 0; 
cellIndex--) {
+                    Cell cell = indexRow.get(cellIndex);
+                    if (cell.getType() == Cell.Type.DeleteFamily) {
+                        byte[] indexRowKey = new 
ImmutableBytesPtr(cell.getRowArray(),
+                                cell.getRowOffset(), cell.getRowLength())
+                                .copyBytesIfNecessary();
+                        ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
+                                indexToDataRowKeyMap.get(indexRowKey));
+                        Result dataRow = dataRows.get(dataRowKey);
+                        List<Cell> resultCells = 
Arrays.asList(dataRow.rawCells());
+                        for (Cell dataRowCell : resultCells) {
+                            if (dataRowCell.getType() == 
Cell.Type.DeleteFamily && dataRowCell.getTimestamp() == cell.getTimestamp()) {

Review Comment:
   Yes, that is a correct understanding. Will add the comment as well.





> Extend UncoveredGlobalIndexRegionScanner for CDC region scanner usecase
> -----------------------------------------------------------------------
>
>                 Key: PHOENIX-7015
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-7015
>             Project: Phoenix
>          Issue Type: Sub-task
>            Reporter: Viraj Jasani
>            Priority: Major
>
> For CDC region scanner usecase, extend UncoveredGlobalIndexRegionScanner to 
> CDCUncoveredGlobalIndexRegionScanner. The new region scanner for CDC performs 
> raw scan to index table and retrieve data table rows from index rows.
> Using the time range, it can form a JSON blob to represent changes to the row 
> including pre and/or post row images.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to