[ https://issues.apache.org/jira/browse/PHOENIX-7015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17808030#comment-17808030 ]
ASF GitHub Bot commented on PHOENIX-7015: ----------------------------------------- TheNamesRai commented on code in PR #1794: URL: https://github.com/apache/phoenix/pull/1794#discussion_r1456962157 ########## phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java: ########## @@ -223,4 +172,120 @@ protected boolean getNextCoveredIndexRow(List<Cell> result) throws IOException { } return false; } + + private Result getCDCImage ( + Map<ImmutableBytesPtr, Cell> preImageObj, + Map<ImmutableBytesPtr, Cell> changeImageObj, + boolean isIndexCellDeleteRow, Long indexCellTs, Cell firstCell) { + Map<String, Object> rowValueMap = new HashMap<>(); + + Map<String, Object> preImage = new HashMap<>(); + if (this.cdcChangeScopeSet.size() == 0 || + (this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.PRE))) { + for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell : preImageObj.entrySet()) { + if (dataColQualNameMap.get(preImageObjCell.getKey()) != null) { + preImage.put(dataColQualNameMap.get(preImageObjCell.getKey()), + dataColQualTypeMap.get(preImageObjCell.getKey()).toObject( + preImageObjCell.getValue().getValueArray())); + } + } + rowValueMap.put(PRE_IMAGE, preImage); + } + + Map<String, Object> changeImage = new HashMap<>(); + if (this.cdcChangeScopeSet.size() == 0 || + (this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.CHANGE))) { + for (Map.Entry<ImmutableBytesPtr, Cell> changeImageObjCell : changeImageObj.entrySet()) { + if (dataColQualNameMap.get(changeImageObjCell.getKey()) != null) { + changeImage.put(dataColQualNameMap.get(changeImageObjCell.getKey()), + dataColQualTypeMap.get(changeImageObjCell.getKey()).toObject( + changeImageObjCell.getValue().getValueArray())); + } + } + rowValueMap.put(CHANGE_IMAGE, changeImage); + } + + Map<String, Object> postImage = new HashMap<>(); + if (this.cdcChangeScopeSet.size() == 0 || + (this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.POST))){ + if (isIndexCellDeleteRow == false) { + for 
(Map.Entry<ImmutableBytesPtr, Cell> changeImageObjCell : changeImageObj.entrySet()) { + if(dataColQualNameMap.get(changeImageObjCell.getKey()) != null) { + postImage.put(dataColQualNameMap.get(changeImageObjCell.getKey()), + dataColQualTypeMap.get(changeImageObjCell.getKey()).toObject( + changeImageObjCell.getValue().getValueArray())); + } + } + for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell : preImageObj.entrySet()) { + if(dataColQualNameMap.get(preImageObjCell.getKey()) != null + && postImage.get(dataColQualNameMap.get(preImageObjCell.getKey())) == null) { + postImage.put(dataColQualNameMap.get(preImageObjCell.getKey()), + dataColQualTypeMap.get(preImageObjCell.getKey()).toObject( + preImageObjCell.getValue().getValueArray())); + } + } + } + rowValueMap.put(POST_IMAGE, postImage); + } + + + if (isIndexCellDeleteRow) { + rowValueMap.put(EVENT_TYPE, DELETE_EVENT_TYPE); + } else { + rowValueMap.put(EVENT_TYPE, UPSERT_EVENT_TYPE); + } + + byte[] value = + new Gson().toJson(rowValueMap).getBytes(StandardCharsets.UTF_8); + CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY); + Result cdcRow = Result.create(Arrays.asList(builder. + setRow(indexToDataRowKeyMap.get(new ImmutableBytesPtr(firstCell.getRowArray(), + firstCell.getRowOffset(), firstCell.getRowLength()) + .copyBytesIfNecessary())). + setFamily(firstCell.getFamilyArray()). + setQualifier(scan.getAttribute((CDC_JSON_COL_QUALIFIER))). + setTimestamp(indexCellTs). + setValue(value). + setType(Cell.Type.Put). 
+ build())); + + return cdcRow; + } + + @Override + protected void scanDataTableRows(long startTime) throws IOException { + super.scanDataTableRows(startTime); + List<List<Cell>> indexRowList = new ArrayList<>(); + // Creating new Index Rows for Delete Row events + for (int rowIndex = 0; rowIndex < indexRows.size(); rowIndex++) { + List<Cell> indexRow = indexRows.get(rowIndex); + indexRowList.add(indexRow); + if (indexRow.size() > 1) { + List<Cell> deleteRow = new ArrayList<>(); + for (int cellIndex = indexRow.size() - 1; cellIndex >= 0; cellIndex--) { + Cell cell = indexRow.get(cellIndex); + if (cell.getType() == Cell.Type.DeleteFamily) { + byte[] indexRowKey = new ImmutableBytesPtr(cell.getRowArray(), + cell.getRowOffset(), cell.getRowLength()) + .copyBytesIfNecessary(); + ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr( + indexToDataRowKeyMap.get(indexRowKey)); + Result dataRow = dataRows.get(dataRowKey); + List<Cell> resultCells = Arrays.asList(dataRow.rawCells()); + for (Cell dataRowCell : resultCells) { + if (dataRowCell.getType() == Cell.Type.DeleteFamily && dataRowCell.getTimestamp() == cell.getTimestamp()) { Review Comment: Yes, that is a correct understanding. Will add the comment as well. > Extend UncoveredGlobalIndexRegionScanner for CDC region scanner usecase > ----------------------------------------------------------------------- > > Key: PHOENIX-7015 > URL: https://issues.apache.org/jira/browse/PHOENIX-7015 > Project: Phoenix > Issue Type: Sub-task > Reporter: Viraj Jasani > Priority: Major > > For the CDC region scanner use case, extend UncoveredGlobalIndexRegionScanner to > CDCUncoveredGlobalIndexRegionScanner. The new region scanner for CDC performs > a raw scan of the index table and retrieves data table rows from index rows. > Using the time range, it can form a JSON blob to represent changes to the row > including pre and/or post row images. -- This message was sent by Atlassian Jira (v8.20.10#820010)