[
https://issues.apache.org/jira/browse/PHOENIX-7015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17814790#comment-17814790
]
ASF GitHub Bot commented on PHOENIX-7015:
-----------------------------------------
TheNamesRai commented on code in PR #1813:
URL: https://github.com/apache/phoenix/pull/1813#discussion_r1479673714
##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -90,128 +88,144 @@ public CDCGlobalIndexRegionScanner(final RegionScanner
innerScanner,
super(innerScanner, region, scan, env, dataTableScan, tupleProjector,
indexMaintainer,
viewConstants, ptr, pageSizeMs, queryLimit);
CDCUtil.initForRawScan(dataTableScan);
- dataColQualNameMap = ScanUtil.deserializeColumnQualifierToNameMap(
- scan.getAttribute(DATA_COL_QUALIFIER_TO_NAME_MAP));
- dataColQualTypeMap = ScanUtil.deserializeColumnQualifierToTypeMap(
- scan.getAttribute(DATA_COL_QUALIFIER_TO_TYPE_MAP));
+ cdcDataTableInfo =
CDCTableInfo.createFromProto(CDCInfoProtos.CDCTableDef
+ .parseFrom(scan.getAttribute(CDC_DATA_TABLE_DEF)));
+ Charset utf8Charset = StandardCharsets.UTF_8;
+ String cdcChangeScopeStr = utf8Charset.decode(
+
ByteBuffer.wrap(scan.getAttribute(CDC_INCLUDE_SCOPES))).toString();
+ try {
+ cdcChangeScopeSet =
CDCUtil.makeChangeScopeEnumsFromString(cdcChangeScopeStr);
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
}
@Override
protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys) throws
IOException {
+ //TODO: Get Timerange from the start row and end row of the index scan
object
+ // and set it in the datatable scan object.
+// if (scan.getStartRow().length == 8) {
+// startTimeRange = PLong.INSTANCE.getCodec().decodeLong(
+// scan.getStartRow(), 0, SortOrder.getDefault());
+// }
+// if (scan.getStopRow().length == 8) {
+// stopTimeRange = PLong.INSTANCE.getCodec().decodeLong(
+// scan.getStopRow(), 0, SortOrder.getDefault());
+// }
return CDCUtil.initForRawScan(prepareDataTableScan(dataRowKeys, true));
}
protected boolean getNextCoveredIndexRow(List<Cell> result) throws
IOException {
if (indexRowIterator.hasNext()) {
List<Cell> indexRow = indexRowIterator.next();
- for (Cell c: indexRow) {
- if (c.getType() == Cell.Type.Put) {
- result.add(c);
- }
- }
+ Cell firstCell = indexRow.get(indexRow.size() - 1);
+ byte[] indexRowKey = new ImmutableBytesPtr(firstCell.getRowArray(),
+ firstCell.getRowOffset(), firstCell.getRowLength())
+ .copyBytesIfNecessary();
+ ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
+ indexToDataRowKeyMap.get(indexRowKey));
+ Result dataRow = dataRows.get(dataRowKey);
+ Long indexCellTS = firstCell.getTimestamp();
+ Map<String, Map<String, Object>> preImageObj = new HashMap<>();
+ Map<String, Map<String, Object>> changeImageObj = new HashMap<>();
+ Long lowerBoundForPreImage = 0L;
+ boolean isIndexCellDeleteRow = false;
+ byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
try {
- Result dataRow = null;
- if (! result.isEmpty()) {
- Cell firstCell = result.get(0);
- byte[] indexRowKey = new
ImmutableBytesPtr(firstCell.getRowArray(),
- firstCell.getRowOffset(), firstCell.getRowLength())
- .copyBytesIfNecessary();
- ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
- indexToDataRowKeyMap.get(indexRowKey));
- dataRow = dataRows.get(dataRowKey);
- Long indexRowTs = result.get(0).getTimestamp();
- Map<Long, Map<ImmutableBytesPtr, Cell>> changeTimeline =
dataRowChanges.get(
- dataRowKey);
- if (changeTimeline == null) {
- List<Cell> resultCells =
Arrays.asList(dataRow.rawCells());
- Collections.sort(resultCells,
CellComparator.getInstance().reversed());
- List<Cell> deleteMarkers = new ArrayList<>();
- List<List<Cell>> columns = new LinkedList<>();
- Cell currentColumnCell = null;
- Pair<byte[], byte[]> emptyKV =
EncodedColumnsUtil.getEmptyKeyValueInfo(
-
EncodedColumnsUtil.getQualifierEncodingScheme(scan));
- List<Cell> currentColumn = null;
- Set<Long> uniqueTimeStamps = new HashSet<>();
- // TODO: From CompactionScanner.formColumns(), see if
this can be refactored.
- for (Cell cell : resultCells) {
- uniqueTimeStamps.add(cell.getTimestamp());
- if (cell.getType() != Cell.Type.Put) {
- deleteMarkers.add(cell);
- }
- if (CellUtil.matchingColumn(cell,
QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
- emptyKV.getFirst())) {
- continue;
+ int columnListIndex = 0;
+ List<CDCTableInfo.CDCColumnInfo> cdcColumnInfoList =
+ this.cdcDataTableInfo.getColumnInfoList();
+ for (Cell cell : dataRow.rawCells()) {
+ if (cell.getType() == Cell.Type.DeleteFamily) {
Review Comment:
We compare only against the first column family for DeleteFamily cells,
because Phoenix provides no way to delete an individual column family; we can
only delete an entire row, which creates identical DeleteFamily markers for
every column family.
> Extend UncoveredGlobalIndexRegionScanner for CDC region scanner usecase
> -----------------------------------------------------------------------
>
> Key: PHOENIX-7015
> URL: https://issues.apache.org/jira/browse/PHOENIX-7015
> Project: Phoenix
> Issue Type: Sub-task
> Reporter: Viraj Jasani
> Priority: Major
>
> For the CDC region scanner use case, extend UncoveredGlobalIndexRegionScanner
> as CDCUncoveredGlobalIndexRegionScanner. The new CDC region scanner performs
> a raw scan of the index table and retrieves the data table rows referenced by
> the index rows. Using the time range, it can form a JSON blob representing the
> changes to the row, including pre-change and/or post-change row images.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)