[ 
https://issues.apache.org/jira/browse/PHOENIX-7015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17814603#comment-17814603
 ] 

ASF GitHub Bot commented on PHOENIX-7015:
-----------------------------------------

TheNamesRai commented on code in PR #1813:
URL: https://github.com/apache/phoenix/pull/1813#discussion_r1479224677


##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -90,128 +88,144 @@ public CDCGlobalIndexRegionScanner(final RegionScanner 
innerScanner,
         super(innerScanner, region, scan, env, dataTableScan, tupleProjector, 
indexMaintainer,
                 viewConstants, ptr, pageSizeMs, queryLimit);
         CDCUtil.initForRawScan(dataTableScan);
-        dataColQualNameMap = ScanUtil.deserializeColumnQualifierToNameMap(
-                scan.getAttribute(DATA_COL_QUALIFIER_TO_NAME_MAP));
-        dataColQualTypeMap = ScanUtil.deserializeColumnQualifierToTypeMap(
-                scan.getAttribute(DATA_COL_QUALIFIER_TO_TYPE_MAP));
+        cdcDataTableInfo = 
CDCTableInfo.createFromProto(CDCInfoProtos.CDCTableDef
+                .parseFrom(scan.getAttribute(CDC_DATA_TABLE_DEF)));
+        Charset utf8Charset = StandardCharsets.UTF_8;
+        String cdcChangeScopeStr = utf8Charset.decode(
+                
ByteBuffer.wrap(scan.getAttribute(CDC_INCLUDE_SCOPES))).toString();
+        try {
+            cdcChangeScopeSet = 
CDCUtil.makeChangeScopeEnumsFromString(cdcChangeScopeStr);
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     @Override
     protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys) throws 
IOException {
+        //TODO: Get Timerange from the start row and end row of the index scan 
object
+        // and set it in the datatable scan object.
+//        if (scan.getStartRow().length == 8) {
+//            startTimeRange = PLong.INSTANCE.getCodec().decodeLong(
+//              scan.getStartRow(), 0, SortOrder.getDefault());
+//        }
+//        if (scan.getStopRow().length == 8) {
+//            stopTimeRange = PLong.INSTANCE.getCodec().decodeLong(
+//              scan.getStopRow(), 0, SortOrder.getDefault());
+//        }
         return CDCUtil.initForRawScan(prepareDataTableScan(dataRowKeys, true));
     }
 
     protected boolean getNextCoveredIndexRow(List<Cell> result) throws 
IOException {
         if (indexRowIterator.hasNext()) {
             List<Cell> indexRow = indexRowIterator.next();
-            for (Cell c: indexRow) {
-                if (c.getType() == Cell.Type.Put) {
-                    result.add(c);
-                }
-            }
+            Cell firstCell = indexRow.get(indexRow.size() - 1);
+            byte[] indexRowKey = new ImmutableBytesPtr(firstCell.getRowArray(),
+                    firstCell.getRowOffset(), firstCell.getRowLength())
+                    .copyBytesIfNecessary();
+            ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
+                    indexToDataRowKeyMap.get(indexRowKey));
+            Result dataRow = dataRows.get(dataRowKey);
+            Long indexCellTS = firstCell.getTimestamp();
+            Map<String, Map<String, Object>> preImageObj = new HashMap<>();
+            Map<String, Map<String, Object>> changeImageObj = new HashMap<>();
+            Long lowerBoundForPreImage = 0L;
+            boolean isIndexCellDeleteRow = false;
+            byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
             try {
-                Result dataRow = null;
-                if (! result.isEmpty()) {
-                    Cell firstCell = result.get(0);
-                    byte[] indexRowKey = new 
ImmutableBytesPtr(firstCell.getRowArray(),
-                            firstCell.getRowOffset(), firstCell.getRowLength())
-                            .copyBytesIfNecessary();
-                    ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
-                            indexToDataRowKeyMap.get(indexRowKey));
-                    dataRow = dataRows.get(dataRowKey);
-                    Long indexRowTs = result.get(0).getTimestamp();
-                    Map<Long, Map<ImmutableBytesPtr, Cell>> changeTimeline = 
dataRowChanges.get(
-                            dataRowKey);
-                    if (changeTimeline == null) {
-                        List<Cell> resultCells = 
Arrays.asList(dataRow.rawCells());
-                        Collections.sort(resultCells, 
CellComparator.getInstance().reversed());
-                        List<Cell> deleteMarkers = new ArrayList<>();
-                        List<List<Cell>> columns = new LinkedList<>();
-                        Cell currentColumnCell = null;
-                        Pair<byte[], byte[]> emptyKV = 
EncodedColumnsUtil.getEmptyKeyValueInfo(
-                                
EncodedColumnsUtil.getQualifierEncodingScheme(scan));
-                        List<Cell> currentColumn = null;
-                        Set<Long> uniqueTimeStamps = new HashSet<>();
-                        // TODO: From CompactionScanner.formColumns(), see if 
this can be refactored.
-                        for (Cell cell : resultCells) {
-                            uniqueTimeStamps.add(cell.getTimestamp());
-                            if (cell.getType() != Cell.Type.Put) {
-                                deleteMarkers.add(cell);
-                            }
-                            if (CellUtil.matchingColumn(cell, 
QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
-                                    emptyKV.getFirst())) {
-                                continue;
+                int columnListIndex = 0;
+                List<CDCTableInfo.CDCColumnInfo> cdcColumnInfoList =
+                        this.cdcDataTableInfo.getColumnInfoList();
+                for (Cell cell : dataRow.rawCells()) {
+                    if (cell.getType() == Cell.Type.DeleteFamily) {
+                        if (columnListIndex > 0) {
+                            continue;
+                        }
+                        if (indexCellTS == cell.getTimestamp()) {
+                            isIndexCellDeleteRow = true;
+                        } else if (indexCellTS > cell.getTimestamp()) {
+                            lowerBoundForPreImage = cell.getTimestamp();

Review Comment:
   Do it once.





> Extend UncoveredGlobalIndexRegionScanner for CDC region scanner usecase
> -----------------------------------------------------------------------
>
>                 Key: PHOENIX-7015
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-7015
>             Project: Phoenix
>          Issue Type: Sub-task
>            Reporter: Viraj Jasani
>            Priority: Major
>
> For the CDC region scanner use case, extend UncoveredGlobalIndexRegionScanner to 
> CDCUncoveredGlobalIndexRegionScanner. The new region scanner for CDC performs 
> a raw scan of the index table and retrieves the data table rows from the index rows.
> Using the time range, it can form a JSON blob to represent changes to the row, 
> including pre and/or post row images.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to