[
https://issues.apache.org/jira/browse/PHOENIX-7015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17808036#comment-17808036
]
ASF GitHub Bot commented on PHOENIX-7015:
-----------------------------------------
TheNamesRai commented on code in PR #1794:
URL: https://github.com/apache/phoenix/pull/1794#discussion_r1456972705
##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -104,115 +94,74 @@ protected Scan prepareDataTableScan(Collection<byte[]>
dataRowKeys) throws IOExc
protected boolean getNextCoveredIndexRow(List<Cell> result) throws
IOException {
if (indexRowIterator.hasNext()) {
List<Cell> indexRow = indexRowIterator.next();
- for (Cell c: indexRow) {
- if (c.getType() == Cell.Type.Put) {
- result.add(c);
- }
- }
+ Cell firstCell = indexRow.get(indexRow.size() - 1);
+ byte[] indexRowKey = new ImmutableBytesPtr(firstCell.getRowArray(),
+ firstCell.getRowOffset(), firstCell.getRowLength())
+ .copyBytesIfNecessary();
+ ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
+ indexToDataRowKeyMap.get(indexRowKey));
+ Result dataRow = dataRows.get(dataRowKey);
+ Long indexCellTs = firstCell.getTimestamp();
+ Cell.Type indexCellType = firstCell.getType();
+
+ Map<ImmutableBytesPtr, Cell> preImageObj = new HashMap<>();
+ Map<ImmutableBytesPtr, Cell> changeImageObj = new HashMap<>();
+ List<Cell> resultCells = Arrays.asList(dataRow.rawCells());
+ Collections.sort(resultCells,
CellComparator.getInstance().reversed());
+
+ boolean isIndexCellDeleteRow = false;
+ boolean isIndexCellDeleteColumn = false;
try {
- Result dataRow = null;
- if (! result.isEmpty()) {
- Cell firstCell = result.get(0);
- byte[] indexRowKey = new
ImmutableBytesPtr(firstCell.getRowArray(),
- firstCell.getRowOffset(), firstCell.getRowLength())
- .copyBytesIfNecessary();
- ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
- indexToDataRowKeyMap.get(indexRowKey));
- dataRow = dataRows.get(dataRowKey);
- Long indexRowTs = result.get(0).getTimestamp();
- Map<Long, Map<ImmutableBytesPtr, Cell>> changeTimeline =
dataRowChanges.get(
- dataRowKey);
- if (changeTimeline == null) {
- List<Cell> resultCells =
Arrays.asList(dataRow.rawCells());
- Collections.sort(resultCells,
CellComparator.getInstance().reversed());
- List<Cell> deleteMarkers = new ArrayList<>();
- List<List<Cell>> columns = new LinkedList<>();
- Cell currentColumnCell = null;
- Pair<byte[], byte[]> emptyKV =
EncodedColumnsUtil.getEmptyKeyValueInfo(
-
EncodedColumnsUtil.getQualifierEncodingScheme(scan));
- List<Cell> currentColumn = null;
- Set<Long> uniqueTimeStamps = new HashSet<>();
- // TODO: From CompactionScanner.formColumns(), see if
this can be refactored.
- for (Cell cell : resultCells) {
- uniqueTimeStamps.add(cell.getTimestamp());
- if (cell.getType() != Cell.Type.Put) {
- deleteMarkers.add(cell);
- }
- if (CellUtil.matchingColumn(cell,
QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
- emptyKV.getFirst())) {
- continue;
- }
- if (currentColumnCell == null) {
- currentColumn = new LinkedList<>();
- currentColumnCell = cell;
- currentColumn.add(cell);
- } else if (!CellUtil.matchingColumn(cell,
currentColumnCell)) {
- columns.add(currentColumn);
- currentColumn = new LinkedList<>();
- currentColumnCell = cell;
- currentColumn.add(cell);
- } else {
- currentColumn.add(cell);
- }
+ for (Cell cell : resultCells) {
+ if (cell.getType() == Cell.Type.DeleteColumn) {
+ // DDL is not supported in CDC
+ if (cell.getTimestamp() == indexCellTs) {
+ isIndexCellDeleteColumn = true;
+ break;
+ }
+ } else if (cell.getType() == Cell.Type.Put) {
Review Comment:
result cells are sorted in this order colQual -> timestamp.
For example: all delete markers are at the end of the result cells together.
So we have to iterate over all result cells.
> Extend UncoveredGlobalIndexRegionScanner for CDC region scanner usecase
> -----------------------------------------------------------------------
>
> Key: PHOENIX-7015
> URL: https://issues.apache.org/jira/browse/PHOENIX-7015
> Project: Phoenix
> Issue Type: Sub-task
> Reporter: Viraj Jasani
> Priority: Major
>
> For CDC region scanner usecase, extend UncoveredGlobalIndexRegionScanner to
> CDCUncoveredGlobalIndexRegionScanner. The new region scanner for CDC performs
> raw scan to index table and retrieve data table rows from index rows.
> Using the time range, it can form a JSON blob to represent changes to the row
> including pre and/or post row images.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)