[ https://issues.apache.org/jira/browse/PHOENIX-7015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17814604#comment-17814604 ]
ASF GitHub Bot commented on PHOENIX-7015: ----------------------------------------- TheNamesRai commented on code in PR #1813: URL: https://github.com/apache/phoenix/pull/1813#discussion_r1479231430 ########## phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java: ########## @@ -90,128 +88,144 @@ public CDCGlobalIndexRegionScanner(final RegionScanner innerScanner, super(innerScanner, region, scan, env, dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr, pageSizeMs, queryLimit); CDCUtil.initForRawScan(dataTableScan); - dataColQualNameMap = ScanUtil.deserializeColumnQualifierToNameMap( - scan.getAttribute(DATA_COL_QUALIFIER_TO_NAME_MAP)); - dataColQualTypeMap = ScanUtil.deserializeColumnQualifierToTypeMap( - scan.getAttribute(DATA_COL_QUALIFIER_TO_TYPE_MAP)); + cdcDataTableInfo = CDCTableInfo.createFromProto(CDCInfoProtos.CDCTableDef + .parseFrom(scan.getAttribute(CDC_DATA_TABLE_DEF))); + Charset utf8Charset = StandardCharsets.UTF_8; + String cdcChangeScopeStr = utf8Charset.decode( + ByteBuffer.wrap(scan.getAttribute(CDC_INCLUDE_SCOPES))).toString(); + try { + cdcChangeScopeSet = CDCUtil.makeChangeScopeEnumsFromString(cdcChangeScopeStr); + } catch (SQLException e) { + throw new RuntimeException(e); + } } @Override protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys) throws IOException { + //TODO: Get Timerange from the start row and end row of the index scan object + // and set it in the datatable scan object. 
+// if (scan.getStartRow().length == 8) { +// startTimeRange = PLong.INSTANCE.getCodec().decodeLong( +// scan.getStartRow(), 0, SortOrder.getDefault()); +// } +// if (scan.getStopRow().length == 8) { +// stopTimeRange = PLong.INSTANCE.getCodec().decodeLong( +// scan.getStopRow(), 0, SortOrder.getDefault()); +// } return CDCUtil.initForRawScan(prepareDataTableScan(dataRowKeys, true)); } protected boolean getNextCoveredIndexRow(List<Cell> result) throws IOException { if (indexRowIterator.hasNext()) { List<Cell> indexRow = indexRowIterator.next(); - for (Cell c: indexRow) { - if (c.getType() == Cell.Type.Put) { - result.add(c); - } - } + Cell firstCell = indexRow.get(indexRow.size() - 1); + byte[] indexRowKey = new ImmutableBytesPtr(firstCell.getRowArray(), + firstCell.getRowOffset(), firstCell.getRowLength()) + .copyBytesIfNecessary(); + ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr( + indexToDataRowKeyMap.get(indexRowKey)); + Result dataRow = dataRows.get(dataRowKey); + Long indexCellTS = firstCell.getTimestamp(); + Map<String, Map<String, Object>> preImageObj = new HashMap<>(); + Map<String, Map<String, Object>> changeImageObj = new HashMap<>(); + Long lowerBoundForPreImage = 0L; + boolean isIndexCellDeleteRow = false; + byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier(); try { - Result dataRow = null; - if (! 
result.isEmpty()) { - Cell firstCell = result.get(0); - byte[] indexRowKey = new ImmutableBytesPtr(firstCell.getRowArray(), - firstCell.getRowOffset(), firstCell.getRowLength()) - .copyBytesIfNecessary(); - ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr( - indexToDataRowKeyMap.get(indexRowKey)); - dataRow = dataRows.get(dataRowKey); - Long indexRowTs = result.get(0).getTimestamp(); - Map<Long, Map<ImmutableBytesPtr, Cell>> changeTimeline = dataRowChanges.get( - dataRowKey); - if (changeTimeline == null) { - List<Cell> resultCells = Arrays.asList(dataRow.rawCells()); - Collections.sort(resultCells, CellComparator.getInstance().reversed()); - List<Cell> deleteMarkers = new ArrayList<>(); - List<List<Cell>> columns = new LinkedList<>(); - Cell currentColumnCell = null; - Pair<byte[], byte[]> emptyKV = EncodedColumnsUtil.getEmptyKeyValueInfo( - EncodedColumnsUtil.getQualifierEncodingScheme(scan)); - List<Cell> currentColumn = null; - Set<Long> uniqueTimeStamps = new HashSet<>(); - // TODO: From CompactionScanner.formColumns(), see if this can be refactored. 
- for (Cell cell : resultCells) { - uniqueTimeStamps.add(cell.getTimestamp()); - if (cell.getType() != Cell.Type.Put) { - deleteMarkers.add(cell); - } - if (CellUtil.matchingColumn(cell, QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, - emptyKV.getFirst())) { - continue; + int columnListIndex = 0; + List<CDCTableInfo.CDCColumnInfo> cdcColumnInfoList = + this.cdcDataTableInfo.getColumnInfoList(); + for (Cell cell : dataRow.rawCells()) { + if (cell.getType() == Cell.Type.DeleteFamily) { + if (columnListIndex > 0) { + continue; + } + if (indexCellTS == cell.getTimestamp()) { + isIndexCellDeleteRow = true; + } else if (indexCellTS > cell.getTimestamp()) { + lowerBoundForPreImage = cell.getTimestamp(); + } + } else if (cell.getType() == Cell.Type.DeleteColumn + || cell.getType() == Cell.Type.Put) { + if (!Arrays.equals(cell.getQualifierArray(), emptyCQ) + && CDCUtil.compareCellFamilyAndQualifier( + cell.getFamilyArray(), cell.getQualifierArray(), + cdcColumnInfoList.get(columnListIndex).getColumnFamily(), + cdcColumnInfoList.get(columnListIndex) + .getColumnQualifier()) > 0) { + while (columnListIndex < cdcColumnInfoList.size() + && CDCUtil.compareCellFamilyAndQualifier( + cell.getFamilyArray(), cell.getQualifierArray(), + cdcColumnInfoList.get(columnListIndex).getColumnFamily(), + cdcColumnInfoList.get(columnListIndex) + .getColumnQualifier()) > 0) { + columnListIndex += 1; } - if (currentColumnCell == null) { - currentColumn = new LinkedList<>(); - currentColumnCell = cell; - currentColumn.add(cell); - } else if (!CellUtil.matchingColumn(cell, currentColumnCell)) { - columns.add(currentColumn); - currentColumn = new LinkedList<>(); - currentColumnCell = cell; - currentColumn.add(cell); - } else { - currentColumn.add(cell); + if (columnListIndex >= cdcColumnInfoList.size()) { + break; } } - if (currentColumn != null) { - columns.add(currentColumn); + if (CDCUtil.compareCellFamilyAndQualifier( + cell.getFamilyArray(), cell.getQualifierArray(), + 
cdcColumnInfoList.get(columnListIndex).getColumnFamily(), + cdcColumnInfoList.get(columnListIndex) + .getColumnQualifier()) < 0) { + continue; } - List<Long> sortedTimestamps = uniqueTimeStamps.stream().sorted().collect( - Collectors.toList()); - // FIXME: Does this need to be Concurrent? - Map<ImmutableBytesPtr, Cell> rollingRow = new HashMap<>(); - int[] columnPointers = new int[columns.size()]; - changeTimeline = new TreeMap<>(); - dataRowChanges.put(dataRowKey, changeTimeline); - for (Long ts : sortedTimestamps) { - for (int i = 0; i < columns.size(); ++i) { - Cell cell = columns.get(i).get(columnPointers[i]); - if (cell.getTimestamp() == ts) { - rollingRow.put(new ImmutableBytesPtr( - cell.getQualifierArray(), - cell.getQualifierOffset(), - cell.getQualifierLength()), - cell); - ++columnPointers[i]; + if (CDCUtil.compareCellFamilyAndQualifier( + cell.getFamilyArray(), cell.getQualifierArray(), + cdcColumnInfoList.get(columnListIndex).getColumnFamily(), + cdcColumnInfoList.get(columnListIndex) + .getColumnQualifier()) == 0) { + String columnFamily = StandardCharsets.UTF_8 + .decode(ByteBuffer.wrap(cdcColumnInfoList.get(columnListIndex) + .getColumnFamily())).toString(); + String columnQualifier = cdcColumnInfoList.get(columnListIndex) + .getColumnName(); + if (Arrays.equals( + cdcColumnInfoList.get(columnListIndex).getColumnFamily(), + cdcDataTableInfo.getDefaultColumnFamily())) { + columnFamily = DEFAULT_COLUMN_FAMILY_STR; Review Comment: use dot notation for as a key (family.qualifier) > Extend UncoveredGlobalIndexRegionScanner for CDC region scanner usecase > ----------------------------------------------------------------------- > > Key: PHOENIX-7015 > URL: https://issues.apache.org/jira/browse/PHOENIX-7015 > Project: Phoenix > Issue Type: Sub-task > Reporter: Viraj Jasani > Priority: Major > > For CDC region scanner usecase, extend UncoveredGlobalIndexRegionScanner to > CDCUncoveredGlobalIndexRegionScanner. 
The new region scanner for CDC performs > a raw scan of the index table and retrieves data table rows from the index rows. > Using the time range, it can form a JSON blob representing changes to the row, > including the pre- and/or post-images of the row. -- This message was sent by Atlassian Jira (v8.20.10#820010)