[
https://issues.apache.org/jira/browse/PHOENIX-7015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17814604#comment-17814604
]
ASF GitHub Bot commented on PHOENIX-7015:
-----------------------------------------
TheNamesRai commented on code in PR #1813:
URL: https://github.com/apache/phoenix/pull/1813#discussion_r1479231430
##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -90,128 +88,144 @@ public CDCGlobalIndexRegionScanner(final RegionScanner
innerScanner,
super(innerScanner, region, scan, env, dataTableScan, tupleProjector,
indexMaintainer,
viewConstants, ptr, pageSizeMs, queryLimit);
CDCUtil.initForRawScan(dataTableScan);
- dataColQualNameMap = ScanUtil.deserializeColumnQualifierToNameMap(
- scan.getAttribute(DATA_COL_QUALIFIER_TO_NAME_MAP));
- dataColQualTypeMap = ScanUtil.deserializeColumnQualifierToTypeMap(
- scan.getAttribute(DATA_COL_QUALIFIER_TO_TYPE_MAP));
+ cdcDataTableInfo =
CDCTableInfo.createFromProto(CDCInfoProtos.CDCTableDef
+ .parseFrom(scan.getAttribute(CDC_DATA_TABLE_DEF)));
+ Charset utf8Charset = StandardCharsets.UTF_8;
+ String cdcChangeScopeStr = utf8Charset.decode(
+
ByteBuffer.wrap(scan.getAttribute(CDC_INCLUDE_SCOPES))).toString();
+ try {
+ cdcChangeScopeSet =
CDCUtil.makeChangeScopeEnumsFromString(cdcChangeScopeStr);
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
}
@Override
protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys) throws
IOException {
+ //TODO: Get Timerange from the start row and end row of the index scan
object
+ // and set it in the datatable scan object.
+// if (scan.getStartRow().length == 8) {
+// startTimeRange = PLong.INSTANCE.getCodec().decodeLong(
+// scan.getStartRow(), 0, SortOrder.getDefault());
+// }
+// if (scan.getStopRow().length == 8) {
+// stopTimeRange = PLong.INSTANCE.getCodec().decodeLong(
+// scan.getStopRow(), 0, SortOrder.getDefault());
+// }
return CDCUtil.initForRawScan(prepareDataTableScan(dataRowKeys, true));
}
protected boolean getNextCoveredIndexRow(List<Cell> result) throws
IOException {
if (indexRowIterator.hasNext()) {
List<Cell> indexRow = indexRowIterator.next();
- for (Cell c: indexRow) {
- if (c.getType() == Cell.Type.Put) {
- result.add(c);
- }
- }
+ Cell firstCell = indexRow.get(indexRow.size() - 1);
+ byte[] indexRowKey = new ImmutableBytesPtr(firstCell.getRowArray(),
+ firstCell.getRowOffset(), firstCell.getRowLength())
+ .copyBytesIfNecessary();
+ ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
+ indexToDataRowKeyMap.get(indexRowKey));
+ Result dataRow = dataRows.get(dataRowKey);
+ Long indexCellTS = firstCell.getTimestamp();
+ Map<String, Map<String, Object>> preImageObj = new HashMap<>();
+ Map<String, Map<String, Object>> changeImageObj = new HashMap<>();
+ Long lowerBoundForPreImage = 0L;
+ boolean isIndexCellDeleteRow = false;
+ byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
try {
- Result dataRow = null;
- if (! result.isEmpty()) {
- Cell firstCell = result.get(0);
- byte[] indexRowKey = new
ImmutableBytesPtr(firstCell.getRowArray(),
- firstCell.getRowOffset(), firstCell.getRowLength())
- .copyBytesIfNecessary();
- ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
- indexToDataRowKeyMap.get(indexRowKey));
- dataRow = dataRows.get(dataRowKey);
- Long indexRowTs = result.get(0).getTimestamp();
- Map<Long, Map<ImmutableBytesPtr, Cell>> changeTimeline =
dataRowChanges.get(
- dataRowKey);
- if (changeTimeline == null) {
- List<Cell> resultCells =
Arrays.asList(dataRow.rawCells());
- Collections.sort(resultCells,
CellComparator.getInstance().reversed());
- List<Cell> deleteMarkers = new ArrayList<>();
- List<List<Cell>> columns = new LinkedList<>();
- Cell currentColumnCell = null;
- Pair<byte[], byte[]> emptyKV =
EncodedColumnsUtil.getEmptyKeyValueInfo(
-
EncodedColumnsUtil.getQualifierEncodingScheme(scan));
- List<Cell> currentColumn = null;
- Set<Long> uniqueTimeStamps = new HashSet<>();
- // TODO: From CompactionScanner.formColumns(), see if
this can be refactored.
- for (Cell cell : resultCells) {
- uniqueTimeStamps.add(cell.getTimestamp());
- if (cell.getType() != Cell.Type.Put) {
- deleteMarkers.add(cell);
- }
- if (CellUtil.matchingColumn(cell,
QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
- emptyKV.getFirst())) {
- continue;
+ int columnListIndex = 0;
+ List<CDCTableInfo.CDCColumnInfo> cdcColumnInfoList =
+ this.cdcDataTableInfo.getColumnInfoList();
+ for (Cell cell : dataRow.rawCells()) {
+ if (cell.getType() == Cell.Type.DeleteFamily) {
+ if (columnListIndex > 0) {
+ continue;
+ }
+ if (indexCellTS == cell.getTimestamp()) {
+ isIndexCellDeleteRow = true;
+ } else if (indexCellTS > cell.getTimestamp()) {
+ lowerBoundForPreImage = cell.getTimestamp();
+ }
+ } else if (cell.getType() == Cell.Type.DeleteColumn
+ || cell.getType() == Cell.Type.Put) {
+ if (!Arrays.equals(cell.getQualifierArray(), emptyCQ)
+ && CDCUtil.compareCellFamilyAndQualifier(
+ cell.getFamilyArray(),
cell.getQualifierArray(),
+
cdcColumnInfoList.get(columnListIndex).getColumnFamily(),
+ cdcColumnInfoList.get(columnListIndex)
+ .getColumnQualifier()) > 0) {
+ while (columnListIndex < cdcColumnInfoList.size()
+ && CDCUtil.compareCellFamilyAndQualifier(
+ cell.getFamilyArray(),
cell.getQualifierArray(),
+
cdcColumnInfoList.get(columnListIndex).getColumnFamily(),
+ cdcColumnInfoList.get(columnListIndex)
+ .getColumnQualifier()) > 0) {
+ columnListIndex += 1;
}
- if (currentColumnCell == null) {
- currentColumn = new LinkedList<>();
- currentColumnCell = cell;
- currentColumn.add(cell);
- } else if (!CellUtil.matchingColumn(cell,
currentColumnCell)) {
- columns.add(currentColumn);
- currentColumn = new LinkedList<>();
- currentColumnCell = cell;
- currentColumn.add(cell);
- } else {
- currentColumn.add(cell);
+ if (columnListIndex >= cdcColumnInfoList.size()) {
+ break;
}
}
- if (currentColumn != null) {
- columns.add(currentColumn);
+ if (CDCUtil.compareCellFamilyAndQualifier(
+ cell.getFamilyArray(),
cell.getQualifierArray(),
+
cdcColumnInfoList.get(columnListIndex).getColumnFamily(),
+ cdcColumnInfoList.get(columnListIndex)
+ .getColumnQualifier()) < 0) {
+ continue;
}
- List<Long> sortedTimestamps =
uniqueTimeStamps.stream().sorted().collect(
- Collectors.toList());
- // FIXME: Does this need to be Concurrent?
- Map<ImmutableBytesPtr, Cell> rollingRow = new
HashMap<>();
- int[] columnPointers = new int[columns.size()];
- changeTimeline = new TreeMap<>();
- dataRowChanges.put(dataRowKey, changeTimeline);
- for (Long ts : sortedTimestamps) {
- for (int i = 0; i < columns.size(); ++i) {
- Cell cell =
columns.get(i).get(columnPointers[i]);
- if (cell.getTimestamp() == ts) {
- rollingRow.put(new ImmutableBytesPtr(
- cell.getQualifierArray(),
- cell.getQualifierOffset(),
- cell.getQualifierLength()),
- cell);
- ++columnPointers[i];
+ if (CDCUtil.compareCellFamilyAndQualifier(
+ cell.getFamilyArray(),
cell.getQualifierArray(),
+
cdcColumnInfoList.get(columnListIndex).getColumnFamily(),
+ cdcColumnInfoList.get(columnListIndex)
+ .getColumnQualifier()) == 0) {
+ String columnFamily = StandardCharsets.UTF_8
+
.decode(ByteBuffer.wrap(cdcColumnInfoList.get(columnListIndex)
+ .getColumnFamily())).toString();
+ String columnQualifier =
cdcColumnInfoList.get(columnListIndex)
+ .getColumnName();
+ if (Arrays.equals(
+
cdcColumnInfoList.get(columnListIndex).getColumnFamily(),
+
cdcDataTableInfo.getDefaultColumnFamily())) {
+ columnFamily = DEFAULT_COLUMN_FAMILY_STR;
Review Comment:
Use dot notation as the key (family.qualifier).
> Extend UncoveredGlobalIndexRegionScanner for CDC region scanner usecase
> -----------------------------------------------------------------------
>
> Key: PHOENIX-7015
> URL: https://issues.apache.org/jira/browse/PHOENIX-7015
> Project: Phoenix
> Issue Type: Sub-task
> Reporter: Viraj Jasani
> Priority: Major
>
> For the CDC region scanner use case, extend UncoveredGlobalIndexRegionScanner to
> CDCUncoveredGlobalIndexRegionScanner. The new region scanner for CDC performs
> a raw scan on the index table and retrieves data table rows from the index rows.
> Using the time range, it can form a JSON blob to represent changes to the row,
> including pre and/or post row images.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)