[ 
https://issues.apache.org/jira/browse/PHOENIX-7001?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801509#comment-17801509
 ] 

Hari Krishna Dara commented on PHOENIX-7001:
--------------------------------------------

PR: https://github.com/apache/phoenix/pull/1766

> Change Data Capture leveraging Max Lookback and Uncovered Indexes
> -----------------------------------------------------------------
>
>                 Key: PHOENIX-7001
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-7001
>             Project: Phoenix
>          Issue Type: Improvement
>            Reporter: Kadir Ozdemir
>            Priority: Major
>
> The use cases for a Change Data Capture (CDC) feature are centered around 
> capturing changes to a given table (or updatable view) as these changes 
> happen in near real-time. A CDC application can retrieve changes in real-time 
> or with some delay, or even retrieve the same set of changes multiple times. 
> This means the CDC use case can be generalized as time range queries where 
> the time range is typically short, such as the last x minutes or hours, or is 
> expressed as a specific time range in the last n days, where n is typically 
> less than 7.
> A change is an update in a row. That is, a change is either updating one or 
> more columns of a table for a given row or deleting a row. It is desirable to 
> provide these changes in the order of their arrival. One can visualize the 
> delivery of these changes through a stream from a Phoenix table to the 
> application that is initiated by the application similar to the delivery of 
> any other Phoenix query results. The difference is that a regular query 
> result includes at most one result row for each row satisfying the query and 
> the deleted rows are not visible to the query result while the CDC 
> stream/result can include multiple result rows for each row and the result 
> includes deleted rows. Some use cases need to also get the pre and/or post 
> image of the row along with a change on the row. 
> The design proposed here leverages Phoenix Max Lookback and Uncovered (Global 
> or Local) Indexes. The max lookback feature retains recent changes to a 
> table, typically the changes made in the last x days. 
> This means that the max lookback feature already captures the changes to a 
> given table. Currently, the max lookback age is configurable at the cluster 
> level. We need to extend this capability to be able to configure the max 
> lookback age at the table level so that each table can have a different max 
> lookback age based on its CDC application requirements.
> To deliver the changes in the order of their arrival, we need a time based 
> index. This index should be uncovered as the changes are already retained in 
> the table by the max lookback feature. The arrival time can be defined as the 
> mutation timestamp generated by the server, or a user-specified timestamp (or 
> any other long integer) column. An uncovered index would allow us to access 
> the changes efficiently and in order. Changes to an index table are 
> also preserved by the max lookback feature.
> A CDC feature can be composed of the following components:
>  * {*}CDCUncoveredIndexRegionScanner{*}: This is a server side scanner on an 
> uncovered index used for CDC. This can inherit UncoveredIndexRegionScanner. 
> It goes through index table rows using a raw scan to identify data table rows 
> and retrieves these rows using a raw scan. Using the time range, it forms a 
> JSON blob to represent changes to the row including pre and/or post row 
> images.
>  * {*}CDC Query Compiler{*}: This is a client side component. It prepares the 
> scan object based on the given CDC query statement. 
>  * {*}CDC DDL Compiler{*}: This is a client side component. It creates the 
> time based uncovered (global/local) index based on the given CDC DDL 
> statement and a virtual table of CDC type. CDC will be a new table type. 
> A CDC DDL syntax to create CDC on a (data) table can be as follows: 
> Create CDC <CDC Table Name> on <Data Table Name> (PHOENIX_ROW_TIMESTAMP()  | 
> <Data Table Column>) INCLUDE (pre | post | latest | all) TTL = <Age in 
> seconds> INDEX = <GLOBAL | LOCAL> SALT_BUCKETS=<salt bucket count>
> The above CDC DDL creates a virtual CDC table and an uncovered index. The CDC 
> table PK columns start with the timestamp or user defined column and continue 
> with the data table PK columns. The CDC table includes one non-PK column 
> which is a JSON column. The change is expressed in this JSON column in 
> multiple ways based on the CDC DDL or query statement. The change can be 
> expressed as just the mutation for the change, the latest image of the row, 
> the pre image of the row (the image before the change), the post image, or 
> any combination of these. The CDC table is not a physical table on disk. It 
> is just a virtual table to be used in a CDC query. Phoenix stores just the 
> metadata for this virtual table. 
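To make the proposed syntax concrete, here is a minimal sketch that assembles a CDC DDL string from its parts. The class name, method name, and parameters are hypothetical, as are the ORDERS / ORDERS_CDC table names in the usage note. The grammar is the one proposed above; it is not assumed to exist in any released Phoenix version.

```java
// Sketch only: formats a statement in the CDC DDL syntax proposed in this issue.
// CdcDdlSketch and its parameters are illustrative names, not Phoenix APIs.
final class CdcDdlSketch {
    static String createCdc(String cdcTable, String dataTable, String timeExpr,
                            String include, long ttlSeconds, String indexType,
                            int saltBuckets) {
        // timeExpr is either PHOENIX_ROW_TIMESTAMP() or a data table column.
        // include is one of: pre, post, latest, all.
        return "CREATE CDC " + cdcTable + " ON " + dataTable
                + " (" + timeExpr + ")"
                + " INCLUDE (" + include + ")"
                + " TTL = " + ttlSeconds
                + " INDEX = " + indexType
                + " SALT_BUCKETS=" + saltBuckets;
    }
}
```

For instance, createCdc("ORDERS_CDC", "ORDERS", "PHOENIX_ROW_TIMESTAMP()", "all", 86400, "GLOBAL", 4) describes a CDC table keyed by server timestamp, followed by the PK columns of ORDERS, with a one-day TTL.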
> A CDC query can be as follows:
> Select * from <CDC Table Name> where PHOENIX_ROW_TIMESTAMP() >= TO_DATE( …) 
> AND PHOENIX_ROW_TIMESTAMP() < TO_DATE( …)
> This query would return the rows of the CDC table, which are constructed on 
> the server side by CDCUncoveredIndexRegionScanner by joining the uncovered 
> index row versions with the corresponding data table row versions (using raw 
> scans). The above select query can be annotated with a new CDC hint to return 
> just the actual change, the pre, post, or latest image of the row, or a 
> combination of them, overriding the default JSON column format defined by 
> the CDC DDL statement. 
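A rough sketch of how a client might issue the time range query through JDBC follows. Two things here are assumptions rather than part of the proposal: the class and method names are hypothetical, and bind parameters are used in place of the TO_DATE(…) literals, on the assumption that Phoenix accepts a timestamp parameter in a comparison with PHOENIX_ROW_TIMESTAMP().

```java
// Sketch only: builds the CDC time range query with JDBC bind parameters.
// CdcQuerySketch is an illustrative name, not a Phoenix class.
final class CdcQuerySketch {
    // The range is half-open, [from, to), so consecutive windows
    // neither overlap nor leave gaps between them.
    static String timeRangeQuery(String cdcTable) {
        return "SELECT * FROM " + cdcTable
                + " WHERE PHOENIX_ROW_TIMESTAMP() >= ?"
                + " AND PHOENIX_ROW_TIMESTAMP() < ?";
    }

    // Binds the window bounds (epoch milliseconds) to the two parameters.
    static java.sql.PreparedStatement prepare(java.sql.Connection conn,
                                              String cdcTable,
                                              long fromMillis, long toMillis)
            throws java.sql.SQLException {
        java.sql.PreparedStatement ps =
                conn.prepareStatement(timeRangeQuery(cdcTable));
        ps.setTimestamp(1, new java.sql.Timestamp(fromMillis));
        ps.setTimestamp(2, new java.sql.Timestamp(toMillis));
        return ps;
    }
}
```

Keeping the lower bound inclusive and the upper bound exclusive, as in the query above, lets the application reuse the previous upper bound as the next lower bound without delivering a change twice.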
> The CDC application will run the above query in a loop. When the difference 
> between the current time of the application and the upper limit of the time 
> range of the query becomes less than s milliseconds, say x milliseconds, then 
> the application thread needs to sleep s - x milliseconds. The value for s can 
> be small such as 1000 milliseconds. This is to make sure that time skew among 
> the server wall clocks does not lead to data loss.  
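The sleep rule above can be written as a small pure function. This is only a sketch of the arithmetic; the class, method, and parameter names are hypothetical.

```java
// Sketch only: the sleep rule for the CDC polling loop described above.
final class CdcPollingSketch {
    /**
     * x = nowMillis - upperBoundMillis is how far the query's upper time
     * limit lags the application clock. If x is less than the safety
     * margin s, the thread should sleep s - x milliseconds; otherwise it
     * can run the next query immediately.
     */
    static long sleepMillis(long nowMillis, long upperBoundMillis,
                            long safetyMarginMillis) {
        long lag = nowMillis - upperBoundMillis;
        return lag < safetyMarginMillis ? safetyMarginMillis - lag : 0L;
    }
}
```

With s = 1000 and an upper limit 600 milliseconds behind the application clock, the thread sleeps 400 milliseconds before issuing the next query.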
> A global time based index may create hot spotting during writes. This is 
> because the same region of the global index will keep getting updated. Since 
> the global index would be uncovered, the size of the updates will usually be 
> smaller than the data table updates. If we assume that index mutations are n 
> times smaller than data table mutations, then a single index region will be 
> able to sustain writes from n data table regions if the data table does not 
> have any other indexes. When the data table has other indexes, the data table 
> writes can slow down by 3 times or so. This allows a single index region to 
> keep up with 3n data table regions. If the number of active data table 
> regions is more than a single index region can sustain, then we need to 
> distribute the load to multiple index regions using salting. 
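The sizing argument above reduces to a ceiling division. The following is a back-of-the-envelope sketch under the stated assumptions (index mutations n times smaller, roughly 3x write slowdown when other indexes exist); the names are hypothetical and the result is an estimate, not a tuning rule.

```java
// Sketch only: estimates how many index regions (salt buckets) are needed
// to absorb writes from the active data table regions.
final class CdcSaltingSketch {
    static int indexRegionsNeeded(int activeDataRegions, int n,
                                  boolean hasOtherIndexes) {
        if (n < 1 || activeDataRegions < 0) {
            throw new IllegalArgumentException("n must be >= 1, regions >= 0");
        }
        // One index region keeps up with n data regions, or about 3n when
        // other indexes already slow down data table writes.
        int perIndexRegion = hasOtherIndexes ? 3 * n : n;
        // Ceiling division; at least one index region always exists.
        int needed = (activeDataRegions + perIndexRegion - 1) / perIndexRegion;
        return Math.max(1, needed);
    }
}
```

For example, with n = 4 and 30 active data table regions, the estimate is 8 index regions when the table has no other indexes and 3 when it does. A result of 1 means a single index region suffices and salting is not needed.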
> A local time based index does not have the hot spotting issue but can result 
> in slower CDC queries for tables with a large number of regions. That is why 
> this proposal suggests using global indexes by default.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
