[ https://issues.apache.org/jira/browse/PHOENIX-7015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17814790#comment-17814790 ]
ASF GitHub Bot commented on PHOENIX-7015: ----------------------------------------- TheNamesRai commented on code in PR #1813: URL: https://github.com/apache/phoenix/pull/1813#discussion_r1479673714 ########## phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java: ########## @@ -90,128 +88,144 @@ public CDCGlobalIndexRegionScanner(final RegionScanner innerScanner, super(innerScanner, region, scan, env, dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr, pageSizeMs, queryLimit); CDCUtil.initForRawScan(dataTableScan); - dataColQualNameMap = ScanUtil.deserializeColumnQualifierToNameMap( - scan.getAttribute(DATA_COL_QUALIFIER_TO_NAME_MAP)); - dataColQualTypeMap = ScanUtil.deserializeColumnQualifierToTypeMap( - scan.getAttribute(DATA_COL_QUALIFIER_TO_TYPE_MAP)); + cdcDataTableInfo = CDCTableInfo.createFromProto(CDCInfoProtos.CDCTableDef + .parseFrom(scan.getAttribute(CDC_DATA_TABLE_DEF))); + Charset utf8Charset = StandardCharsets.UTF_8; + String cdcChangeScopeStr = utf8Charset.decode( + ByteBuffer.wrap(scan.getAttribute(CDC_INCLUDE_SCOPES))).toString(); + try { + cdcChangeScopeSet = CDCUtil.makeChangeScopeEnumsFromString(cdcChangeScopeStr); + } catch (SQLException e) { + throw new RuntimeException(e); + } } @Override protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys) throws IOException { + //TODO: Get Timerange from the start row and end row of the index scan object + // and set it in the datatable scan object. 
+// if (scan.getStartRow().length == 8) { +// startTimeRange = PLong.INSTANCE.getCodec().decodeLong( +// scan.getStartRow(), 0, SortOrder.getDefault()); +// } +// if (scan.getStopRow().length == 8) { +// stopTimeRange = PLong.INSTANCE.getCodec().decodeLong( +// scan.getStopRow(), 0, SortOrder.getDefault()); +// } return CDCUtil.initForRawScan(prepareDataTableScan(dataRowKeys, true)); } protected boolean getNextCoveredIndexRow(List<Cell> result) throws IOException { if (indexRowIterator.hasNext()) { List<Cell> indexRow = indexRowIterator.next(); - for (Cell c: indexRow) { - if (c.getType() == Cell.Type.Put) { - result.add(c); - } - } + Cell firstCell = indexRow.get(indexRow.size() - 1); + byte[] indexRowKey = new ImmutableBytesPtr(firstCell.getRowArray(), + firstCell.getRowOffset(), firstCell.getRowLength()) + .copyBytesIfNecessary(); + ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr( + indexToDataRowKeyMap.get(indexRowKey)); + Result dataRow = dataRows.get(dataRowKey); + Long indexCellTS = firstCell.getTimestamp(); + Map<String, Map<String, Object>> preImageObj = new HashMap<>(); + Map<String, Map<String, Object>> changeImageObj = new HashMap<>(); + Long lowerBoundForPreImage = 0L; + boolean isIndexCellDeleteRow = false; + byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier(); try { - Result dataRow = null; - if (! 
result.isEmpty()) { - Cell firstCell = result.get(0); - byte[] indexRowKey = new ImmutableBytesPtr(firstCell.getRowArray(), - firstCell.getRowOffset(), firstCell.getRowLength()) - .copyBytesIfNecessary(); - ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr( - indexToDataRowKeyMap.get(indexRowKey)); - dataRow = dataRows.get(dataRowKey); - Long indexRowTs = result.get(0).getTimestamp(); - Map<Long, Map<ImmutableBytesPtr, Cell>> changeTimeline = dataRowChanges.get( - dataRowKey); - if (changeTimeline == null) { - List<Cell> resultCells = Arrays.asList(dataRow.rawCells()); - Collections.sort(resultCells, CellComparator.getInstance().reversed()); - List<Cell> deleteMarkers = new ArrayList<>(); - List<List<Cell>> columns = new LinkedList<>(); - Cell currentColumnCell = null; - Pair<byte[], byte[]> emptyKV = EncodedColumnsUtil.getEmptyKeyValueInfo( - EncodedColumnsUtil.getQualifierEncodingScheme(scan)); - List<Cell> currentColumn = null; - Set<Long> uniqueTimeStamps = new HashSet<>(); - // TODO: From CompactionScanner.formColumns(), see if this can be refactored. 
- for (Cell cell : resultCells) { - uniqueTimeStamps.add(cell.getTimestamp()); - if (cell.getType() != Cell.Type.Put) { - deleteMarkers.add(cell); - } - if (CellUtil.matchingColumn(cell, QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, - emptyKV.getFirst())) { - continue; + int columnListIndex = 0; + List<CDCTableInfo.CDCColumnInfo> cdcColumnInfoList = + this.cdcDataTableInfo.getColumnInfoList(); + for (Cell cell : dataRow.rawCells()) { + if (cell.getType() == Cell.Type.DeleteFamily) { Review Comment: For DeleteFamily cells, we only compare against the first column family. Phoenix provides no way to delete an individual column family; only an entire row can be deleted, and doing so creates the same DeleteFamily marker in every column family. > Extend UncoveredGlobalIndexRegionScanner for CDC region scanner usecase > ----------------------------------------------------------------------- > > Key: PHOENIX-7015 > URL: https://issues.apache.org/jira/browse/PHOENIX-7015 > Project: Phoenix > Issue Type: Sub-task > Reporter: Viraj Jasani > Priority: Major > > For the CDC region scanner use case, extend UncoveredGlobalIndexRegionScanner to > CDCUncoveredGlobalIndexRegionScanner. The new region scanner for CDC performs > a raw scan of the index table and retrieves data table rows from the index rows. > Using the time range, it can form a JSON blob to represent changes to the row, > including pre and/or post row images. -- This message was sent by Atlassian Jira (v8.20.10#820010)