[
https://issues.apache.org/jira/browse/PHOENIX-7015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17808143#comment-17808143
]
ASF GitHub Bot commented on PHOENIX-7015:
-----------------------------------------
haridsv commented on code in PR #1794:
URL: https://github.com/apache/phoenix/pull/1794#discussion_r1457249404
##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -223,4 +186,126 @@ protected boolean getNextCoveredIndexRow(List<Cell>
result) throws IOException {
}
return false;
}
+
+ private Result getCDCImage(
+ Map<ImmutableBytesPtr, Cell> preImageObj,
+ Map<ImmutableBytesPtr, Cell> changeImageObj,
+ boolean isIndexCellDeleteRow, Long indexCellTS, Cell firstCell) {
+ Map<String, Object> rowValueMap = new HashMap<>();
+
+ Map<String, Object> preImage = new HashMap<>();
+ if (this.cdcChangeScopeSet.size() == 0
+ ||
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.PRE))) {
+ for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell :
preImageObj.entrySet()) {
+ if (dataColQualNameMap.get(preImageObjCell.getKey()) != null) {
Review Comment:
```suggestion
if
(dataColQualNameMap.containsKey(preImageObjCell.getKey())) {
```
##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -223,4 +186,126 @@ protected boolean getNextCoveredIndexRow(List<Cell>
result) throws IOException {
}
return false;
}
+
+ private Result getCDCImage(
+ Map<ImmutableBytesPtr, Cell> preImageObj,
+ Map<ImmutableBytesPtr, Cell> changeImageObj,
+ boolean isIndexCellDeleteRow, Long indexCellTS, Cell firstCell) {
+ Map<String, Object> rowValueMap = new HashMap<>();
+
+ Map<String, Object> preImage = new HashMap<>();
+ if (this.cdcChangeScopeSet.size() == 0
+ ||
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.PRE))) {
+ for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell :
preImageObj.entrySet()) {
+ if (dataColQualNameMap.get(preImageObjCell.getKey()) != null) {
+
preImage.put(dataColQualNameMap.get(preImageObjCell.getKey()),
+
dataColQualTypeMap.get(preImageObjCell.getKey()).toObject(
+
preImageObjCell.getValue().getValueArray()));
+ }
+ }
+ rowValueMap.put(PRE_IMAGE, preImage);
+ }
+
+ Map<String, Object> changeImage = new HashMap<>();
+ if (this.cdcChangeScopeSet.size() == 0
+ ||
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.CHANGE))) {
+ for (Map.Entry<ImmutableBytesPtr, Cell> changeImageObjCell
+ : changeImageObj.entrySet()) {
+ if (dataColQualNameMap.get(changeImageObjCell.getKey()) !=
null) {
+
changeImage.put(dataColQualNameMap.get(changeImageObjCell.getKey()),
+
dataColQualTypeMap.get(changeImageObjCell.getKey()).toObject(
+
changeImageObjCell.getValue().getValueArray()));
+ }
+ }
+ rowValueMap.put(CHANGE_IMAGE, changeImage);
+ }
+
+ Map<String, Object> postImage = new HashMap<>();
+ if (this.cdcChangeScopeSet.size() == 0
+ ||
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.POST))) {
+ if (!isIndexCellDeleteRow) {
+ for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell
+ : preImageObj.entrySet()) {
+ if (dataColQualNameMap.get(preImageObjCell.getKey()) !=
null) {
+
postImage.put(dataColQualNameMap.get(preImageObjCell.getKey()),
+
dataColQualTypeMap.get(preImageObjCell.getKey()).toObject(
+
preImageObjCell.getValue().getValueArray()));
+ }
+ }
+ for (Map.Entry<ImmutableBytesPtr, Cell> changeImageObjCell
+ : changeImageObj.entrySet()) {
+ if (dataColQualNameMap.get(changeImageObjCell.getKey()) !=
null) {
+
postImage.put(dataColQualNameMap.get(changeImageObjCell.getKey()),
+
dataColQualTypeMap.get(changeImageObjCell.getKey()).toObject(
+
changeImageObjCell.getValue().getValueArray()));
+ }
+ }
+ }
+ rowValueMap.put(POST_IMAGE, postImage);
+ }
+
+ if (isIndexCellDeleteRow) {
+ rowValueMap.put(EVENT_TYPE, DELETE_EVENT_TYPE);
+ } else {
+ rowValueMap.put(EVENT_TYPE, UPSERT_EVENT_TYPE);
+ }
+
+ byte[] value =
+ new
Gson().toJson(rowValueMap).getBytes(StandardCharsets.UTF_8);
+ CellBuilder builder =
CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
+ Result cdcRow = Result.create(Arrays.asList(builder.
+ setRow(indexToDataRowKeyMap.get(new
ImmutableBytesPtr(firstCell.getRowArray(),
+ firstCell.getRowOffset(), firstCell.getRowLength())
+ .copyBytesIfNecessary())).
+ setFamily(firstCell.getFamilyArray()).
+ setQualifier(scan.getAttribute(CDC_JSON_COL_QUALIFIER)).
+ setTimestamp(indexCellTS).
+ setValue(value).
+ setType(Cell.Type.Put).
+ build()));
+
+ return cdcRow;
+ }
+
+ @Override
+ protected void scanDataTableRows(long startTime) throws IOException {
+ super.scanDataTableRows(startTime);
+ List<List<Cell>> indexRowList = new ArrayList<>();
+ // Creating new Index Rows for Delete Row events
+ for (int rowIndex = 0; rowIndex < indexRows.size(); rowIndex++) {
+ List<Cell> indexRow = indexRows.get(rowIndex);
+ indexRowList.add(indexRow);
+ if (indexRow.size() > 1) {
+ List<Cell> deleteRow = null;
+ for (int cellIndex = indexRow.size() - 1; cellIndex >= 0;
cellIndex--) {
+ Cell cell = indexRow.get(cellIndex);
+ if (cell.getType() == Cell.Type.DeleteFamily) {
+ byte[] indexRowKey = new
ImmutableBytesPtr(cell.getRowArray(),
+ cell.getRowOffset(), cell.getRowLength())
+ .copyBytesIfNecessary();
+ ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
+ indexToDataRowKeyMap.get(indexRowKey));
+ Result dataRow = dataRows.get(dataRowKey);
+ for (Cell dataRowCell : dataRow.rawCells()) {
+ // Note: Upsert adds delete family marker in the
index table but not in the datatable.
+ // Delete operation adds delete family marker in
datatable as well as index table.
+ if (dataRowCell.getType() == Cell.Type.DeleteFamily
+ && dataRowCell.getTimestamp() ==
cell.getTimestamp()) {
+ if (deleteRow == null) {
+ deleteRow = new ArrayList<>();
+ }
+ deleteRow.add(cell);
+ indexRowList.add(deleteRow);
Review Comment:
If you change deleteRow to a boolean flag, this can simply be:
```suggestion
indexRowList.add(Arrays.asList(cell));
```
##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -223,4 +186,126 @@ protected boolean getNextCoveredIndexRow(List<Cell>
result) throws IOException {
}
return false;
}
+
+ private Result getCDCImage(
+ Map<ImmutableBytesPtr, Cell> preImageObj,
+ Map<ImmutableBytesPtr, Cell> changeImageObj,
+ boolean isIndexCellDeleteRow, Long indexCellTS, Cell firstCell) {
+ Map<String, Object> rowValueMap = new HashMap<>();
+
+ Map<String, Object> preImage = new HashMap<>();
+ if (this.cdcChangeScopeSet.size() == 0
+ ||
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.PRE))) {
+ for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell :
preImageObj.entrySet()) {
+ if (dataColQualNameMap.get(preImageObjCell.getKey()) != null) {
+
preImage.put(dataColQualNameMap.get(preImageObjCell.getKey()),
+
dataColQualTypeMap.get(preImageObjCell.getKey()).toObject(
+
preImageObjCell.getValue().getValueArray()));
+ }
+ }
+ rowValueMap.put(PRE_IMAGE, preImage);
+ }
+
+ Map<String, Object> changeImage = new HashMap<>();
+ if (this.cdcChangeScopeSet.size() == 0
+ ||
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.CHANGE))) {
+ for (Map.Entry<ImmutableBytesPtr, Cell> changeImageObjCell
+ : changeImageObj.entrySet()) {
+ if (dataColQualNameMap.get(changeImageObjCell.getKey()) !=
null) {
Review Comment:
```suggestion
if
(dataColQualNameMap.containsKey(changeImageObjCell.getKey())) {
```
##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -223,4 +186,126 @@ protected boolean getNextCoveredIndexRow(List<Cell>
result) throws IOException {
}
return false;
}
+
+ private Result getCDCImage(
+ Map<ImmutableBytesPtr, Cell> preImageObj,
+ Map<ImmutableBytesPtr, Cell> changeImageObj,
+ boolean isIndexCellDeleteRow, Long indexCellTS, Cell firstCell) {
+ Map<String, Object> rowValueMap = new HashMap<>();
+
+ Map<String, Object> preImage = new HashMap<>();
+ if (this.cdcChangeScopeSet.size() == 0
+ ||
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.PRE))) {
+ for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell :
preImageObj.entrySet()) {
+ if (dataColQualNameMap.get(preImageObjCell.getKey()) != null) {
+
preImage.put(dataColQualNameMap.get(preImageObjCell.getKey()),
+
dataColQualTypeMap.get(preImageObjCell.getKey()).toObject(
+
preImageObjCell.getValue().getValueArray()));
+ }
+ }
+ rowValueMap.put(PRE_IMAGE, preImage);
+ }
+
+ Map<String, Object> changeImage = new HashMap<>();
+ if (this.cdcChangeScopeSet.size() == 0
+ ||
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.CHANGE))) {
+ for (Map.Entry<ImmutableBytesPtr, Cell> changeImageObjCell
+ : changeImageObj.entrySet()) {
+ if (dataColQualNameMap.get(changeImageObjCell.getKey()) !=
null) {
+
changeImage.put(dataColQualNameMap.get(changeImageObjCell.getKey()),
+
dataColQualTypeMap.get(changeImageObjCell.getKey()).toObject(
+
changeImageObjCell.getValue().getValueArray()));
+ }
+ }
+ rowValueMap.put(CHANGE_IMAGE, changeImage);
+ }
+
+ Map<String, Object> postImage = new HashMap<>();
+ if (this.cdcChangeScopeSet.size() == 0
+ ||
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.POST))) {
+ if (!isIndexCellDeleteRow) {
+ for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell
+ : preImageObj.entrySet()) {
+ if (dataColQualNameMap.get(preImageObjCell.getKey()) !=
null) {
+
postImage.put(dataColQualNameMap.get(preImageObjCell.getKey()),
+
dataColQualTypeMap.get(preImageObjCell.getKey()).toObject(
+
preImageObjCell.getValue().getValueArray()));
+ }
+ }
+ for (Map.Entry<ImmutableBytesPtr, Cell> changeImageObjCell
+ : changeImageObj.entrySet()) {
+ if (dataColQualNameMap.get(changeImageObjCell.getKey()) !=
null) {
+
postImage.put(dataColQualNameMap.get(changeImageObjCell.getKey()),
+
dataColQualTypeMap.get(changeImageObjCell.getKey()).toObject(
+
changeImageObjCell.getValue().getValueArray()));
+ }
+ }
+ }
Review Comment:
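Instead of using separate loops to build postImage, why not populate it inside the
existing pre-image and change-image loops? That is, repeat the puts at lines 201 and
215 for postImage as well. Keep them guarded by !isIndexCellDeleteRow, and keep the
change-image put after the pre-image one so that changed values still override the
pre-image values.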
##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -223,4 +186,126 @@ protected boolean getNextCoveredIndexRow(List<Cell>
result) throws IOException {
}
return false;
}
+
+ private Result getCDCImage(
+ Map<ImmutableBytesPtr, Cell> preImageObj,
+ Map<ImmutableBytesPtr, Cell> changeImageObj,
+ boolean isIndexCellDeleteRow, Long indexCellTS, Cell firstCell) {
+ Map<String, Object> rowValueMap = new HashMap<>();
+
+ Map<String, Object> preImage = new HashMap<>();
+ if (this.cdcChangeScopeSet.size() == 0
+ ||
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.PRE))) {
+ for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell :
preImageObj.entrySet()) {
+ if (dataColQualNameMap.get(preImageObjCell.getKey()) != null) {
+
preImage.put(dataColQualNameMap.get(preImageObjCell.getKey()),
+
dataColQualTypeMap.get(preImageObjCell.getKey()).toObject(
+
preImageObjCell.getValue().getValueArray()));
+ }
+ }
+ rowValueMap.put(PRE_IMAGE, preImage);
+ }
+
+ Map<String, Object> changeImage = new HashMap<>();
+ if (this.cdcChangeScopeSet.size() == 0
+ ||
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.CHANGE))) {
+ for (Map.Entry<ImmutableBytesPtr, Cell> changeImageObjCell
+ : changeImageObj.entrySet()) {
+ if (dataColQualNameMap.get(changeImageObjCell.getKey()) !=
null) {
+
changeImage.put(dataColQualNameMap.get(changeImageObjCell.getKey()),
+
dataColQualTypeMap.get(changeImageObjCell.getKey()).toObject(
+
changeImageObjCell.getValue().getValueArray()));
+ }
+ }
+ rowValueMap.put(CHANGE_IMAGE, changeImage);
+ }
+
+ Map<String, Object> postImage = new HashMap<>();
+ if (this.cdcChangeScopeSet.size() == 0
+ ||
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.POST))) {
+ if (!isIndexCellDeleteRow) {
+ for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell
+ : preImageObj.entrySet()) {
+ if (dataColQualNameMap.get(preImageObjCell.getKey()) !=
null) {
+
postImage.put(dataColQualNameMap.get(preImageObjCell.getKey()),
+
dataColQualTypeMap.get(preImageObjCell.getKey()).toObject(
+
preImageObjCell.getValue().getValueArray()));
+ }
+ }
+ for (Map.Entry<ImmutableBytesPtr, Cell> changeImageObjCell
+ : changeImageObj.entrySet()) {
+ if (dataColQualNameMap.get(changeImageObjCell.getKey()) !=
null) {
+
postImage.put(dataColQualNameMap.get(changeImageObjCell.getKey()),
+
dataColQualTypeMap.get(changeImageObjCell.getKey()).toObject(
+
changeImageObjCell.getValue().getValueArray()));
+ }
+ }
+ }
+ rowValueMap.put(POST_IMAGE, postImage);
+ }
+
+ if (isIndexCellDeleteRow) {
+ rowValueMap.put(EVENT_TYPE, DELETE_EVENT_TYPE);
+ } else {
+ rowValueMap.put(EVENT_TYPE, UPSERT_EVENT_TYPE);
+ }
+
+ byte[] value =
+ new
Gson().toJson(rowValueMap).getBytes(StandardCharsets.UTF_8);
+ CellBuilder builder =
CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
+ Result cdcRow = Result.create(Arrays.asList(builder.
+ setRow(indexToDataRowKeyMap.get(new
ImmutableBytesPtr(firstCell.getRowArray(),
+ firstCell.getRowOffset(), firstCell.getRowLength())
+ .copyBytesIfNecessary())).
+ setFamily(firstCell.getFamilyArray()).
+ setQualifier(scan.getAttribute(CDC_JSON_COL_QUALIFIER)).
+ setTimestamp(indexCellTS).
+ setValue(value).
+ setType(Cell.Type.Put).
+ build()));
+
+ return cdcRow;
+ }
+
+ @Override
+ protected void scanDataTableRows(long startTime) throws IOException {
+ super.scanDataTableRows(startTime);
+ List<List<Cell>> indexRowList = new ArrayList<>();
+ // Creating new Index Rows for Delete Row events
+ for (int rowIndex = 0; rowIndex < indexRows.size(); rowIndex++) {
+ List<Cell> indexRow = indexRows.get(rowIndex);
+ indexRowList.add(indexRow);
+ if (indexRow.size() > 1) {
+ List<Cell> deleteRow = null;
Review Comment:
Nit: outside the loop, deleteRow is only used as a flag, so why not make it a boolean
flag here and use a local list at line 294?
> Extend UncoveredGlobalIndexRegionScanner for CDC region scanner usecase
> -----------------------------------------------------------------------
>
> Key: PHOENIX-7015
> URL: https://issues.apache.org/jira/browse/PHOENIX-7015
> Project: Phoenix
> Issue Type: Sub-task
> Reporter: Viraj Jasani
> Priority: Major
>
> For the CDC region scanner use case, extend UncoveredGlobalIndexRegionScanner
> with a new CDCUncoveredGlobalIndexRegionScanner. The new CDC region scanner
> performs a raw scan of the index table and retrieves the data table rows that
> correspond to the index rows. Using the time range, it can form a JSON blob
> representing the changes to the row, including pre and/or post row images.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)