[ https://issues.apache.org/jira/browse/PHOENIX-7015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17808143#comment-17808143 ]
ASF GitHub Bot commented on PHOENIX-7015: ----------------------------------------- haridsv commented on code in PR #1794: URL: https://github.com/apache/phoenix/pull/1794#discussion_r1457249404 ########## phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java: ########## @@ -223,4 +186,126 @@ protected boolean getNextCoveredIndexRow(List<Cell> result) throws IOException { } return false; } + + private Result getCDCImage( + Map<ImmutableBytesPtr, Cell> preImageObj, + Map<ImmutableBytesPtr, Cell> changeImageObj, + boolean isIndexCellDeleteRow, Long indexCellTS, Cell firstCell) { + Map<String, Object> rowValueMap = new HashMap<>(); + + Map<String, Object> preImage = new HashMap<>(); + if (this.cdcChangeScopeSet.size() == 0 + || (this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.PRE))) { + for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell : preImageObj.entrySet()) { + if (dataColQualNameMap.get(preImageObjCell.getKey()) != null) { Review Comment: ```suggestion if (dataColQualNameMap.containsKey(preImageObjCell.getKey())) { ``` ########## phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java: ########## @@ -223,4 +186,126 @@ protected boolean getNextCoveredIndexRow(List<Cell> result) throws IOException { } return false; } + + private Result getCDCImage( + Map<ImmutableBytesPtr, Cell> preImageObj, + Map<ImmutableBytesPtr, Cell> changeImageObj, + boolean isIndexCellDeleteRow, Long indexCellTS, Cell firstCell) { + Map<String, Object> rowValueMap = new HashMap<>(); + + Map<String, Object> preImage = new HashMap<>(); + if (this.cdcChangeScopeSet.size() == 0 + || (this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.PRE))) { + for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell : preImageObj.entrySet()) { + if (dataColQualNameMap.get(preImageObjCell.getKey()) != null) { + preImage.put(dataColQualNameMap.get(preImageObjCell.getKey()), + 
dataColQualTypeMap.get(preImageObjCell.getKey()).toObject( + preImageObjCell.getValue().getValueArray())); + } + } + rowValueMap.put(PRE_IMAGE, preImage); + } + + Map<String, Object> changeImage = new HashMap<>(); + if (this.cdcChangeScopeSet.size() == 0 + || (this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.CHANGE))) { + for (Map.Entry<ImmutableBytesPtr, Cell> changeImageObjCell + : changeImageObj.entrySet()) { + if (dataColQualNameMap.get(changeImageObjCell.getKey()) != null) { + changeImage.put(dataColQualNameMap.get(changeImageObjCell.getKey()), + dataColQualTypeMap.get(changeImageObjCell.getKey()).toObject( + changeImageObjCell.getValue().getValueArray())); + } + } + rowValueMap.put(CHANGE_IMAGE, changeImage); + } + + Map<String, Object> postImage = new HashMap<>(); + if (this.cdcChangeScopeSet.size() == 0 + || (this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.POST))) { + if (!isIndexCellDeleteRow) { + for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell + : preImageObj.entrySet()) { + if (dataColQualNameMap.get(preImageObjCell.getKey()) != null) { + postImage.put(dataColQualNameMap.get(preImageObjCell.getKey()), + dataColQualTypeMap.get(preImageObjCell.getKey()).toObject( + preImageObjCell.getValue().getValueArray())); + } + } + for (Map.Entry<ImmutableBytesPtr, Cell> changeImageObjCell + : changeImageObj.entrySet()) { + if (dataColQualNameMap.get(changeImageObjCell.getKey()) != null) { + postImage.put(dataColQualNameMap.get(changeImageObjCell.getKey()), + dataColQualTypeMap.get(changeImageObjCell.getKey()).toObject( + changeImageObjCell.getValue().getValueArray())); + } + } + } + rowValueMap.put(POST_IMAGE, postImage); + } + + if (isIndexCellDeleteRow) { + rowValueMap.put(EVENT_TYPE, DELETE_EVENT_TYPE); + } else { + rowValueMap.put(EVENT_TYPE, UPSERT_EVENT_TYPE); + } + + byte[] value = + new Gson().toJson(rowValueMap).getBytes(StandardCharsets.UTF_8); + CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY); + Result 
cdcRow = Result.create(Arrays.asList(builder. + setRow(indexToDataRowKeyMap.get(new ImmutableBytesPtr(firstCell.getRowArray(), + firstCell.getRowOffset(), firstCell.getRowLength()) + .copyBytesIfNecessary())). + setFamily(firstCell.getFamilyArray()). + setQualifier(scan.getAttribute(CDC_JSON_COL_QUALIFIER)). + setTimestamp(indexCellTS). + setValue(value). + setType(Cell.Type.Put). + build())); + + return cdcRow; + } + + @Override + protected void scanDataTableRows(long startTime) throws IOException { + super.scanDataTableRows(startTime); + List<List<Cell>> indexRowList = new ArrayList<>(); + // Creating new Index Rows for Delete Row events + for (int rowIndex = 0; rowIndex < indexRows.size(); rowIndex++) { + List<Cell> indexRow = indexRows.get(rowIndex); + indexRowList.add(indexRow); + if (indexRow.size() > 1) { + List<Cell> deleteRow = null; + for (int cellIndex = indexRow.size() - 1; cellIndex >= 0; cellIndex--) { + Cell cell = indexRow.get(cellIndex); + if (cell.getType() == Cell.Type.DeleteFamily) { + byte[] indexRowKey = new ImmutableBytesPtr(cell.getRowArray(), + cell.getRowOffset(), cell.getRowLength()) + .copyBytesIfNecessary(); + ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr( + indexToDataRowKeyMap.get(indexRowKey)); + Result dataRow = dataRows.get(dataRowKey); + for (Cell dataRowCell : dataRow.rawCells()) { + // Note: Upsert adds delete family marker in the index table but not in the datatable. + // Delete operation adds delete family marker in datatable as well as index table. 
+ if (dataRowCell.getType() == Cell.Type.DeleteFamily + && dataRowCell.getTimestamp() == cell.getTimestamp()) { + if (deleteRow == null) { + deleteRow = new ArrayList<>(); + } + deleteRow.add(cell); + indexRowList.add(deleteRow); Review Comment: If you change the deleteRow to be a boolean flag, this can just be: ```suggestion indexRowList.add(Arrays.asList(cell)); ``` ########## phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java: ########## @@ -223,4 +186,126 @@ protected boolean getNextCoveredIndexRow(List<Cell> result) throws IOException { } return false; } + + private Result getCDCImage( + Map<ImmutableBytesPtr, Cell> preImageObj, + Map<ImmutableBytesPtr, Cell> changeImageObj, + boolean isIndexCellDeleteRow, Long indexCellTS, Cell firstCell) { + Map<String, Object> rowValueMap = new HashMap<>(); + + Map<String, Object> preImage = new HashMap<>(); + if (this.cdcChangeScopeSet.size() == 0 + || (this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.PRE))) { + for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell : preImageObj.entrySet()) { + if (dataColQualNameMap.get(preImageObjCell.getKey()) != null) { + preImage.put(dataColQualNameMap.get(preImageObjCell.getKey()), + dataColQualTypeMap.get(preImageObjCell.getKey()).toObject( + preImageObjCell.getValue().getValueArray())); + } + } + rowValueMap.put(PRE_IMAGE, preImage); + } + + Map<String, Object> changeImage = new HashMap<>(); + if (this.cdcChangeScopeSet.size() == 0 + || (this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.CHANGE))) { + for (Map.Entry<ImmutableBytesPtr, Cell> changeImageObjCell + : changeImageObj.entrySet()) { + if (dataColQualNameMap.get(changeImageObjCell.getKey()) != null) { Review Comment: ```suggestion if (dataColQualNameMap.containsKey(changeImageObjCell.getKey())) { ``` ########## phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java: ########## @@ -223,4 +186,126 @@ protected boolean 
getNextCoveredIndexRow(List<Cell> result) throws IOException { } return false; } + + private Result getCDCImage( + Map<ImmutableBytesPtr, Cell> preImageObj, + Map<ImmutableBytesPtr, Cell> changeImageObj, + boolean isIndexCellDeleteRow, Long indexCellTS, Cell firstCell) { + Map<String, Object> rowValueMap = new HashMap<>(); + + Map<String, Object> preImage = new HashMap<>(); + if (this.cdcChangeScopeSet.size() == 0 + || (this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.PRE))) { + for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell : preImageObj.entrySet()) { + if (dataColQualNameMap.get(preImageObjCell.getKey()) != null) { + preImage.put(dataColQualNameMap.get(preImageObjCell.getKey()), + dataColQualTypeMap.get(preImageObjCell.getKey()).toObject( + preImageObjCell.getValue().getValueArray())); + } + } + rowValueMap.put(PRE_IMAGE, preImage); + } + + Map<String, Object> changeImage = new HashMap<>(); + if (this.cdcChangeScopeSet.size() == 0 + || (this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.CHANGE))) { + for (Map.Entry<ImmutableBytesPtr, Cell> changeImageObjCell + : changeImageObj.entrySet()) { + if (dataColQualNameMap.get(changeImageObjCell.getKey()) != null) { + changeImage.put(dataColQualNameMap.get(changeImageObjCell.getKey()), + dataColQualTypeMap.get(changeImageObjCell.getKey()).toObject( + changeImageObjCell.getValue().getValueArray())); + } + } + rowValueMap.put(CHANGE_IMAGE, changeImage); + } + + Map<String, Object> postImage = new HashMap<>(); + if (this.cdcChangeScopeSet.size() == 0 + || (this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.POST))) { + if (!isIndexCellDeleteRow) { + for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell + : preImageObj.entrySet()) { + if (dataColQualNameMap.get(preImageObjCell.getKey()) != null) { + postImage.put(dataColQualNameMap.get(preImageObjCell.getKey()), + dataColQualTypeMap.get(preImageObjCell.getKey()).toObject( + preImageObjCell.getValue().getValueArray())); + } + } + for 
(Map.Entry<ImmutableBytesPtr, Cell> changeImageObjCell + : changeImageObj.entrySet()) { + if (dataColQualNameMap.get(changeImageObjCell.getKey()) != null) { + postImage.put(dataColQualNameMap.get(changeImageObjCell.getKey()), + dataColQualTypeMap.get(changeImageObjCell.getKey()).toObject( + changeImageObjCell.getValue().getValueArray())); + } + } + } Review Comment: Instead of using a separate loop for building postImage, why not populate it in the same loops that build the pre and change images? That is, you would repeat lines 201 and 215 for postImage as well. ########## phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java: ########## @@ -223,4 +186,126 @@ protected boolean getNextCoveredIndexRow(List<Cell> result) throws IOException { } return false; } + + private Result getCDCImage( + Map<ImmutableBytesPtr, Cell> preImageObj, + Map<ImmutableBytesPtr, Cell> changeImageObj, + boolean isIndexCellDeleteRow, Long indexCellTS, Cell firstCell) { + Map<String, Object> rowValueMap = new HashMap<>(); + + Map<String, Object> preImage = new HashMap<>(); + if (this.cdcChangeScopeSet.size() == 0 + || (this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.PRE))) { + for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell : preImageObj.entrySet()) { + if (dataColQualNameMap.get(preImageObjCell.getKey()) != null) { + preImage.put(dataColQualNameMap.get(preImageObjCell.getKey()), + dataColQualTypeMap.get(preImageObjCell.getKey()).toObject( + preImageObjCell.getValue().getValueArray())); + } + } + rowValueMap.put(PRE_IMAGE, preImage); + } + + Map<String, Object> changeImage = new HashMap<>(); + if (this.cdcChangeScopeSet.size() == 0 + || (this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.CHANGE))) { + for (Map.Entry<ImmutableBytesPtr, Cell> changeImageObjCell + : changeImageObj.entrySet()) { + if (dataColQualNameMap.get(changeImageObjCell.getKey()) != null) { + changeImage.put(dataColQualNameMap.get(changeImageObjCell.getKey()), + 
dataColQualTypeMap.get(changeImageObjCell.getKey()).toObject( + changeImageObjCell.getValue().getValueArray())); + } + } + rowValueMap.put(CHANGE_IMAGE, changeImage); + } + + Map<String, Object> postImage = new HashMap<>(); + if (this.cdcChangeScopeSet.size() == 0 + || (this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.POST))) { + if (!isIndexCellDeleteRow) { + for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell + : preImageObj.entrySet()) { + if (dataColQualNameMap.get(preImageObjCell.getKey()) != null) { + postImage.put(dataColQualNameMap.get(preImageObjCell.getKey()), + dataColQualTypeMap.get(preImageObjCell.getKey()).toObject( + preImageObjCell.getValue().getValueArray())); + } + } + for (Map.Entry<ImmutableBytesPtr, Cell> changeImageObjCell + : changeImageObj.entrySet()) { + if (dataColQualNameMap.get(changeImageObjCell.getKey()) != null) { + postImage.put(dataColQualNameMap.get(changeImageObjCell.getKey()), + dataColQualTypeMap.get(changeImageObjCell.getKey()).toObject( + changeImageObjCell.getValue().getValueArray())); + } + } + } + rowValueMap.put(POST_IMAGE, postImage); + } + + if (isIndexCellDeleteRow) { + rowValueMap.put(EVENT_TYPE, DELETE_EVENT_TYPE); + } else { + rowValueMap.put(EVENT_TYPE, UPSERT_EVENT_TYPE); + } + + byte[] value = + new Gson().toJson(rowValueMap).getBytes(StandardCharsets.UTF_8); + CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY); + Result cdcRow = Result.create(Arrays.asList(builder. + setRow(indexToDataRowKeyMap.get(new ImmutableBytesPtr(firstCell.getRowArray(), + firstCell.getRowOffset(), firstCell.getRowLength()) + .copyBytesIfNecessary())). + setFamily(firstCell.getFamilyArray()). + setQualifier(scan.getAttribute(CDC_JSON_COL_QUALIFIER)). + setTimestamp(indexCellTS). + setValue(value). + setType(Cell.Type.Put). 
+ build())); + + return cdcRow; + } + + @Override + protected void scanDataTableRows(long startTime) throws IOException { + super.scanDataTableRows(startTime); + List<List<Cell>> indexRowList = new ArrayList<>(); + // Creating new Index Rows for Delete Row events + for (int rowIndex = 0; rowIndex < indexRows.size(); rowIndex++) { + List<Cell> indexRow = indexRows.get(rowIndex); + indexRowList.add(indexRow); + if (indexRow.size() > 1) { + List<Cell> deleteRow = null; Review Comment: Nit: Outside the loop you are just using this as a flag, so why not use a boolean flag here and use a local list at line 294? > Extend UncoveredGlobalIndexRegionScanner for CDC region scanner usecase > ----------------------------------------------------------------------- > > Key: PHOENIX-7015 > URL: https://issues.apache.org/jira/browse/PHOENIX-7015 > Project: Phoenix > Issue Type: Sub-task > Reporter: Viraj Jasani > Priority: Major > > For the CDC region scanner use case, extend UncoveredGlobalIndexRegionScanner to > CDCUncoveredGlobalIndexRegionScanner. The new region scanner for CDC performs > a raw scan of the index table and retrieves data table rows from the index rows. > Using the time range, it can form a JSON blob that represents changes to the row, > including pre and/or post row images. -- This message was sent by Atlassian Jira (v8.20.10#820010)