haridsv commented on code in PR #1794:
URL: https://github.com/apache/phoenix/pull/1794#discussion_r1457249404
##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -223,4 +186,126 @@ protected boolean getNextCoveredIndexRow(List<Cell>
result) throws IOException {
}
return false;
}
+
+ private Result getCDCImage(
+ Map<ImmutableBytesPtr, Cell> preImageObj,
+ Map<ImmutableBytesPtr, Cell> changeImageObj,
+ boolean isIndexCellDeleteRow, Long indexCellTS, Cell firstCell) {
+ Map<String, Object> rowValueMap = new HashMap<>();
+
+ Map<String, Object> preImage = new HashMap<>();
+ if (this.cdcChangeScopeSet.size() == 0
+ ||
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.PRE))) {
+ for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell :
preImageObj.entrySet()) {
+ if (dataColQualNameMap.get(preImageObjCell.getKey()) != null) {
Review Comment:
```suggestion
if
(dataColQualNameMap.containsKey(preImageObjCell.getKey())) {
```
##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -223,4 +186,126 @@ protected boolean getNextCoveredIndexRow(List<Cell>
result) throws IOException {
}
return false;
}
+
+ private Result getCDCImage(
+ Map<ImmutableBytesPtr, Cell> preImageObj,
+ Map<ImmutableBytesPtr, Cell> changeImageObj,
+ boolean isIndexCellDeleteRow, Long indexCellTS, Cell firstCell) {
+ Map<String, Object> rowValueMap = new HashMap<>();
+
+ Map<String, Object> preImage = new HashMap<>();
+ if (this.cdcChangeScopeSet.size() == 0
+ ||
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.PRE))) {
+ for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell :
preImageObj.entrySet()) {
+ if (dataColQualNameMap.get(preImageObjCell.getKey()) != null) {
+
preImage.put(dataColQualNameMap.get(preImageObjCell.getKey()),
+
dataColQualTypeMap.get(preImageObjCell.getKey()).toObject(
+
preImageObjCell.getValue().getValueArray()));
+ }
+ }
+ rowValueMap.put(PRE_IMAGE, preImage);
+ }
+
+ Map<String, Object> changeImage = new HashMap<>();
+ if (this.cdcChangeScopeSet.size() == 0
+ ||
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.CHANGE))) {
+ for (Map.Entry<ImmutableBytesPtr, Cell> changeImageObjCell
+ : changeImageObj.entrySet()) {
+ if (dataColQualNameMap.get(changeImageObjCell.getKey()) !=
null) {
+
changeImage.put(dataColQualNameMap.get(changeImageObjCell.getKey()),
+
dataColQualTypeMap.get(changeImageObjCell.getKey()).toObject(
+
changeImageObjCell.getValue().getValueArray()));
+ }
+ }
+ rowValueMap.put(CHANGE_IMAGE, changeImage);
+ }
+
+ Map<String, Object> postImage = new HashMap<>();
+ if (this.cdcChangeScopeSet.size() == 0
+ ||
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.POST))) {
+ if (!isIndexCellDeleteRow) {
+ for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell
+ : preImageObj.entrySet()) {
+ if (dataColQualNameMap.get(preImageObjCell.getKey()) !=
null) {
+
postImage.put(dataColQualNameMap.get(preImageObjCell.getKey()),
+
dataColQualTypeMap.get(preImageObjCell.getKey()).toObject(
+
preImageObjCell.getValue().getValueArray()));
+ }
+ }
+ for (Map.Entry<ImmutableBytesPtr, Cell> changeImageObjCell
+ : changeImageObj.entrySet()) {
+ if (dataColQualNameMap.get(changeImageObjCell.getKey()) !=
null) {
+
postImage.put(dataColQualNameMap.get(changeImageObjCell.getKey()),
+
dataColQualTypeMap.get(changeImageObjCell.getKey()).toObject(
+
changeImageObjCell.getValue().getValueArray()));
+ }
+ }
+ }
+ rowValueMap.put(POST_IMAGE, postImage);
+ }
+
+ if (isIndexCellDeleteRow) {
+ rowValueMap.put(EVENT_TYPE, DELETE_EVENT_TYPE);
+ } else {
+ rowValueMap.put(EVENT_TYPE, UPSERT_EVENT_TYPE);
+ }
+
+ byte[] value =
+ new
Gson().toJson(rowValueMap).getBytes(StandardCharsets.UTF_8);
+ CellBuilder builder =
CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
+ Result cdcRow = Result.create(Arrays.asList(builder.
+ setRow(indexToDataRowKeyMap.get(new
ImmutableBytesPtr(firstCell.getRowArray(),
+ firstCell.getRowOffset(), firstCell.getRowLength())
+ .copyBytesIfNecessary())).
+ setFamily(firstCell.getFamilyArray()).
+ setQualifier(scan.getAttribute(CDC_JSON_COL_QUALIFIER)).
+ setTimestamp(indexCellTS).
+ setValue(value).
+ setType(Cell.Type.Put).
+ build()));
+
+ return cdcRow;
+ }
+
+ @Override
+ protected void scanDataTableRows(long startTime) throws IOException {
+ super.scanDataTableRows(startTime);
+ List<List<Cell>> indexRowList = new ArrayList<>();
+ // Creating new Index Rows for Delete Row events
+ for (int rowIndex = 0; rowIndex < indexRows.size(); rowIndex++) {
+ List<Cell> indexRow = indexRows.get(rowIndex);
+ indexRowList.add(indexRow);
+ if (indexRow.size() > 1) {
+ List<Cell> deleteRow = null;
+ for (int cellIndex = indexRow.size() - 1; cellIndex >= 0;
cellIndex--) {
+ Cell cell = indexRow.get(cellIndex);
+ if (cell.getType() == Cell.Type.DeleteFamily) {
+ byte[] indexRowKey = new
ImmutableBytesPtr(cell.getRowArray(),
+ cell.getRowOffset(), cell.getRowLength())
+ .copyBytesIfNecessary();
+ ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
+ indexToDataRowKeyMap.get(indexRowKey));
+ Result dataRow = dataRows.get(dataRowKey);
+ for (Cell dataRowCell : dataRow.rawCells()) {
+ // Note: Upsert adds delete family marker in the
index table but not in the datatable.
+ // Delete operation adds delete family marker in
datatable as well as index table.
+ if (dataRowCell.getType() == Cell.Type.DeleteFamily
+ && dataRowCell.getTimestamp() ==
cell.getTimestamp()) {
+ if (deleteRow == null) {
+ deleteRow = new ArrayList<>();
+ }
+ deleteRow.add(cell);
+ indexRowList.add(deleteRow);
Review Comment:
If you change deleteRow to a boolean flag, this can just be:
```suggestion
indexRowList.add(Arrays.asList(cell));
```
##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -223,4 +186,126 @@ protected boolean getNextCoveredIndexRow(List<Cell>
result) throws IOException {
}
return false;
}
+
+ private Result getCDCImage(
+ Map<ImmutableBytesPtr, Cell> preImageObj,
+ Map<ImmutableBytesPtr, Cell> changeImageObj,
+ boolean isIndexCellDeleteRow, Long indexCellTS, Cell firstCell) {
+ Map<String, Object> rowValueMap = new HashMap<>();
+
+ Map<String, Object> preImage = new HashMap<>();
+ if (this.cdcChangeScopeSet.size() == 0
+ ||
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.PRE))) {
+ for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell :
preImageObj.entrySet()) {
+ if (dataColQualNameMap.get(preImageObjCell.getKey()) != null) {
+
preImage.put(dataColQualNameMap.get(preImageObjCell.getKey()),
+
dataColQualTypeMap.get(preImageObjCell.getKey()).toObject(
+
preImageObjCell.getValue().getValueArray()));
+ }
+ }
+ rowValueMap.put(PRE_IMAGE, preImage);
+ }
+
+ Map<String, Object> changeImage = new HashMap<>();
+ if (this.cdcChangeScopeSet.size() == 0
+ ||
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.CHANGE))) {
+ for (Map.Entry<ImmutableBytesPtr, Cell> changeImageObjCell
+ : changeImageObj.entrySet()) {
+ if (dataColQualNameMap.get(changeImageObjCell.getKey()) !=
null) {
Review Comment:
```suggestion
if
(dataColQualNameMap.containsKey(changeImageObjCell.getKey())) {
```
##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -223,4 +186,126 @@ protected boolean getNextCoveredIndexRow(List<Cell>
result) throws IOException {
}
return false;
}
+
+ private Result getCDCImage(
+ Map<ImmutableBytesPtr, Cell> preImageObj,
+ Map<ImmutableBytesPtr, Cell> changeImageObj,
+ boolean isIndexCellDeleteRow, Long indexCellTS, Cell firstCell) {
+ Map<String, Object> rowValueMap = new HashMap<>();
+
+ Map<String, Object> preImage = new HashMap<>();
+ if (this.cdcChangeScopeSet.size() == 0
+ ||
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.PRE))) {
+ for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell :
preImageObj.entrySet()) {
+ if (dataColQualNameMap.get(preImageObjCell.getKey()) != null) {
+
preImage.put(dataColQualNameMap.get(preImageObjCell.getKey()),
+
dataColQualTypeMap.get(preImageObjCell.getKey()).toObject(
+
preImageObjCell.getValue().getValueArray()));
+ }
+ }
+ rowValueMap.put(PRE_IMAGE, preImage);
+ }
+
+ Map<String, Object> changeImage = new HashMap<>();
+ if (this.cdcChangeScopeSet.size() == 0
+ ||
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.CHANGE))) {
+ for (Map.Entry<ImmutableBytesPtr, Cell> changeImageObjCell
+ : changeImageObj.entrySet()) {
+ if (dataColQualNameMap.get(changeImageObjCell.getKey()) !=
null) {
+
changeImage.put(dataColQualNameMap.get(changeImageObjCell.getKey()),
+
dataColQualTypeMap.get(changeImageObjCell.getKey()).toObject(
+
changeImageObjCell.getValue().getValueArray()));
+ }
+ }
+ rowValueMap.put(CHANGE_IMAGE, changeImage);
+ }
+
+ Map<String, Object> postImage = new HashMap<>();
+ if (this.cdcChangeScopeSet.size() == 0
+ ||
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.POST))) {
+ if (!isIndexCellDeleteRow) {
+ for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell
+ : preImageObj.entrySet()) {
+ if (dataColQualNameMap.get(preImageObjCell.getKey()) !=
null) {
+
postImage.put(dataColQualNameMap.get(preImageObjCell.getKey()),
+
dataColQualTypeMap.get(preImageObjCell.getKey()).toObject(
+
preImageObjCell.getValue().getValueArray()));
+ }
+ }
+ for (Map.Entry<ImmutableBytesPtr, Cell> changeImageObjCell
+ : changeImageObj.entrySet()) {
+ if (dataColQualNameMap.get(changeImageObjCell.getKey()) !=
null) {
+
postImage.put(dataColQualNameMap.get(changeImageObjCell.getKey()),
+
dataColQualTypeMap.get(changeImageObjCell.getKey()).toObject(
+
changeImageObjCell.getValue().getValueArray()));
+ }
+ }
+ }
Review Comment:
Instead of using a separate loop for building postImage, why not do it
as part of both the pre and change image loops? I mean, you would repeat the
lines 201 and 215 for postImage as well.
##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -223,4 +186,126 @@ protected boolean getNextCoveredIndexRow(List<Cell>
result) throws IOException {
}
return false;
}
+
+ private Result getCDCImage(
+ Map<ImmutableBytesPtr, Cell> preImageObj,
+ Map<ImmutableBytesPtr, Cell> changeImageObj,
+ boolean isIndexCellDeleteRow, Long indexCellTS, Cell firstCell) {
+ Map<String, Object> rowValueMap = new HashMap<>();
+
+ Map<String, Object> preImage = new HashMap<>();
+ if (this.cdcChangeScopeSet.size() == 0
+ ||
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.PRE))) {
+ for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell :
preImageObj.entrySet()) {
+ if (dataColQualNameMap.get(preImageObjCell.getKey()) != null) {
+
preImage.put(dataColQualNameMap.get(preImageObjCell.getKey()),
+
dataColQualTypeMap.get(preImageObjCell.getKey()).toObject(
+
preImageObjCell.getValue().getValueArray()));
+ }
+ }
+ rowValueMap.put(PRE_IMAGE, preImage);
+ }
+
+ Map<String, Object> changeImage = new HashMap<>();
+ if (this.cdcChangeScopeSet.size() == 0
+ ||
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.CHANGE))) {
+ for (Map.Entry<ImmutableBytesPtr, Cell> changeImageObjCell
+ : changeImageObj.entrySet()) {
+ if (dataColQualNameMap.get(changeImageObjCell.getKey()) !=
null) {
+
changeImage.put(dataColQualNameMap.get(changeImageObjCell.getKey()),
+
dataColQualTypeMap.get(changeImageObjCell.getKey()).toObject(
+
changeImageObjCell.getValue().getValueArray()));
+ }
+ }
+ rowValueMap.put(CHANGE_IMAGE, changeImage);
+ }
+
+ Map<String, Object> postImage = new HashMap<>();
+ if (this.cdcChangeScopeSet.size() == 0
+ ||
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.POST))) {
+ if (!isIndexCellDeleteRow) {
+ for (Map.Entry<ImmutableBytesPtr, Cell> preImageObjCell
+ : preImageObj.entrySet()) {
+ if (dataColQualNameMap.get(preImageObjCell.getKey()) !=
null) {
+
postImage.put(dataColQualNameMap.get(preImageObjCell.getKey()),
+
dataColQualTypeMap.get(preImageObjCell.getKey()).toObject(
+
preImageObjCell.getValue().getValueArray()));
+ }
+ }
+ for (Map.Entry<ImmutableBytesPtr, Cell> changeImageObjCell
+ : changeImageObj.entrySet()) {
+ if (dataColQualNameMap.get(changeImageObjCell.getKey()) !=
null) {
+
postImage.put(dataColQualNameMap.get(changeImageObjCell.getKey()),
+
dataColQualTypeMap.get(changeImageObjCell.getKey()).toObject(
+
changeImageObjCell.getValue().getValueArray()));
+ }
+ }
+ }
+ rowValueMap.put(POST_IMAGE, postImage);
+ }
+
+ if (isIndexCellDeleteRow) {
+ rowValueMap.put(EVENT_TYPE, DELETE_EVENT_TYPE);
+ } else {
+ rowValueMap.put(EVENT_TYPE, UPSERT_EVENT_TYPE);
+ }
+
+ byte[] value =
+ new
Gson().toJson(rowValueMap).getBytes(StandardCharsets.UTF_8);
+ CellBuilder builder =
CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
+ Result cdcRow = Result.create(Arrays.asList(builder.
+ setRow(indexToDataRowKeyMap.get(new
ImmutableBytesPtr(firstCell.getRowArray(),
+ firstCell.getRowOffset(), firstCell.getRowLength())
+ .copyBytesIfNecessary())).
+ setFamily(firstCell.getFamilyArray()).
+ setQualifier(scan.getAttribute(CDC_JSON_COL_QUALIFIER)).
+ setTimestamp(indexCellTS).
+ setValue(value).
+ setType(Cell.Type.Put).
+ build()));
+
+ return cdcRow;
+ }
+
+ @Override
+ protected void scanDataTableRows(long startTime) throws IOException {
+ super.scanDataTableRows(startTime);
+ List<List<Cell>> indexRowList = new ArrayList<>();
+ // Creating new Index Rows for Delete Row events
+ for (int rowIndex = 0; rowIndex < indexRows.size(); rowIndex++) {
+ List<Cell> indexRow = indexRows.get(rowIndex);
+ indexRowList.add(indexRow);
+ if (indexRow.size() > 1) {
+ List<Cell> deleteRow = null;
Review Comment:
Nit: Outside the loop you are just using this as a flag, so why not use a
boolean flag here and make use of a local list at 294?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]