[
https://issues.apache.org/jira/browse/PHOENIX-7015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17815772#comment-17815772
]
ASF GitHub Bot commented on PHOENIX-7015:
-----------------------------------------
kadirozde commented on code in PR #1813:
URL: https://github.com/apache/phoenix/pull/1813#discussion_r1483321830
##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -90,128 +90,157 @@ public CDCGlobalIndexRegionScanner(final RegionScanner
innerScanner,
super(innerScanner, region, scan, env, dataTableScan, tupleProjector,
indexMaintainer,
viewConstants, ptr, pageSizeMs, queryLimit);
CDCUtil.initForRawScan(dataTableScan);
- dataColQualNameMap = ScanUtil.deserializeColumnQualifierToNameMap(
- scan.getAttribute(DATA_COL_QUALIFIER_TO_NAME_MAP));
- dataColQualTypeMap = ScanUtil.deserializeColumnQualifierToTypeMap(
- scan.getAttribute(DATA_COL_QUALIFIER_TO_TYPE_MAP));
+ cdcDataTableInfo =
CDCTableInfo.createFromProto(CDCInfoProtos.CDCTableDef
+ .parseFrom(scan.getAttribute(CDC_DATA_TABLE_DEF)));
+ try {
+ cdcChangeScopeSet = CDCUtil.makeChangeScopeEnumsFromString(
+ cdcDataTableInfo.getCdcIncludeScopes());
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
}
@Override
protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys) throws
IOException {
+ //TODO: Get Timerange from the start row and end row of the index scan
object
+ // and set it in the datatable scan object.
+// if (scan.getStartRow().length == 8) {
+// startTimeRange = PLong.INSTANCE.getCodec().decodeLong(
+// scan.getStartRow(), 0, SortOrder.getDefault());
+// }
+// if (scan.getStopRow().length == 8) {
+// stopTimeRange = PLong.INSTANCE.getCodec().decodeLong(
+// scan.getStopRow(), 0, SortOrder.getDefault());
+// }
return CDCUtil.initForRawScan(prepareDataTableScan(dataRowKeys, true));
}
protected boolean getNextCoveredIndexRow(List<Cell> result) throws
IOException {
if (indexRowIterator.hasNext()) {
List<Cell> indexRow = indexRowIterator.next();
- for (Cell c: indexRow) {
- if (c.getType() == Cell.Type.Put) {
- result.add(c);
- }
+ // firstCell: Picking the earliest cell in the index row so that
Review Comment:
Please add this comment: A given CDC index row can have either one put
cell, or a delete marker (Delete Family) and a put cell due to DML operations.
When a data table column is dropped by a DDL operation, a second put cell
with a higher timestamp is inserted into the index row. The timestamp of this
put cell matches the timestamp of the corresponding DeleteColumn mutation on
the data table row. This DDL mutation will not show up in the CDC stream.
The index row delete marker timestamp is always higher than or equal to the
put cell timestamp. The delete markers are for maintaining the index rows and
can be ignored. The oldest put cell will be used to point back to the data
table row version. The timestamp of the put cell is used to identify the
corresponding data table mutation, which can be a put or a delete mutation.
This is the reason we only consider the oldest cell (i.e. the first cell) here.
##########
phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCMiscIT.java:
##########
@@ -446,4 +846,91 @@ public void testSelectUncoveredIndex() throws Exception {
assertEquals(200, rs.getInt(2));
assertEquals(false, rs.next());
}
+
+ private void assertCDCBinaryAndDateColumn(ResultSet rs,
+ List<byte []> byteColumnValues,
+ List<Date> dateColumnValues,
+ Timestamp timestamp) throws
Exception {
+ assertEquals(true, rs.next());
+ assertEquals(1, rs.getInt(2));
+
+ Gson gson = new Gson();
+ Map<String, Object> row1 = new HashMap<String, Object>(){{
+ put(CDC_EVENT_TYPE, CDC_UPSERT_EVENT_TYPE);
+ }};
+ Map<String, Object> postImage = new HashMap<>();
+ postImage.put("A_BINARY",
+ Base64.getEncoder().encodeToString(byteColumnValues.get(0)));
+ postImage.put("D", dateColumnValues.get(0).toString());
+ postImage.put("T", timestamp.toString());
+ row1.put(CDC_POST_IMAGE, postImage);
+ Map<String, Object> changeImage = new HashMap<>();
+ changeImage.put("A_BINARY",
+ Base64.getEncoder().encodeToString(byteColumnValues.get(0)));
+ changeImage.put("D", dateColumnValues.get(0).toString());
+ changeImage.put("T", timestamp.toString());
+ row1.put(CDC_CHANGE_IMAGE, changeImage);
+ row1.put(CDC_PRE_IMAGE, new HashMap<String, String>() {{
+ }});
+ assertEquals(row1, gson.fromJson(rs.getString(3),
+ HashMap.class));
+
+ assertEquals(true, rs.next());
+ assertEquals(2, rs.getInt(2));
+ HashMap<String, Object> row2Json = gson.fromJson(rs.getString(3),
HashMap.class);
+ String row2BinaryColStr = (String)
((Map)((Map)row2Json.get(CDC_CHANGE_IMAGE))).get("A_BINARY");
+ byte[] row2BinaryCol = Base64.getDecoder().decode(row2BinaryColStr);
+
+ assertEquals(0,
DescVarLengthFastByteComparisons.compareTo(byteColumnValues.get(1),
+ 0, byteColumnValues.get(1).length, row2BinaryCol, 0,
row2BinaryCol.length));
+ }
+
+ @Test
+ public void testCDCBinaryAndDateColumn() throws Exception {
+ Properties props = new Properties();
+ props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB,
Long.toString(Long.MAX_VALUE));
+ props.put("hbase.client.scanner.timeout.period", "6000000");
+ props.put("phoenix.query.timeoutMs", "6000000");
+ props.put("zookeeper.session.timeout", "6000000");
+ props.put("hbase.rpc.timeout", "6000000");
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ String tableName = generateUniqueName();
+ List<byte []> byteColumnValues = new ArrayList<>();
+ byteColumnValues.add( new byte[] {0,0,0,0,0,0,0,0,0,1});
+ byteColumnValues.add(new byte[] {0,0,0,0,0,0,0,0,0,2});
+ List<Date> dateColumnValues = new ArrayList<>();
+ dateColumnValues.add(Date.valueOf("2024-02-01"));
+ dateColumnValues.add(Date.valueOf("2024-01-31"));
+ Timestamp timestampColumnValue = Timestamp.valueOf("2024-01-31
12:12:14");
+ try {
+
+ conn.createStatement().execute("CREATE TABLE " + tableName +
+ " ( k INTEGER PRIMARY KEY," + " a_binary binary(10), d
Date, t TIMESTAMP)");
+
+ String upsertQuery = "UPSERT INTO " + tableName + " (k, a_binary,
d, t) VALUES (?, ?, ?, ?)";
+ PreparedStatement stmt = conn.prepareStatement(upsertQuery);
+ stmt.setInt(1, 1);
+ stmt.setBytes(2, byteColumnValues.get(0));
+ stmt.setDate(3, dateColumnValues.get(0));
+ stmt.setTimestamp(4, timestampColumnValue);
+ stmt.execute();
+ stmt.setInt(1, 2);
+ stmt.setBytes(2, byteColumnValues.get(1));
+ stmt.setDate(3, dateColumnValues.get(1));
+ stmt.setTimestamp(4, timestampColumnValue);
+ stmt.execute();
+ conn.commit();
+
+ String cdcName = generateUniqueName();
+ String cdc_sql = "CREATE CDC " + cdcName
+ + " ON " + tableName;
+ createAndWait(conn, tableName, cdcName, cdc_sql);
+ assertCDCState(conn, cdcName, null, 3);
+ assertCDCBinaryAndDateColumn(conn.createStatement().executeQuery
+ ("SELECT /*+ CDC_INCLUDE(PRE, POST, CHANGE) */ * " + "FROM
" + cdcName),
+ byteColumnValues, dateColumnValues, timestampColumnValue);
+ } finally {
+ conn.close();
+ }
+ }
Review Comment:
We need to have tests with successful index writes but failed data table
writes. Please see GlobalIndexCheckerIT for how to fail data table writes. We
should not see any CDC event for these failed transactions.
##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -223,4 +252,67 @@ protected boolean getNextCoveredIndexRow(List<Cell>
result) throws IOException {
}
return false;
}
+
+ private Result getCDCImage(
+ Map<String, Object> preImageObj, Map<String, Object>
changeImageObj,
+ boolean isIndexCellDeleteRow, Long indexCellTS, Cell firstCell,
+ boolean isChangeImageInScope, boolean isPreImageInScope, boolean
isPostImageInScope) {
+ Map<String, Object> rowValueMap = new HashMap<>();
+
+ if (isPreImageInScope) {
+ rowValueMap.put(CDC_PRE_IMAGE, preImageObj);
+ }
+
+ if (isChangeImageInScope) {
+ rowValueMap.put(CDC_CHANGE_IMAGE, changeImageObj);
+ }
+
+ Map<String, Object> postImageObj = new HashMap<>();
+ if (isPostImageInScope) {
+ if (!isIndexCellDeleteRow) {
+ for (Map.Entry<String, Object> preImageObjCol :
preImageObj.entrySet()) {
+ postImageObj.put(preImageObjCol.getKey(),
preImageObjCol.getValue());
+ }
+ for (Map.Entry<String, Object> changeImageObjCol :
changeImageObj.entrySet()) {
+ postImageObj.put(changeImageObjCol.getKey(),
changeImageObjCol.getValue());
+ }
+ }
+ rowValueMap.put(CDC_POST_IMAGE, postImageObj);
+ }
+
+ if (isIndexCellDeleteRow) {
Review Comment:
We should not decide if the data table mutation is delete or upsert based on
the index row mutation type. It should be determined based on the data table
mutation type.
##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -90,128 +88,144 @@ public CDCGlobalIndexRegionScanner(final RegionScanner
innerScanner,
super(innerScanner, region, scan, env, dataTableScan, tupleProjector,
indexMaintainer,
viewConstants, ptr, pageSizeMs, queryLimit);
CDCUtil.initForRawScan(dataTableScan);
- dataColQualNameMap = ScanUtil.deserializeColumnQualifierToNameMap(
- scan.getAttribute(DATA_COL_QUALIFIER_TO_NAME_MAP));
- dataColQualTypeMap = ScanUtil.deserializeColumnQualifierToTypeMap(
- scan.getAttribute(DATA_COL_QUALIFIER_TO_TYPE_MAP));
+ cdcDataTableInfo =
CDCTableInfo.createFromProto(CDCInfoProtos.CDCTableDef
+ .parseFrom(scan.getAttribute(CDC_DATA_TABLE_DEF)));
+ Charset utf8Charset = StandardCharsets.UTF_8;
+ String cdcChangeScopeStr = utf8Charset.decode(
+
ByteBuffer.wrap(scan.getAttribute(CDC_INCLUDE_SCOPES))).toString();
+ try {
+ cdcChangeScopeSet =
CDCUtil.makeChangeScopeEnumsFromString(cdcChangeScopeStr);
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
}
@Override
protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys) throws
IOException {
+ //TODO: Get Timerange from the start row and end row of the index scan
object
Review Comment:
Please state in your TODO statement that the commented out code does not
handle salted and multi-tenant tables.
##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -90,128 +90,157 @@ public CDCGlobalIndexRegionScanner(final RegionScanner
innerScanner,
super(innerScanner, region, scan, env, dataTableScan, tupleProjector,
indexMaintainer,
viewConstants, ptr, pageSizeMs, queryLimit);
CDCUtil.initForRawScan(dataTableScan);
- dataColQualNameMap = ScanUtil.deserializeColumnQualifierToNameMap(
- scan.getAttribute(DATA_COL_QUALIFIER_TO_NAME_MAP));
- dataColQualTypeMap = ScanUtil.deserializeColumnQualifierToTypeMap(
- scan.getAttribute(DATA_COL_QUALIFIER_TO_TYPE_MAP));
+ cdcDataTableInfo =
CDCTableInfo.createFromProto(CDCInfoProtos.CDCTableDef
+ .parseFrom(scan.getAttribute(CDC_DATA_TABLE_DEF)));
+ try {
+ cdcChangeScopeSet = CDCUtil.makeChangeScopeEnumsFromString(
+ cdcDataTableInfo.getCdcIncludeScopes());
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
}
@Override
protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys) throws
IOException {
+ //TODO: Get Timerange from the start row and end row of the index scan
object
+ // and set it in the datatable scan object.
+// if (scan.getStartRow().length == 8) {
+// startTimeRange = PLong.INSTANCE.getCodec().decodeLong(
+// scan.getStartRow(), 0, SortOrder.getDefault());
+// }
+// if (scan.getStopRow().length == 8) {
+// stopTimeRange = PLong.INSTANCE.getCodec().decodeLong(
+// scan.getStopRow(), 0, SortOrder.getDefault());
+// }
return CDCUtil.initForRawScan(prepareDataTableScan(dataRowKeys, true));
}
protected boolean getNextCoveredIndexRow(List<Cell> result) throws
IOException {
if (indexRowIterator.hasNext()) {
List<Cell> indexRow = indexRowIterator.next();
- for (Cell c: indexRow) {
- if (c.getType() == Cell.Type.Put) {
- result.add(c);
- }
+ // firstCell: Picking the earliest cell in the index row so that
+ // timestamp of the cell and the row will be same.
+ Cell firstCell = indexRow.get(indexRow.size() - 1);
+ ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
+ indexToDataRowKeyMap.get(CellUtil.cloneRow(firstCell)));
+ Result dataRow = dataRows.get(dataRowKey);
Review Comment:
It should be possible to have a null dataRow. This happens when the index
table write succeeds but the data table write fails. We should handle it here.
In that case we need to skip this index row.
##########
phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCMiscIT.java:
##########
@@ -416,6 +706,116 @@ public void testSelectTimeRangeQueries() throws Exception
{
}
}
+ @Test
+ public void testSelectCDCRebuildIndex() throws Exception {
+ Properties props = new Properties();
+ props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB,
Long.toString(Long.MAX_VALUE));
+ props.put("hbase.client.scanner.timeout.period", "6000000");
+ props.put("phoenix.query.timeoutMs", "6000000");
+ props.put("zookeeper.session.timeout", "6000000");
+ props.put("hbase.rpc.timeout", "6000000");
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ String tableName = generateUniqueName();
+ conn.createStatement().execute(
+ "CREATE TABLE " + tableName + " ( k INTEGER PRIMARY KEY," + "
v1 INTEGER, v2 INTEGER, v3 INTEGER, v4 INTEGER)");
+ String cdcName = generateUniqueName();
+
+ conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1,
v2) VALUES (1, 100, 1000)");
Review Comment:
Let's have more row versions here and retrieve these versions with multiple
select statements with different time ranges; that is, the WHERE clauses should
include the corresponding PHOENIX_ROW_TIMESTAMP() ranges.
##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -90,128 +90,157 @@ public CDCGlobalIndexRegionScanner(final RegionScanner
innerScanner,
super(innerScanner, region, scan, env, dataTableScan, tupleProjector,
indexMaintainer,
viewConstants, ptr, pageSizeMs, queryLimit);
CDCUtil.initForRawScan(dataTableScan);
- dataColQualNameMap = ScanUtil.deserializeColumnQualifierToNameMap(
- scan.getAttribute(DATA_COL_QUALIFIER_TO_NAME_MAP));
- dataColQualTypeMap = ScanUtil.deserializeColumnQualifierToTypeMap(
- scan.getAttribute(DATA_COL_QUALIFIER_TO_TYPE_MAP));
+ cdcDataTableInfo =
CDCTableInfo.createFromProto(CDCInfoProtos.CDCTableDef
+ .parseFrom(scan.getAttribute(CDC_DATA_TABLE_DEF)));
+ try {
+ cdcChangeScopeSet = CDCUtil.makeChangeScopeEnumsFromString(
+ cdcDataTableInfo.getCdcIncludeScopes());
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
}
@Override
protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys) throws
IOException {
+ //TODO: Get Timerange from the start row and end row of the index scan
object
+ // and set it in the datatable scan object.
+// if (scan.getStartRow().length == 8) {
+// startTimeRange = PLong.INSTANCE.getCodec().decodeLong(
+// scan.getStartRow(), 0, SortOrder.getDefault());
+// }
+// if (scan.getStopRow().length == 8) {
+// stopTimeRange = PLong.INSTANCE.getCodec().decodeLong(
+// scan.getStopRow(), 0, SortOrder.getDefault());
+// }
return CDCUtil.initForRawScan(prepareDataTableScan(dataRowKeys, true));
}
protected boolean getNextCoveredIndexRow(List<Cell> result) throws
IOException {
if (indexRowIterator.hasNext()) {
List<Cell> indexRow = indexRowIterator.next();
- for (Cell c: indexRow) {
- if (c.getType() == Cell.Type.Put) {
- result.add(c);
- }
+ // firstCell: Picking the earliest cell in the index row so that
+ // timestamp of the cell and the row will be same.
+ Cell firstCell = indexRow.get(indexRow.size() - 1);
+ ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
+ indexToDataRowKeyMap.get(CellUtil.cloneRow(firstCell)));
+ Result dataRow = dataRows.get(dataRowKey);
+ Long indexCellTS = firstCell.getTimestamp();
+ Map<String, Object> preImageObj = null;
+ Map<String, Object> changeImageObj = null;
+ boolean isChangeImageInScope = this.cdcChangeScopeSet.size() == 0
+ ||
(this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.CHANGE));
+ boolean isPreImageInScope =
+ this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.PRE);
+ boolean isPostImageInScope =
+
this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.POST);
+ if (isPreImageInScope || isPostImageInScope) {
+ preImageObj = new HashMap<>();
+ }
+ if (isChangeImageInScope || isPostImageInScope) {
+ changeImageObj = new HashMap<>();
}
+ Long lowerBoundTsForPreImage = 0L;
+ boolean isIndexCellDeleteRow = false;
+ byte[] emptyCQ = EncodedColumnsUtil.getEmptyKeyValueInfo(
+ cdcDataTableInfo.getQualifierEncodingScheme()).getFirst();
try {
- Result dataRow = null;
- if (! result.isEmpty()) {
- Cell firstCell = result.get(0);
- byte[] indexRowKey = new
ImmutableBytesPtr(firstCell.getRowArray(),
- firstCell.getRowOffset(), firstCell.getRowLength())
- .copyBytesIfNecessary();
- ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
- indexToDataRowKeyMap.get(indexRowKey));
- dataRow = dataRows.get(dataRowKey);
- Long indexRowTs = result.get(0).getTimestamp();
- Map<Long, Map<ImmutableBytesPtr, Cell>> changeTimeline =
dataRowChanges.get(
- dataRowKey);
- if (changeTimeline == null) {
- List<Cell> resultCells =
Arrays.asList(dataRow.rawCells());
- Collections.sort(resultCells,
CellComparator.getInstance().reversed());
- List<Cell> deleteMarkers = new ArrayList<>();
- List<List<Cell>> columns = new LinkedList<>();
- Cell currentColumnCell = null;
- Pair<byte[], byte[]> emptyKV =
EncodedColumnsUtil.getEmptyKeyValueInfo(
-
EncodedColumnsUtil.getQualifierEncodingScheme(scan));
- List<Cell> currentColumn = null;
- Set<Long> uniqueTimeStamps = new HashSet<>();
- // TODO: From CompactionScanner.formColumns(), see if
this can be refactored.
- for (Cell cell : resultCells) {
- uniqueTimeStamps.add(cell.getTimestamp());
- if (cell.getType() != Cell.Type.Put) {
- deleteMarkers.add(cell);
- }
- if (CellUtil.matchingColumn(cell,
QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
- emptyKV.getFirst())) {
- continue;
- }
- if (currentColumnCell == null) {
- currentColumn = new LinkedList<>();
- currentColumnCell = cell;
- currentColumn.add(cell);
- } else if (!CellUtil.matchingColumn(cell,
currentColumnCell)) {
- columns.add(currentColumn);
- currentColumn = new LinkedList<>();
- currentColumnCell = cell;
- currentColumn.add(cell);
- } else {
- currentColumn.add(cell);
+ int columnListIndex = 0;
+ List<CDCTableInfo.CDCColumnInfo> cdcColumnInfoList =
+ this.cdcDataTableInfo.getColumnInfoList();
+ CDCTableInfo.CDCColumnInfo currentColumnInfo =
+ cdcColumnInfoList.get(columnListIndex);
+ for (Cell cell : dataRow.rawCells()) {
+ if (cell.getType() == Cell.Type.DeleteFamily) {
+ // We will only compare with the first Column Family
for Delete Family
+ // cells because there is no way to delete column
family in Phoenix.
+ if (columnListIndex > 0) {
+ continue;
+ }
+ if (indexCellTS == cell.getTimestamp()) {
+ isIndexCellDeleteRow = true;
+ } else if (indexCellTS > cell.getTimestamp()
+ && lowerBoundTsForPreImage == 0L) {
+ // Cells with timestamp less than the
lowerBoundTsForPreImage
+ // can not be part of the PreImage as there is a
Delete Family
+ // marker after that.
+ lowerBoundTsForPreImage = cell.getTimestamp();
+ }
+ } else if ((cell.getType() == Cell.Type.DeleteColumn
+ || cell.getType() == Cell.Type.Put)
+ && !Arrays.equals(cell.getQualifierArray(),
emptyCQ)
+ && columnListIndex < cdcColumnInfoList.size()) {
+ int cellColumnComparator =
CDCUtil.compareCellFamilyAndQualifier(
+ cell.getFamilyArray(),
cell.getQualifierArray(),
+ currentColumnInfo.getColumnFamily(),
+ currentColumnInfo.getColumnQualifier());
+ while (cellColumnComparator > 0) {
+ columnListIndex += 1;
+ if (columnListIndex >= cdcColumnInfoList.size()) {
+ break;
}
+ currentColumnInfo =
cdcColumnInfoList.get(columnListIndex);
+ cellColumnComparator =
CDCUtil.compareCellFamilyAndQualifier(
+ cell.getFamilyArray(),
cell.getQualifierArray(),
+ currentColumnInfo.getColumnFamily(),
+ currentColumnInfo.getColumnQualifier());
}
- if (currentColumn != null) {
- columns.add(currentColumn);
+ if (columnListIndex >= cdcColumnInfoList.size()) {
+ break;
}
- List<Long> sortedTimestamps =
uniqueTimeStamps.stream().sorted().collect(
- Collectors.toList());
- // FIXME: Does this need to be Concurrent?
- Map<ImmutableBytesPtr, Cell> rollingRow = new
HashMap<>();
- int[] columnPointers = new int[columns.size()];
- changeTimeline = new TreeMap<>();
- dataRowChanges.put(dataRowKey, changeTimeline);
- for (Long ts : sortedTimestamps) {
- for (int i = 0; i < columns.size(); ++i) {
- Cell cell =
columns.get(i).get(columnPointers[i]);
- if (cell.getTimestamp() == ts) {
- rollingRow.put(new ImmutableBytesPtr(
- cell.getQualifierArray(),
- cell.getQualifierOffset(),
- cell.getQualifierLength()),
- cell);
- ++columnPointers[i];
+ if (cellColumnComparator < 0) {
+ continue;
+ }
+ if (cellColumnComparator == 0) {
+ String cdcColumnName =
currentColumnInfo.getColumnFamilyName() +
+ NAME_SEPARATOR +
currentColumnInfo.getColumnName();
+ // Don't include Column Family if it is a default
column Family
+ if (Arrays.equals(
+ currentColumnInfo.getColumnFamily(),
+
cdcDataTableInfo.getDefaultColumnFamily())) {
+ cdcColumnName =
currentColumnInfo.getColumnName();
+ }
+ if (cell.getTimestamp() < indexCellTS
+ && cell.getTimestamp() >
lowerBoundTsForPreImage) {
+ if (isPreImageInScope || isPostImageInScope) {
+ if
(preImageObj.containsKey(cdcColumnName)) {
+ continue;
+ }
+ preImageObj.put(cdcColumnName,
+ this.getColumnValue(cell,
cdcColumnInfoList
+
.get(columnListIndex).getColumnType()));
+ }
+ } else if (cell.getTimestamp() == indexCellTS) {
+ if (isChangeImageInScope ||
isPostImageInScope) {
+ changeImageObj.put(cdcColumnName,
+ this.getColumnValue(cell,
cdcColumnInfoList
+
.get(columnListIndex).getColumnType()));
}
}
- Map<ImmutableBytesPtr, Cell> rowOfCells = new
HashMap();
- rowOfCells.putAll(rollingRow);
- changeTimeline.put(ts, rowOfCells);
}
}
-
- Map<ImmutableBytesPtr, Cell> mapOfCells =
changeTimeline.get(indexRowTs);
- if (mapOfCells != null) {
- Map <String, Object> rowValueMap = new
HashMap<>(mapOfCells.size());
- for (Map.Entry<ImmutableBytesPtr, Cell> entry:
mapOfCells.entrySet()) {
- String colName =
dataColQualNameMap.get(entry.getKey());
- Object colVal =
dataColQualTypeMap.get(entry.getKey()).toObject(
- entry.getValue().getValueArray());
- rowValueMap.put(colName, colVal);
- }
- byte[] value =
- new
Gson().toJson(rowValueMap).getBytes(StandardCharsets.UTF_8);
- CellBuilder builder =
CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
- ImmutableBytesPtr family = new
ImmutableBytesPtr(firstCell.getFamilyArray(),
- firstCell.getFamilyOffset(),
firstCell.getFamilyLength());
- dataRow = Result.create(Arrays.asList(builder.
- setRow(dataRowKey.copyBytesIfNecessary()).
- setFamily(family.copyBytesIfNecessary()).
-
setQualifier(scan.getAttribute((CDC_JSON_COL_QUALIFIER))).
- setTimestamp(firstCell.getTimestamp()).
- setValue(value).
- setType(Cell.Type.Put).
- build()));
- }
}
- if (dataRow != null && tupleProjector != null) {
- IndexUtil.addTupleAsOneCell(result, new
ResultTuple(dataRow),
+ Result cdcRow = getCDCImage(preImageObj, changeImageObj,
isIndexCellDeleteRow,
+ indexCellTS, firstCell, isChangeImageInScope,
isPreImageInScope,
+ isPostImageInScope);
+ if (cdcRow != null && tupleProjector != null) {
+ if (firstCell.getType() == Cell.Type.DeleteFamily) {
Review Comment:
Is it possible to have a Delete Family first cell? Shouldn't the first
cell be a put cell?
> Extend UncoveredGlobalIndexRegionScanner for CDC region scanner usecase
> -----------------------------------------------------------------------
>
> Key: PHOENIX-7015
> URL: https://issues.apache.org/jira/browse/PHOENIX-7015
> Project: Phoenix
> Issue Type: Sub-task
> Reporter: Viraj Jasani
> Priority: Major
>
> For CDC region scanner usecase, extend UncoveredGlobalIndexRegionScanner to
> CDCUncoveredGlobalIndexRegionScanner. The new region scanner for CDC performs
> raw scan to index table and retrieve data table rows from index rows.
> Using the time range, it can form a JSON blob to represent changes to the row
> including pre and/or post row images.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)