[ https://issues.apache.org/jira/browse/PHOENIX-7015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17814791#comment-17814791 ]
ASF GitHub Bot commented on PHOENIX-7015: ----------------------------------------- TheNamesRai commented on code in PR #1813: URL: https://github.com/apache/phoenix/pull/1813#discussion_r1479741178 ########## phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java: ########## @@ -90,128 +88,144 @@ public CDCGlobalIndexRegionScanner(final RegionScanner innerScanner, super(innerScanner, region, scan, env, dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr, pageSizeMs, queryLimit); CDCUtil.initForRawScan(dataTableScan); - dataColQualNameMap = ScanUtil.deserializeColumnQualifierToNameMap( - scan.getAttribute(DATA_COL_QUALIFIER_TO_NAME_MAP)); - dataColQualTypeMap = ScanUtil.deserializeColumnQualifierToTypeMap( - scan.getAttribute(DATA_COL_QUALIFIER_TO_TYPE_MAP)); + cdcDataTableInfo = CDCTableInfo.createFromProto(CDCInfoProtos.CDCTableDef + .parseFrom(scan.getAttribute(CDC_DATA_TABLE_DEF))); + Charset utf8Charset = StandardCharsets.UTF_8; + String cdcChangeScopeStr = utf8Charset.decode( + ByteBuffer.wrap(scan.getAttribute(CDC_INCLUDE_SCOPES))).toString(); + try { + cdcChangeScopeSet = CDCUtil.makeChangeScopeEnumsFromString(cdcChangeScopeStr); + } catch (SQLException e) { + throw new RuntimeException(e); + } } @Override protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys) throws IOException { + //TODO: Get Timerange from the start row and end row of the index scan object + // and set it in the datatable scan object. 
+// if (scan.getStartRow().length == 8) { +// startTimeRange = PLong.INSTANCE.getCodec().decodeLong( +// scan.getStartRow(), 0, SortOrder.getDefault()); +// } +// if (scan.getStopRow().length == 8) { +// stopTimeRange = PLong.INSTANCE.getCodec().decodeLong( +// scan.getStopRow(), 0, SortOrder.getDefault()); +// } return CDCUtil.initForRawScan(prepareDataTableScan(dataRowKeys, true)); } protected boolean getNextCoveredIndexRow(List<Cell> result) throws IOException { if (indexRowIterator.hasNext()) { List<Cell> indexRow = indexRowIterator.next(); - for (Cell c: indexRow) { - if (c.getType() == Cell.Type.Put) { - result.add(c); - } - } + Cell firstCell = indexRow.get(indexRow.size() - 1); + byte[] indexRowKey = new ImmutableBytesPtr(firstCell.getRowArray(), + firstCell.getRowOffset(), firstCell.getRowLength()) + .copyBytesIfNecessary(); + ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr( + indexToDataRowKeyMap.get(indexRowKey)); + Result dataRow = dataRows.get(dataRowKey); + Long indexCellTS = firstCell.getTimestamp(); + Map<String, Map<String, Object>> preImageObj = new HashMap<>(); + Map<String, Map<String, Object>> changeImageObj = new HashMap<>(); + Long lowerBoundForPreImage = 0L; + boolean isIndexCellDeleteRow = false; + byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier(); try { - Result dataRow = null; - if (! 
result.isEmpty()) { - Cell firstCell = result.get(0); - byte[] indexRowKey = new ImmutableBytesPtr(firstCell.getRowArray(), - firstCell.getRowOffset(), firstCell.getRowLength()) - .copyBytesIfNecessary(); - ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr( - indexToDataRowKeyMap.get(indexRowKey)); - dataRow = dataRows.get(dataRowKey); - Long indexRowTs = result.get(0).getTimestamp(); - Map<Long, Map<ImmutableBytesPtr, Cell>> changeTimeline = dataRowChanges.get( - dataRowKey); - if (changeTimeline == null) { - List<Cell> resultCells = Arrays.asList(dataRow.rawCells()); - Collections.sort(resultCells, CellComparator.getInstance().reversed()); - List<Cell> deleteMarkers = new ArrayList<>(); - List<List<Cell>> columns = new LinkedList<>(); - Cell currentColumnCell = null; - Pair<byte[], byte[]> emptyKV = EncodedColumnsUtil.getEmptyKeyValueInfo( - EncodedColumnsUtil.getQualifierEncodingScheme(scan)); - List<Cell> currentColumn = null; - Set<Long> uniqueTimeStamps = new HashSet<>(); - // TODO: From CompactionScanner.formColumns(), see if this can be refactored. 
- for (Cell cell : resultCells) { - uniqueTimeStamps.add(cell.getTimestamp()); - if (cell.getType() != Cell.Type.Put) { - deleteMarkers.add(cell); - } - if (CellUtil.matchingColumn(cell, QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, - emptyKV.getFirst())) { - continue; + int columnListIndex = 0; + List<CDCTableInfo.CDCColumnInfo> cdcColumnInfoList = + this.cdcDataTableInfo.getColumnInfoList(); + for (Cell cell : dataRow.rawCells()) { + if (cell.getType() == Cell.Type.DeleteFamily) { + if (columnListIndex > 0) { + continue; + } + if (indexCellTS == cell.getTimestamp()) { + isIndexCellDeleteRow = true; + } else if (indexCellTS > cell.getTimestamp()) { + lowerBoundForPreImage = cell.getTimestamp(); + } + } else if (cell.getType() == Cell.Type.DeleteColumn + || cell.getType() == Cell.Type.Put) { + if (!Arrays.equals(cell.getQualifierArray(), emptyCQ) + && CDCUtil.compareCellFamilyAndQualifier( + cell.getFamilyArray(), cell.getQualifierArray(), + cdcColumnInfoList.get(columnListIndex).getColumnFamily(), + cdcColumnInfoList.get(columnListIndex) + .getColumnQualifier()) > 0) { Review Comment: Yes, it's not required. We will only need the empty column qualifier check. Will update. Thanks. > Extend UncoveredGlobalIndexRegionScanner for CDC region scanner usecase > ----------------------------------------------------------------------- > > Key: PHOENIX-7015 > URL: https://issues.apache.org/jira/browse/PHOENIX-7015 > Project: Phoenix > Issue Type: Sub-task > Reporter: Viraj Jasani > Priority: Major > > For the CDC region scanner use case, extend UncoveredGlobalIndexRegionScanner to > CDCUncoveredGlobalIndexRegionScanner. The new region scanner for CDC performs > a raw scan on the index table and retrieves data table rows from index rows. > Using the time range, it can form a JSON blob to represent changes to the row > including pre and/or post row images. -- This message was sent by Atlassian Jira (v8.20.10#820010)