[ 
https://issues.apache.org/jira/browse/PHOENIX-7015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17815772#comment-17815772
 ] 

ASF GitHub Bot commented on PHOENIX-7015:
-----------------------------------------

kadirozde commented on code in PR #1813:
URL: https://github.com/apache/phoenix/pull/1813#discussion_r1483321830


##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -90,128 +90,157 @@ public CDCGlobalIndexRegionScanner(final RegionScanner innerScanner,
         super(innerScanner, region, scan, env, dataTableScan, tupleProjector, indexMaintainer,
                 viewConstants, ptr, pageSizeMs, queryLimit);
         CDCUtil.initForRawScan(dataTableScan);
-        dataColQualNameMap = ScanUtil.deserializeColumnQualifierToNameMap(
-                scan.getAttribute(DATA_COL_QUALIFIER_TO_NAME_MAP));
-        dataColQualTypeMap = ScanUtil.deserializeColumnQualifierToTypeMap(
-                scan.getAttribute(DATA_COL_QUALIFIER_TO_TYPE_MAP));
+        cdcDataTableInfo = CDCTableInfo.createFromProto(CDCInfoProtos.CDCTableDef
+                .parseFrom(scan.getAttribute(CDC_DATA_TABLE_DEF)));
+        try {
+            cdcChangeScopeSet = CDCUtil.makeChangeScopeEnumsFromString(
+                    cdcDataTableInfo.getCdcIncludeScopes());
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     @Override
     protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys) throws IOException {
+        //TODO: Get Timerange from the start row and end row of the index scan object
+        // and set it in the datatable scan object.
+//        if (scan.getStartRow().length == 8) {
+//            startTimeRange = PLong.INSTANCE.getCodec().decodeLong(
+//              scan.getStartRow(), 0, SortOrder.getDefault());
+//        }
+//        if (scan.getStopRow().length == 8) {
+//            stopTimeRange = PLong.INSTANCE.getCodec().decodeLong(
+//              scan.getStopRow(), 0, SortOrder.getDefault());
+//        }
         return CDCUtil.initForRawScan(prepareDataTableScan(dataRowKeys, true));
     }
 
     protected boolean getNextCoveredIndexRow(List<Cell> result) throws IOException {
         if (indexRowIterator.hasNext()) {
             List<Cell> indexRow = indexRowIterator.next();
-            for (Cell c: indexRow) {
-                if (c.getType() == Cell.Type.Put) {
-                    result.add(c);
-                }
+            // firstCell: Picking the earliest cell in the index row so that

Review Comment:
   Please add this comment: A given CDC index row can have either one put
cell, or a delete marker (Delete Family) and a put cell, due to DML operations.
When a data table column is dropped by a DDL operation, a second put cell
with a higher timestamp is inserted into the index row. The timestamp of this
put cell matches the timestamp of the corresponding DeleteColumn mutation on
the data table row. This DDL mutation will not show up in the CDC stream.
The index row delete marker timestamp is always higher than or equal to the
put cell timestamp. The delete markers are for maintaining the index rows and
can be ignored. The oldest put cell will be used to point back to the data
table row version. The timestamp of the put cell is used to identify the
corresponding data table mutation, which can be a put or a delete mutation.
This is the reason we only consider the oldest cell (i.e. first cell) here.
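
   To illustrate the rule described above, here is a minimal, hypothetical
sketch. `IndexCell` and `CellType` are stand-ins invented for this example
(they are not HBase or Phoenix classes); the sketch only shows the idea of
ignoring delete markers and keeping the put cell with the lowest timestamp.

```java
import java.util.List;

final class OldestPutCellSketch {
    enum CellType { PUT, DELETE_FAMILY }

    /** Hypothetical stand-in for an HBase Cell; only type and timestamp matter here. */
    static final class IndexCell {
        final CellType type;
        final long timestamp;

        IndexCell(CellType type, long timestamp) {
            this.type = type;
            this.timestamp = timestamp;
        }
    }

    /** Returns the put cell with the lowest timestamp, or null when the row has no put cell. */
    static IndexCell oldestPut(List<IndexCell> indexRow) {
        IndexCell oldest = null;
        for (IndexCell cell : indexRow) {
            if (cell.type != CellType.PUT) {
                continue; // delete markers only maintain the index row and are ignored
            }
            if (oldest == null || cell.timestamp < oldest.timestamp) {
                oldest = cell;
            }
        }
        return oldest;
    }
}
```

   The PR instead takes the last element of the index row; the explicit
scan above is only meant to make the intent of "oldest put cell" visible.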



##########
phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCMiscIT.java:
##########
@@ -446,4 +846,91 @@ public void testSelectUncoveredIndex() throws Exception {
         assertEquals(200, rs.getInt(2));
         assertEquals(false, rs.next());
     }
+
+    private void assertCDCBinaryAndDateColumn(ResultSet rs,
+                                              List<byte []> byteColumnValues,
+                                              List<Date> dateColumnValues,
+                                              Timestamp timestamp) throws Exception {
+        assertEquals(true, rs.next());
+        assertEquals(1, rs.getInt(2));
+
+        Gson gson = new Gson();
+        Map<String, Object> row1 = new HashMap<String, Object>(){{
+            put(CDC_EVENT_TYPE, CDC_UPSERT_EVENT_TYPE);
+        }};
+        Map<String, Object> postImage = new HashMap<>();
+        postImage.put("A_BINARY",
+                Base64.getEncoder().encodeToString(byteColumnValues.get(0)));
+        postImage.put("D", dateColumnValues.get(0).toString());
+        postImage.put("T", timestamp.toString());
+        row1.put(CDC_POST_IMAGE, postImage);
+        Map<String, Object> changeImage = new HashMap<>();
+        changeImage.put("A_BINARY",
+                Base64.getEncoder().encodeToString(byteColumnValues.get(0)));
+        changeImage.put("D", dateColumnValues.get(0).toString());
+        changeImage.put("T", timestamp.toString());
+        row1.put(CDC_CHANGE_IMAGE, changeImage);
+        row1.put(CDC_PRE_IMAGE, new HashMap<String, String>() {{
+        }});
+        assertEquals(row1, gson.fromJson(rs.getString(3),
+                HashMap.class));
+
+        assertEquals(true, rs.next());
+        assertEquals(2, rs.getInt(2));
+        HashMap<String, Object> row2Json = gson.fromJson(rs.getString(3), HashMap.class);
+        String row2BinaryColStr = (String) ((Map)((Map)row2Json.get(CDC_CHANGE_IMAGE))).get("A_BINARY");
+        byte[] row2BinaryCol = Base64.getDecoder().decode(row2BinaryColStr);
+
+        assertEquals(0, DescVarLengthFastByteComparisons.compareTo(byteColumnValues.get(1),
+                0, byteColumnValues.get(1).length, row2BinaryCol, 0, row2BinaryCol.length));
+    }
+
+    @Test
+    public void testCDCBinaryAndDateColumn() throws Exception {
+        Properties props = new Properties();
+        props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, Long.toString(Long.MAX_VALUE));
+        props.put("hbase.client.scanner.timeout.period", "6000000");
+        props.put("phoenix.query.timeoutMs", "6000000");
+        props.put("zookeeper.session.timeout", "6000000");
+        props.put("hbase.rpc.timeout", "6000000");
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        String tableName = generateUniqueName();
+        List<byte []> byteColumnValues = new ArrayList<>();
+        byteColumnValues.add( new byte[] {0,0,0,0,0,0,0,0,0,1});
+        byteColumnValues.add(new byte[] {0,0,0,0,0,0,0,0,0,2});
+        List<Date> dateColumnValues = new ArrayList<>();
+        dateColumnValues.add(Date.valueOf("2024-02-01"));
+        dateColumnValues.add(Date.valueOf("2024-01-31"));
+        Timestamp timestampColumnValue = Timestamp.valueOf("2024-01-31 12:12:14");
+        try {
+
+            conn.createStatement().execute("CREATE TABLE  " + tableName +
+                    " ( k INTEGER PRIMARY KEY," + " a_binary binary(10), d Date, t TIMESTAMP)");
+
+            String upsertQuery = "UPSERT INTO " + tableName + " (k, a_binary, d, t) VALUES (?, ?, ?, ?)";
+            PreparedStatement stmt = conn.prepareStatement(upsertQuery);
+            stmt.setInt(1, 1);
+            stmt.setBytes(2, byteColumnValues.get(0));
+            stmt.setDate(3, dateColumnValues.get(0));
+            stmt.setTimestamp(4, timestampColumnValue);
+            stmt.execute();
+            stmt.setInt(1, 2);
+            stmt.setBytes(2, byteColumnValues.get(1));
+            stmt.setDate(3, dateColumnValues.get(1));
+            stmt.setTimestamp(4, timestampColumnValue);
+            stmt.execute();
+            conn.commit();
+
+            String cdcName = generateUniqueName();
+            String cdc_sql = "CREATE CDC " + cdcName
+                    + " ON " + tableName;
+            createAndWait(conn, tableName, cdcName, cdc_sql);
+            assertCDCState(conn, cdcName, null, 3);
+            assertCDCBinaryAndDateColumn(conn.createStatement().executeQuery
+                    ("SELECT /*+ CDC_INCLUDE(PRE, POST, CHANGE) */ * " + "FROM " + cdcName),
+                    byteColumnValues, dateColumnValues, timestampColumnValue);
+        } finally {
+            conn.close();
+        }
+    }

Review Comment:
   We need to have tests with successful index writes but failed data table 
writes. Please see GlobalIndexCheckerIT for how to fail data table writes. We 
should not see any CDC event for these failed transactions.  



##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -223,4 +252,67 @@ protected boolean getNextCoveredIndexRow(List<Cell> result) throws IOException {
         }
         return false;
     }
+
+    private Result getCDCImage(
+            Map<String, Object> preImageObj, Map<String, Object> changeImageObj,
+            boolean isIndexCellDeleteRow, Long indexCellTS, Cell firstCell,
+            boolean isChangeImageInScope, boolean isPreImageInScope, boolean isPostImageInScope) {
+        Map<String, Object> rowValueMap = new HashMap<>();
+
+        if (isPreImageInScope) {
+            rowValueMap.put(CDC_PRE_IMAGE, preImageObj);
+        }
+
+        if (isChangeImageInScope) {
+            rowValueMap.put(CDC_CHANGE_IMAGE, changeImageObj);
+        }
+
+        Map<String, Object> postImageObj = new HashMap<>();
+        if (isPostImageInScope) {
+            if (!isIndexCellDeleteRow) {
+                for (Map.Entry<String, Object> preImageObjCol : preImageObj.entrySet()) {
+                    postImageObj.put(preImageObjCol.getKey(), preImageObjCol.getValue());
+                }
+                for (Map.Entry<String, Object> changeImageObjCol : changeImageObj.entrySet()) {
+                    postImageObj.put(changeImageObjCol.getKey(), changeImageObjCol.getValue());
+                }
+            }
+            rowValueMap.put(CDC_POST_IMAGE, postImageObj);
+        }
+
+        if (isIndexCellDeleteRow) {

Review Comment:
   We should not decide whether the data table mutation is a delete or an
upsert based on the index row mutation type. It should be determined from the
data table mutation type.
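
   As a rough sketch of what this could look like, the event type is derived
only from the data table cells written at the index cell's timestamp. The
`DataCell`, `CellType`, and `EventType` names are hypothetical and invented
for this example, and treating a DeleteFamily marker at that timestamp as a
row delete is an assumption about the intended semantics.

```java
import java.util.List;

final class DataMutationTypeSketch {
    enum CellType { PUT, DELETE_COLUMN, DELETE_FAMILY }
    enum EventType { UPSERT, DELETE, NONE }

    /** Hypothetical stand-in for a data table cell. */
    static final class DataCell {
        final CellType type;
        final long timestamp;

        DataCell(CellType type, long timestamp) {
            this.type = type;
            this.timestamp = timestamp;
        }
    }

    /** Classifies the data table mutation that happened at indexCellTs. */
    static EventType classify(List<DataCell> dataRowCells, long indexCellTs) {
        boolean sawMutation = false;
        for (DataCell cell : dataRowCells) {
            if (cell.timestamp != indexCellTs) {
                continue; // belongs to another row version
            }
            if (cell.type == CellType.DELETE_FAMILY) {
                return EventType.DELETE; // assumption: a family delete marks a row delete
            }
            sawMutation = true;
        }
        // NONE means no data table mutation exists at this timestamp.
        return sawMutation ? EventType.UPSERT : EventType.NONE;
    }
}
```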



##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -90,128 +88,144 @@ public CDCGlobalIndexRegionScanner(final RegionScanner innerScanner,
         super(innerScanner, region, scan, env, dataTableScan, tupleProjector, indexMaintainer,
                 viewConstants, ptr, pageSizeMs, queryLimit);
         CDCUtil.initForRawScan(dataTableScan);
-        dataColQualNameMap = ScanUtil.deserializeColumnQualifierToNameMap(
-                scan.getAttribute(DATA_COL_QUALIFIER_TO_NAME_MAP));
-        dataColQualTypeMap = ScanUtil.deserializeColumnQualifierToTypeMap(
-                scan.getAttribute(DATA_COL_QUALIFIER_TO_TYPE_MAP));
+        cdcDataTableInfo = CDCTableInfo.createFromProto(CDCInfoProtos.CDCTableDef
+                .parseFrom(scan.getAttribute(CDC_DATA_TABLE_DEF)));
+        Charset utf8Charset = StandardCharsets.UTF_8;
+        String cdcChangeScopeStr = utf8Charset.decode(
+                ByteBuffer.wrap(scan.getAttribute(CDC_INCLUDE_SCOPES))).toString();
+        try {
+            cdcChangeScopeSet = CDCUtil.makeChangeScopeEnumsFromString(cdcChangeScopeStr);
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     @Override
     protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys) throws IOException {
+        //TODO: Get Timerange from the start row and end row of the index scan object

Review Comment:
   Please state in your TODO comment that the commented-out code does not
handle salted and multi-tenant tables.



##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -90,128 +90,157 @@ public CDCGlobalIndexRegionScanner(final RegionScanner innerScanner,
         super(innerScanner, region, scan, env, dataTableScan, tupleProjector, indexMaintainer,
                 viewConstants, ptr, pageSizeMs, queryLimit);
         CDCUtil.initForRawScan(dataTableScan);
-        dataColQualNameMap = ScanUtil.deserializeColumnQualifierToNameMap(
-                scan.getAttribute(DATA_COL_QUALIFIER_TO_NAME_MAP));
-        dataColQualTypeMap = ScanUtil.deserializeColumnQualifierToTypeMap(
-                scan.getAttribute(DATA_COL_QUALIFIER_TO_TYPE_MAP));
+        cdcDataTableInfo = CDCTableInfo.createFromProto(CDCInfoProtos.CDCTableDef
+                .parseFrom(scan.getAttribute(CDC_DATA_TABLE_DEF)));
+        try {
+            cdcChangeScopeSet = CDCUtil.makeChangeScopeEnumsFromString(
+                    cdcDataTableInfo.getCdcIncludeScopes());
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     @Override
     protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys) throws IOException {
+        //TODO: Get Timerange from the start row and end row of the index scan object
+        // and set it in the datatable scan object.
+//        if (scan.getStartRow().length == 8) {
+//            startTimeRange = PLong.INSTANCE.getCodec().decodeLong(
+//              scan.getStartRow(), 0, SortOrder.getDefault());
+//        }
+//        if (scan.getStopRow().length == 8) {
+//            stopTimeRange = PLong.INSTANCE.getCodec().decodeLong(
+//              scan.getStopRow(), 0, SortOrder.getDefault());
+//        }
         return CDCUtil.initForRawScan(prepareDataTableScan(dataRowKeys, true));
     }
 
     protected boolean getNextCoveredIndexRow(List<Cell> result) throws IOException {
         if (indexRowIterator.hasNext()) {
             List<Cell> indexRow = indexRowIterator.next();
-            for (Cell c: indexRow) {
-                if (c.getType() == Cell.Type.Put) {
-                    result.add(c);
-                }
+            // firstCell: Picking the earliest cell in the index row so that
+            // timestamp of the cell and the row will be same.
+            Cell firstCell = indexRow.get(indexRow.size() - 1);
+            ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
+                    indexToDataRowKeyMap.get(CellUtil.cloneRow(firstCell)));
+            Result dataRow = dataRows.get(dataRowKey);

Review Comment:
   It should be possible to have a null dataRow. This happens when the index 
table write succeeds but the data table write fails. We should handle it here. 
In that case we need to skip this index row.
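
   A minimal sketch of the skip, using plain maps and strings as hypothetical
stand-ins for the scanner's `indexToDataRowKeyMap` and `dataRows` (the real
types are byte arrays, `ImmutableBytesPtr`, and `Result`). It only shows the
control flow: an index row whose data row is missing produces no CDC event.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class SkipMissingDataRowSketch {
    /**
     * Returns the data rows reachable from the given index rows, skipping
     * index rows whose data table write never succeeded.
     */
    static List<String> eventsFor(List<String> indexRowKeys,
                                  Map<String, String> indexToDataRowKey,
                                  Map<String, String> dataRows) {
        List<String> events = new ArrayList<>();
        for (String indexRowKey : indexRowKeys) {
            String dataRow = dataRows.get(indexToDataRowKey.get(indexRowKey));
            if (dataRow == null) {
                continue; // index write succeeded but data write failed: no event
            }
            events.add(dataRow);
        }
        return events;
    }
}
```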



##########
phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCMiscIT.java:
##########
@@ -416,6 +706,116 @@ public void testSelectTimeRangeQueries() throws Exception {
         }
     }
 
+    @Test
+    public void testSelectCDCRebuildIndex() throws Exception {
+        Properties props = new Properties();
+        props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, Long.toString(Long.MAX_VALUE));
+        props.put("hbase.client.scanner.timeout.period", "6000000");
+        props.put("phoenix.query.timeoutMs", "6000000");
+        props.put("zookeeper.session.timeout", "6000000");
+        props.put("hbase.rpc.timeout", "6000000");
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        String tableName = generateUniqueName();
+        conn.createStatement().execute(
+                "CREATE TABLE  " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER, v2 INTEGER, v3 INTEGER, v4 INTEGER)");
+        String cdcName = generateUniqueName();
+
+        conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1, v2) VALUES (1, 100, 1000)");

Review Comment:
   Let's have more row versions here and retrieve them with multiple select
statements over different time ranges. That is, the WHERE clauses should
include the corresponding PHOENIX_ROW_TIMESTAMP() ranges.
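
   One possible shape for this, as a hedged sketch: record a timestamp
around each upsert, split the sorted timestamps into half-open windows, and
run one bounded query per window. The helper names below are hypothetical,
and the exact predicate form accepted by CDC queries is an assumption here.

```java
import java.util.ArrayList;
import java.util.List;

final class CdcTimeRangeQuerySketch {
    /** Builds a CDC select bounded by two bind parameters on PHOENIX_ROW_TIMESTAMP(). */
    static String timeRangeQuery(String cdcName) {
        return "SELECT /*+ CDC_INCLUDE(PRE, POST, CHANGE) */ * FROM " + cdcName
                + " WHERE PHOENIX_ROW_TIMESTAMP() >= ? AND PHOENIX_ROW_TIMESTAMP() < ?";
    }

    /** Splits sorted version timestamps into half-open windows [t[i], t[i+1]). */
    static List<long[]> windows(List<Long> sortedVersionTimestamps) {
        List<long[]> result = new ArrayList<>();
        for (int i = 0; i + 1 < sortedVersionTimestamps.size(); i++) {
            result.add(new long[] {
                    sortedVersionTimestamps.get(i),
                    sortedVersionTimestamps.get(i + 1)});
        }
        return result;
    }
}
```

   Each window would then be bound with `PreparedStatement.setTimestamp` for
the lower and upper bound, and the test would assert that only the row
versions written inside that window come back.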



##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -90,128 +90,157 @@ public CDCGlobalIndexRegionScanner(final RegionScanner innerScanner,
         super(innerScanner, region, scan, env, dataTableScan, tupleProjector, indexMaintainer,
                 viewConstants, ptr, pageSizeMs, queryLimit);
         CDCUtil.initForRawScan(dataTableScan);
-        dataColQualNameMap = ScanUtil.deserializeColumnQualifierToNameMap(
-                scan.getAttribute(DATA_COL_QUALIFIER_TO_NAME_MAP));
-        dataColQualTypeMap = ScanUtil.deserializeColumnQualifierToTypeMap(
-                scan.getAttribute(DATA_COL_QUALIFIER_TO_TYPE_MAP));
+        cdcDataTableInfo = CDCTableInfo.createFromProto(CDCInfoProtos.CDCTableDef
+                .parseFrom(scan.getAttribute(CDC_DATA_TABLE_DEF)));
+        try {
+            cdcChangeScopeSet = CDCUtil.makeChangeScopeEnumsFromString(
+                    cdcDataTableInfo.getCdcIncludeScopes());
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     @Override
     protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys) throws IOException {
+        //TODO: Get Timerange from the start row and end row of the index scan object
+        // and set it in the datatable scan object.
+//        if (scan.getStartRow().length == 8) {
+//            startTimeRange = PLong.INSTANCE.getCodec().decodeLong(
+//              scan.getStartRow(), 0, SortOrder.getDefault());
+//        }
+//        if (scan.getStopRow().length == 8) {
+//            stopTimeRange = PLong.INSTANCE.getCodec().decodeLong(
+//              scan.getStopRow(), 0, SortOrder.getDefault());
+//        }
         return CDCUtil.initForRawScan(prepareDataTableScan(dataRowKeys, true));
     }
 
     protected boolean getNextCoveredIndexRow(List<Cell> result) throws IOException {
         if (indexRowIterator.hasNext()) {
             List<Cell> indexRow = indexRowIterator.next();
-            for (Cell c: indexRow) {
-                if (c.getType() == Cell.Type.Put) {
-                    result.add(c);
-                }
+            // firstCell: Picking the earliest cell in the index row so that
+            // timestamp of the cell and the row will be same.
+            Cell firstCell = indexRow.get(indexRow.size() - 1);
+            ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
+                    indexToDataRowKeyMap.get(CellUtil.cloneRow(firstCell)));
+            Result dataRow = dataRows.get(dataRowKey);
+            Long indexCellTS = firstCell.getTimestamp();
+            Map<String, Object> preImageObj = null;
+            Map<String, Object> changeImageObj = null;
+            boolean isChangeImageInScope = this.cdcChangeScopeSet.size() == 0
+                    || 
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.CHANGE));
+            boolean isPreImageInScope =
+                    this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.PRE);
+            boolean isPostImageInScope =
+                    
this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.POST);
+            if (isPreImageInScope || isPostImageInScope) {
+                preImageObj = new HashMap<>();
+            }
+            if (isChangeImageInScope || isPostImageInScope) {
+                changeImageObj = new HashMap<>();
             }
+            Long lowerBoundTsForPreImage = 0L;
+            boolean isIndexCellDeleteRow = false;
+            byte[] emptyCQ = EncodedColumnsUtil.getEmptyKeyValueInfo(
+                    cdcDataTableInfo.getQualifierEncodingScheme()).getFirst();
             try {
-                Result dataRow = null;
-                if (! result.isEmpty()) {
-                    Cell firstCell = result.get(0);
-                    byte[] indexRowKey = new 
ImmutableBytesPtr(firstCell.getRowArray(),
-                            firstCell.getRowOffset(), firstCell.getRowLength())
-                            .copyBytesIfNecessary();
-                    ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
-                            indexToDataRowKeyMap.get(indexRowKey));
-                    dataRow = dataRows.get(dataRowKey);
-                    Long indexRowTs = result.get(0).getTimestamp();
-                    Map<Long, Map<ImmutableBytesPtr, Cell>> changeTimeline = 
dataRowChanges.get(
-                            dataRowKey);
-                    if (changeTimeline == null) {
-                        List<Cell> resultCells = 
Arrays.asList(dataRow.rawCells());
-                        Collections.sort(resultCells, 
CellComparator.getInstance().reversed());
-                        List<Cell> deleteMarkers = new ArrayList<>();
-                        List<List<Cell>> columns = new LinkedList<>();
-                        Cell currentColumnCell = null;
-                        Pair<byte[], byte[]> emptyKV = 
EncodedColumnsUtil.getEmptyKeyValueInfo(
-                                
EncodedColumnsUtil.getQualifierEncodingScheme(scan));
-                        List<Cell> currentColumn = null;
-                        Set<Long> uniqueTimeStamps = new HashSet<>();
-                        // TODO: From CompactionScanner.formColumns(), see if 
this can be refactored.
-                        for (Cell cell : resultCells) {
-                            uniqueTimeStamps.add(cell.getTimestamp());
-                            if (cell.getType() != Cell.Type.Put) {
-                                deleteMarkers.add(cell);
-                            }
-                            if (CellUtil.matchingColumn(cell, 
QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
-                                    emptyKV.getFirst())) {
-                                continue;
-                            }
-                            if (currentColumnCell == null) {
-                                currentColumn = new LinkedList<>();
-                                currentColumnCell = cell;
-                                currentColumn.add(cell);
-                            } else if (!CellUtil.matchingColumn(cell, 
currentColumnCell)) {
-                                columns.add(currentColumn);
-                                currentColumn = new LinkedList<>();
-                                currentColumnCell = cell;
-                                currentColumn.add(cell);
-                            } else {
-                                currentColumn.add(cell);
+                int columnListIndex = 0;
+                List<CDCTableInfo.CDCColumnInfo> cdcColumnInfoList =
+                        this.cdcDataTableInfo.getColumnInfoList();
+                CDCTableInfo.CDCColumnInfo currentColumnInfo =
+                        cdcColumnInfoList.get(columnListIndex);
+                for (Cell cell : dataRow.rawCells()) {
+                    if (cell.getType() == Cell.Type.DeleteFamily) {
+                        // We will only compare with the first Column Family 
for Delete Family
+                        // cells because there is no way to delete column 
family in Phoenix.
+                        if (columnListIndex > 0) {
+                            continue;
+                        }
+                        if (indexCellTS == cell.getTimestamp()) {
+                            isIndexCellDeleteRow = true;
+                        } else if (indexCellTS > cell.getTimestamp()
+                                && lowerBoundTsForPreImage == 0L) {
+                            // Cells with timestamp less than the 
lowerBoundTsForPreImage
+                            // can not be part of the PreImage as there is a 
Delete Family
+                            // marker after that.
+                            lowerBoundTsForPreImage = cell.getTimestamp();
+                        }
+                    } else if ((cell.getType() == Cell.Type.DeleteColumn
+                            || cell.getType() == Cell.Type.Put)
+                            && !Arrays.equals(cell.getQualifierArray(), 
emptyCQ)
+                            && columnListIndex < cdcColumnInfoList.size()) {
+                        int cellColumnComparator = 
CDCUtil.compareCellFamilyAndQualifier(
+                                cell.getFamilyArray(), 
cell.getQualifierArray(),
+                                currentColumnInfo.getColumnFamily(),
+                                currentColumnInfo.getColumnQualifier());
+                        while (cellColumnComparator > 0) {
+                            columnListIndex += 1;
+                            if (columnListIndex >= cdcColumnInfoList.size()) {
+                                break;
                             }
+                            currentColumnInfo = 
cdcColumnInfoList.get(columnListIndex);
+                            cellColumnComparator = 
CDCUtil.compareCellFamilyAndQualifier(
+                                    cell.getFamilyArray(), 
cell.getQualifierArray(),
+                                    currentColumnInfo.getColumnFamily(),
+                                    currentColumnInfo.getColumnQualifier());
                         }
-                        if (currentColumn != null) {
-                            columns.add(currentColumn);
+                        if (columnListIndex >= cdcColumnInfoList.size()) {
+                            break;
                         }
-                        List<Long> sortedTimestamps = 
uniqueTimeStamps.stream().sorted().collect(
-                                Collectors.toList());
-                        // FIXME: Does this need to be Concurrent?
-                        Map<ImmutableBytesPtr, Cell> rollingRow = new 
HashMap<>();
-                        int[] columnPointers = new int[columns.size()];
-                        changeTimeline = new TreeMap<>();
-                        dataRowChanges.put(dataRowKey, changeTimeline);
-                        for (Long ts : sortedTimestamps) {
-                            for (int i = 0; i < columns.size(); ++i) {
-                                Cell cell = 
columns.get(i).get(columnPointers[i]);
-                                if (cell.getTimestamp() == ts) {
-                                    rollingRow.put(new ImmutableBytesPtr(
-                                                    cell.getQualifierArray(),
-                                                    cell.getQualifierOffset(),
-                                                    cell.getQualifierLength()),
-                                            cell);
-                                    ++columnPointers[i];
+                        if (cellColumnComparator < 0) {
+                            continue;
+                        }
+                        if (cellColumnComparator == 0) {
+                            String cdcColumnName = 
currentColumnInfo.getColumnFamilyName() +
+                                    NAME_SEPARATOR + 
currentColumnInfo.getColumnName();
+                            // Don't include Column Family if it is a default 
column Family
+                            if (Arrays.equals(
+                                    currentColumnInfo.getColumnFamily(),
+                                    
cdcDataTableInfo.getDefaultColumnFamily())) {
+                                cdcColumnName = 
currentColumnInfo.getColumnName();
+                            }
+                            if (cell.getTimestamp() < indexCellTS
+                                    && cell.getTimestamp() > 
lowerBoundTsForPreImage) {
+                                if (isPreImageInScope || isPostImageInScope) {
+                                    if 
(preImageObj.containsKey(cdcColumnName)) {
+                                        continue;
+                                    }
+                                    preImageObj.put(cdcColumnName,
+                                            this.getColumnValue(cell, 
cdcColumnInfoList
+                                                    
.get(columnListIndex).getColumnType()));
+                                }
+                            } else if (cell.getTimestamp() == indexCellTS) {
+                                if (isChangeImageInScope || 
isPostImageInScope) {
+                                    changeImageObj.put(cdcColumnName,
+                                            this.getColumnValue(cell, 
cdcColumnInfoList
+                                                    
.get(columnListIndex).getColumnType()));
                                 }
                             }
-                            Map<ImmutableBytesPtr, Cell> rowOfCells = new 
HashMap();
-                            rowOfCells.putAll(rollingRow);
-                            changeTimeline.put(ts, rowOfCells);
                         }
                     }
-
-                    Map<ImmutableBytesPtr, Cell> mapOfCells = 
changeTimeline.get(indexRowTs);
-                    if (mapOfCells != null) {
-                        Map <String, Object> rowValueMap = new 
HashMap<>(mapOfCells.size());
-                        for (Map.Entry<ImmutableBytesPtr, Cell> entry: 
mapOfCells.entrySet()) {
-                            String colName = 
dataColQualNameMap.get(entry.getKey());
-                            Object colVal = 
dataColQualTypeMap.get(entry.getKey()).toObject(
-                                    entry.getValue().getValueArray());
-                            rowValueMap.put(colName, colVal);
-                        }
-                        byte[] value =
-                                new 
Gson().toJson(rowValueMap).getBytes(StandardCharsets.UTF_8);
-                        CellBuilder builder = 
CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
-                        ImmutableBytesPtr family = new 
ImmutableBytesPtr(firstCell.getFamilyArray(),
-                                firstCell.getFamilyOffset(), 
firstCell.getFamilyLength());
-                        dataRow = Result.create(Arrays.asList(builder.
-                                setRow(dataRowKey.copyBytesIfNecessary()).
-                                setFamily(family.copyBytesIfNecessary()).
-                                
setQualifier(scan.getAttribute((CDC_JSON_COL_QUALIFIER))).
-                                setTimestamp(firstCell.getTimestamp()).
-                                setValue(value).
-                                setType(Cell.Type.Put).
-                                build()));
-                    }
                 }
-                if (dataRow != null && tupleProjector != null) {
-                    IndexUtil.addTupleAsOneCell(result, new 
ResultTuple(dataRow),
+                Result cdcRow = getCDCImage(preImageObj, changeImageObj, isIndexCellDeleteRow,
+                        indexCellTS, firstCell, isChangeImageInScope, isPreImageInScope,
+                        isPostImageInScope);
+                if (cdcRow != null && tupleProjector != null) {
+                    if (firstCell.getType() == Cell.Type.DeleteFamily) {

Review Comment:
   Is it possible to have a Delete Family first cell? Shouldn't the first
cell be a put cell?





> Extend UncoveredGlobalIndexRegionScanner for CDC region scanner usecase
> -----------------------------------------------------------------------
>
>                 Key: PHOENIX-7015
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-7015
>             Project: Phoenix
>          Issue Type: Sub-task
>            Reporter: Viraj Jasani
>            Priority: Major
>
> For the CDC region scanner use case, extend UncoveredGlobalIndexRegionScanner to
> CDCUncoveredGlobalIndexRegionScanner. The new region scanner for CDC performs
> a raw scan on the index table and retrieves data table rows from index rows.
> Using the time range, it can form a JSON blob to represent changes to the row,
> including pre and/or post row images.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
