[ 
https://issues.apache.org/jira/browse/PHOENIX-7015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17808143#comment-17808143
 ] 

ASF GitHub Bot commented on PHOENIX-7015:
-----------------------------------------

haridsv commented on code in PR #1794:
URL: https://github.com/apache/phoenix/pull/1794#discussion_r1457249404


##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -223,4 +186,126 @@ protected boolean getNextCoveredIndexRow(List<Cell> 
result) throws IOException {
         }
         return false;
     }
+
+    private Result getCDCImage(
+            Map<ImmutableBytesPtr, Cell> preImageObj,
+            Map<ImmutableBytesPtr, Cell> changeImageObj,
+            boolean isIndexCellDeleteRow, Long indexCellTS, Cell firstCell) {
+        Map<String, Object> rowValueMap = new HashMap<>();
+
+        Map<String, Object> preImage = new HashMap<>();
+        if (this.cdcChangeScopeSet.size() == 0
+                || 
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.PRE))) {
+            for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell : 
preImageObj.entrySet()) {
+                if (dataColQualNameMap.get(preImageObjCell.getKey()) != null) {

Review Comment:
   ```suggestion
                   if (dataColQualNameMap.containsKey(preImageObjCell.getKey())) {
   ```



##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -223,4 +186,126 @@ protected boolean getNextCoveredIndexRow(List<Cell> 
result) throws IOException {
         }
         return false;
     }
+
+    private Result getCDCImage(
+            Map<ImmutableBytesPtr, Cell> preImageObj,
+            Map<ImmutableBytesPtr, Cell> changeImageObj,
+            boolean isIndexCellDeleteRow, Long indexCellTS, Cell firstCell) {
+        Map<String, Object> rowValueMap = new HashMap<>();
+
+        Map<String, Object> preImage = new HashMap<>();
+        if (this.cdcChangeScopeSet.size() == 0
+                || 
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.PRE))) {
+            for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell : 
preImageObj.entrySet()) {
+                if (dataColQualNameMap.get(preImageObjCell.getKey()) != null) {
+                    
preImage.put(dataColQualNameMap.get(preImageObjCell.getKey()),
+                            
dataColQualTypeMap.get(preImageObjCell.getKey()).toObject(
+                                    
preImageObjCell.getValue().getValueArray()));
+                }
+            }
+            rowValueMap.put(PRE_IMAGE, preImage);
+        }
+
+        Map<String, Object> changeImage = new HashMap<>();
+        if (this.cdcChangeScopeSet.size() == 0
+                || 
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.CHANGE))) {
+            for (Map.Entry<ImmutableBytesPtr, Cell> changeImageObjCell
+                    : changeImageObj.entrySet()) {
+                if (dataColQualNameMap.get(changeImageObjCell.getKey()) != 
null) {
+                    
changeImage.put(dataColQualNameMap.get(changeImageObjCell.getKey()),
+                            
dataColQualTypeMap.get(changeImageObjCell.getKey()).toObject(
+                                    
changeImageObjCell.getValue().getValueArray()));
+                }
+            }
+            rowValueMap.put(CHANGE_IMAGE, changeImage);
+        }
+
+        Map<String, Object> postImage = new HashMap<>();
+        if (this.cdcChangeScopeSet.size() == 0
+                || 
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.POST))) {
+            if (!isIndexCellDeleteRow) {
+                for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell
+                        : preImageObj.entrySet()) {
+                    if (dataColQualNameMap.get(preImageObjCell.getKey()) != 
null) {
+                        
postImage.put(dataColQualNameMap.get(preImageObjCell.getKey()),
+                                
dataColQualTypeMap.get(preImageObjCell.getKey()).toObject(
+                                        
preImageObjCell.getValue().getValueArray()));
+                    }
+                }
+                for (Map.Entry<ImmutableBytesPtr, Cell> changeImageObjCell
+                        : changeImageObj.entrySet()) {
+                    if (dataColQualNameMap.get(changeImageObjCell.getKey()) != 
null) {
+                        
postImage.put(dataColQualNameMap.get(changeImageObjCell.getKey()),
+                                
dataColQualTypeMap.get(changeImageObjCell.getKey()).toObject(
+                                        
changeImageObjCell.getValue().getValueArray()));
+                    }
+                }
+            }
+            rowValueMap.put(POST_IMAGE, postImage);
+        }
+
+        if (isIndexCellDeleteRow) {
+            rowValueMap.put(EVENT_TYPE, DELETE_EVENT_TYPE);
+        } else {
+            rowValueMap.put(EVENT_TYPE, UPSERT_EVENT_TYPE);
+        }
+
+        byte[] value =
+                new 
Gson().toJson(rowValueMap).getBytes(StandardCharsets.UTF_8);
+        CellBuilder builder = 
CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
+        Result cdcRow = Result.create(Arrays.asList(builder.
+                setRow(indexToDataRowKeyMap.get(new 
ImmutableBytesPtr(firstCell.getRowArray(),
+                        firstCell.getRowOffset(), firstCell.getRowLength())
+                        .copyBytesIfNecessary())).
+                setFamily(firstCell.getFamilyArray()).
+                setQualifier(scan.getAttribute(CDC_JSON_COL_QUALIFIER)).
+                setTimestamp(indexCellTS).
+                setValue(value).
+                setType(Cell.Type.Put).
+                build()));
+
+        return cdcRow;
+    }
+
+    @Override
+    protected void scanDataTableRows(long startTime) throws IOException {
+        super.scanDataTableRows(startTime);
+        List<List<Cell>> indexRowList = new ArrayList<>();
+        // Creating new Index Rows for Delete Row events
+        for (int rowIndex = 0; rowIndex < indexRows.size(); rowIndex++) {
+            List<Cell> indexRow = indexRows.get(rowIndex);
+            indexRowList.add(indexRow);
+            if (indexRow.size() > 1) {
+                List<Cell> deleteRow = null;
+                for (int cellIndex = indexRow.size() - 1; cellIndex >= 0; 
cellIndex--) {
+                    Cell cell = indexRow.get(cellIndex);
+                    if (cell.getType() == Cell.Type.DeleteFamily) {
+                        byte[] indexRowKey = new 
ImmutableBytesPtr(cell.getRowArray(),
+                                cell.getRowOffset(), cell.getRowLength())
+                                .copyBytesIfNecessary();
+                        ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
+                                indexToDataRowKeyMap.get(indexRowKey));
+                        Result dataRow = dataRows.get(dataRowKey);
+                        for (Cell dataRowCell : dataRow.rawCells()) {
+                            // Note: Upsert adds delete family marker in the 
index table but not in the datatable.
+                            // Delete operation adds delete family marker in 
datatable as well as index table.
+                            if (dataRowCell.getType() == Cell.Type.DeleteFamily
+                                    && dataRowCell.getTimestamp() == 
cell.getTimestamp()) {
+                                if (deleteRow == null) {
+                                    deleteRow = new ArrayList<>();
+                                }
+                                deleteRow.add(cell);
+                                indexRowList.add(deleteRow);

Review Comment:
   If you change the deleteRow to be a boolean flag, this can just be:
   ```suggestion
                                   indexRowList.add(Arrays.asList(cell));
   ```



##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -223,4 +186,126 @@ protected boolean getNextCoveredIndexRow(List<Cell> 
result) throws IOException {
         }
         return false;
     }
+
+    private Result getCDCImage(
+            Map<ImmutableBytesPtr, Cell> preImageObj,
+            Map<ImmutableBytesPtr, Cell> changeImageObj,
+            boolean isIndexCellDeleteRow, Long indexCellTS, Cell firstCell) {
+        Map<String, Object> rowValueMap = new HashMap<>();
+
+        Map<String, Object> preImage = new HashMap<>();
+        if (this.cdcChangeScopeSet.size() == 0
+                || 
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.PRE))) {
+            for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell : 
preImageObj.entrySet()) {
+                if (dataColQualNameMap.get(preImageObjCell.getKey()) != null) {
+                    
preImage.put(dataColQualNameMap.get(preImageObjCell.getKey()),
+                            
dataColQualTypeMap.get(preImageObjCell.getKey()).toObject(
+                                    
preImageObjCell.getValue().getValueArray()));
+                }
+            }
+            rowValueMap.put(PRE_IMAGE, preImage);
+        }
+
+        Map<String, Object> changeImage = new HashMap<>();
+        if (this.cdcChangeScopeSet.size() == 0
+                || 
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.CHANGE))) {
+            for (Map.Entry<ImmutableBytesPtr, Cell> changeImageObjCell
+                    : changeImageObj.entrySet()) {
+                if (dataColQualNameMap.get(changeImageObjCell.getKey()) != 
null) {

Review Comment:
   ```suggestion
                   if (dataColQualNameMap.containsKey(changeImageObjCell.getKey())) {
   ```



##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -223,4 +186,126 @@ protected boolean getNextCoveredIndexRow(List<Cell> 
result) throws IOException {
         }
         return false;
     }
+
+    private Result getCDCImage(
+            Map<ImmutableBytesPtr, Cell> preImageObj,
+            Map<ImmutableBytesPtr, Cell> changeImageObj,
+            boolean isIndexCellDeleteRow, Long indexCellTS, Cell firstCell) {
+        Map<String, Object> rowValueMap = new HashMap<>();
+
+        Map<String, Object> preImage = new HashMap<>();
+        if (this.cdcChangeScopeSet.size() == 0
+                || 
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.PRE))) {
+            for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell : 
preImageObj.entrySet()) {
+                if (dataColQualNameMap.get(preImageObjCell.getKey()) != null) {
+                    
preImage.put(dataColQualNameMap.get(preImageObjCell.getKey()),
+                            
dataColQualTypeMap.get(preImageObjCell.getKey()).toObject(
+                                    
preImageObjCell.getValue().getValueArray()));
+                }
+            }
+            rowValueMap.put(PRE_IMAGE, preImage);
+        }
+
+        Map<String, Object> changeImage = new HashMap<>();
+        if (this.cdcChangeScopeSet.size() == 0
+                || 
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.CHANGE))) {
+            for (Map.Entry<ImmutableBytesPtr, Cell> changeImageObjCell
+                    : changeImageObj.entrySet()) {
+                if (dataColQualNameMap.get(changeImageObjCell.getKey()) != 
null) {
+                    
changeImage.put(dataColQualNameMap.get(changeImageObjCell.getKey()),
+                            
dataColQualTypeMap.get(changeImageObjCell.getKey()).toObject(
+                                    
changeImageObjCell.getValue().getValueArray()));
+                }
+            }
+            rowValueMap.put(CHANGE_IMAGE, changeImage);
+        }
+
+        Map<String, Object> postImage = new HashMap<>();
+        if (this.cdcChangeScopeSet.size() == 0
+                || 
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.POST))) {
+            if (!isIndexCellDeleteRow) {
+                for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell
+                        : preImageObj.entrySet()) {
+                    if (dataColQualNameMap.get(preImageObjCell.getKey()) != 
null) {
+                        
postImage.put(dataColQualNameMap.get(preImageObjCell.getKey()),
+                                
dataColQualTypeMap.get(preImageObjCell.getKey()).toObject(
+                                        
preImageObjCell.getValue().getValueArray()));
+                    }
+                }
+                for (Map.Entry<ImmutableBytesPtr, Cell> changeImageObjCell
+                        : changeImageObj.entrySet()) {
+                    if (dataColQualNameMap.get(changeImageObjCell.getKey()) != 
null) {
+                        
postImage.put(dataColQualNameMap.get(changeImageObjCell.getKey()),
+                                
dataColQualTypeMap.get(changeImageObjCell.getKey()).toObject(
+                                        
changeImageObjCell.getValue().getValueArray()));
+                    }
+                }
+            }

Review Comment:
   Instead of using a separate loop for building postImage, why not build it 
as part of both the pre-image and change-image loops? That is, you would repeat 
lines 201 and 215 for postImage as well.



##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -223,4 +186,126 @@ protected boolean getNextCoveredIndexRow(List<Cell> 
result) throws IOException {
         }
         return false;
     }
+
+    private Result getCDCImage(
+            Map<ImmutableBytesPtr, Cell> preImageObj,
+            Map<ImmutableBytesPtr, Cell> changeImageObj,
+            boolean isIndexCellDeleteRow, Long indexCellTS, Cell firstCell) {
+        Map<String, Object> rowValueMap = new HashMap<>();
+
+        Map<String, Object> preImage = new HashMap<>();
+        if (this.cdcChangeScopeSet.size() == 0
+                || 
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.PRE))) {
+            for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell : 
preImageObj.entrySet()) {
+                if (dataColQualNameMap.get(preImageObjCell.getKey()) != null) {
+                    
preImage.put(dataColQualNameMap.get(preImageObjCell.getKey()),
+                            
dataColQualTypeMap.get(preImageObjCell.getKey()).toObject(
+                                    
preImageObjCell.getValue().getValueArray()));
+                }
+            }
+            rowValueMap.put(PRE_IMAGE, preImage);
+        }
+
+        Map<String, Object> changeImage = new HashMap<>();
+        if (this.cdcChangeScopeSet.size() == 0
+                || 
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.CHANGE))) {
+            for (Map.Entry<ImmutableBytesPtr, Cell> changeImageObjCell
+                    : changeImageObj.entrySet()) {
+                if (dataColQualNameMap.get(changeImageObjCell.getKey()) != 
null) {
+                    
changeImage.put(dataColQualNameMap.get(changeImageObjCell.getKey()),
+                            
dataColQualTypeMap.get(changeImageObjCell.getKey()).toObject(
+                                    
changeImageObjCell.getValue().getValueArray()));
+                }
+            }
+            rowValueMap.put(CHANGE_IMAGE, changeImage);
+        }
+
+        Map<String, Object> postImage = new HashMap<>();
+        if (this.cdcChangeScopeSet.size() == 0
+                || 
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.POST))) {
+            if (!isIndexCellDeleteRow) {
+                for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell
+                        : preImageObj.entrySet()) {
+                    if (dataColQualNameMap.get(preImageObjCell.getKey()) != 
null) {
+                        
postImage.put(dataColQualNameMap.get(preImageObjCell.getKey()),
+                                
dataColQualTypeMap.get(preImageObjCell.getKey()).toObject(
+                                        
preImageObjCell.getValue().getValueArray()));
+                    }
+                }
+                for (Map.Entry<ImmutableBytesPtr, Cell> changeImageObjCell
+                        : changeImageObj.entrySet()) {
+                    if (dataColQualNameMap.get(changeImageObjCell.getKey()) != 
null) {
+                        
postImage.put(dataColQualNameMap.get(changeImageObjCell.getKey()),
+                                
dataColQualTypeMap.get(changeImageObjCell.getKey()).toObject(
+                                        
changeImageObjCell.getValue().getValueArray()));
+                    }
+                }
+            }
+            rowValueMap.put(POST_IMAGE, postImage);
+        }
+
+        if (isIndexCellDeleteRow) {
+            rowValueMap.put(EVENT_TYPE, DELETE_EVENT_TYPE);
+        } else {
+            rowValueMap.put(EVENT_TYPE, UPSERT_EVENT_TYPE);
+        }
+
+        byte[] value =
+                new 
Gson().toJson(rowValueMap).getBytes(StandardCharsets.UTF_8);
+        CellBuilder builder = 
CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
+        Result cdcRow = Result.create(Arrays.asList(builder.
+                setRow(indexToDataRowKeyMap.get(new 
ImmutableBytesPtr(firstCell.getRowArray(),
+                        firstCell.getRowOffset(), firstCell.getRowLength())
+                        .copyBytesIfNecessary())).
+                setFamily(firstCell.getFamilyArray()).
+                setQualifier(scan.getAttribute(CDC_JSON_COL_QUALIFIER)).
+                setTimestamp(indexCellTS).
+                setValue(value).
+                setType(Cell.Type.Put).
+                build()));
+
+        return cdcRow;
+    }
+
+    @Override
+    protected void scanDataTableRows(long startTime) throws IOException {
+        super.scanDataTableRows(startTime);
+        List<List<Cell>> indexRowList = new ArrayList<>();
+        // Creating new Index Rows for Delete Row events
+        for (int rowIndex = 0; rowIndex < indexRows.size(); rowIndex++) {
+            List<Cell> indexRow = indexRows.get(rowIndex);
+            indexRowList.add(indexRow);
+            if (indexRow.size() > 1) {
+                List<Cell> deleteRow = null;

Review Comment:
   Nit: outside the loop you only use this as a flag, so why not use a 
boolean flag here and a local list at line 294?





> Extend UncoveredGlobalIndexRegionScanner for CDC region scanner usecase
> -----------------------------------------------------------------------
>
>                 Key: PHOENIX-7015
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-7015
>             Project: Phoenix
>          Issue Type: Sub-task
>            Reporter: Viraj Jasani
>            Priority: Major
>
> For the CDC region scanner use case, extend UncoveredGlobalIndexRegionScanner to 
> CDCUncoveredGlobalIndexRegionScanner. The new region scanner for CDC performs a 
> raw scan of the index table and retrieves data table rows from the index rows.
> Using the time range, it can form a JSON blob representing changes to the row, 
> including pre and/or post row images.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to