[ https://issues.apache.org/jira/browse/PHOENIX-7001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hari Krishna Dara reopened PHOENIX-7001:
----------------------------------------

Resolved wrong item.

> Change Data Capture leveraging Max Lookback and Uncovered Indexes
> -----------------------------------------------------------------
>
>                 Key: PHOENIX-7001
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-7001
>             Project: Phoenix
>          Issue Type: Improvement
>            Reporter: Kadir Ozdemir
>            Priority: Major
>
> The use cases for a Change Data Capture (CDC) feature are centered around
> capturing changes to a given table (or updatable view) as these changes
> happen in near real-time. A CDC application can retrieve changes in real-time
> or with some delay, or even retrieve the same set of changes multiple times.
> This means the CDC use case can be generalized as time range queries where
> the time range is typically short, such as the last x minutes or hours, or
> expressed as a specific time range in the last n days where n is typically
> less than 7.
> A change is an update to a row. That is, a change either updates one or
> more columns of a table for a given row or deletes a row. It is desirable to
> provide these changes in the order of their arrival. One can visualize the
> delivery of these changes as a stream from a Phoenix table to the
> application, initiated by the application in the same way as the delivery of
> any other Phoenix query result. The difference is that a regular query
> result includes at most one result row for each row satisfying the query and
> does not show deleted rows, while the CDC stream/result can include multiple
> result rows for each row and includes deleted rows. Some use cases also need
> the pre and/or post image of the row along with a change to the row.
> The design proposed here leverages Phoenix Max Lookback and Uncovered (Global
> or Local) Indexes.
> The max lookback feature retains recent changes to a table, typically the
> changes made in the last x days. This means that the max lookback feature
> already captures the changes to a given table. Currently, the max lookback
> age is configurable at the cluster level. We need to extend this capability
> so that the max lookback age can be configured at the table level, allowing
> each table to have a different max lookback age based on its CDC application
> requirements.
> To deliver the changes in the order of their arrival, we need a time based
> index. This index should be uncovered as the changes are already retained in
> the table by the max lookback feature. The arrival time can be defined as the
> mutation timestamp generated by the server, or a user-specified timestamp (or
> any other long integer) column. An uncovered index would allow us to access
> the changes efficiently and in order. Changes to an index table are
> also preserved by the max lookback feature.
> A CDC feature can be composed of the following components:
> * {*}CDCUncoveredIndexRegionScanner{*}: This is a server side scanner on an
> uncovered index used for CDC. This can inherit from
> UncoveredIndexRegionScanner. It goes through index table rows using a raw
> scan to identify data table rows and retrieves these rows using a raw scan.
> Using the time range, it forms a JSON blob to represent changes to the row
> including pre and/or post row images.
> * {*}CDC Query Compiler{*}: This is a client side component. It prepares the
> scan object based on the given CDC query statement.
> * {*}CDC DDL Compiler{*}: This is a client side component. It creates the
> time based uncovered (global/local) index based on the given CDC DDL
> statement and a virtual table of CDC type. CDC will be a new table type.
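
To make the scanner description above concrete, here is a minimal in-memory sketch of the join it describes: walk the time-ordered index entries, look up the retained versions of the matching data table row, and emit a JSON record with the change plus pre and post images. This is not Phoenix code. The function names, the JSON field names ("ts", "row", "change", "pre", "post"), and the data layout are all assumptions made for illustration, and the sketch assumes at most one mutation per row per timestamp.

```python
import json


def image_as_of(versions, ts, inclusive):
    """Replay a row's retained versions (ascending by timestamp) up to ts.

    Each version is (timestamp, mutation); mutation is a dict of column
    values, or None for a row delete. Returns the row image, or None if the
    row does not exist at that point.
    """
    image = None
    for v_ts, mutation in versions:
        if v_ts > ts or (v_ts == ts and not inclusive):
            break
        image = None if mutation is None else {**(image or {}), **mutation}
    return image


def cdc_scan(index_entries, data_table, start_ts, end_ts):
    """Hypothetical stand-in for the server side scanner.

    index_entries: iterable of (timestamp, row_key), modelling the uncovered
    time based index. data_table: row_key -> list of versions, modelling the
    versions retained by max lookback. Returns JSON strings in time order for
    start_ts <= timestamp < end_ts.
    """
    records = []
    for ts, row_key in sorted(index_entries):
        if not (start_ts <= ts < end_ts):
            continue
        versions = data_table[row_key]
        # Assumption: exactly one mutation per row at a given timestamp.
        change = next(m for v_ts, m in versions if v_ts == ts)
        records.append(json.dumps({
            "ts": ts,
            "row": row_key,
            "change": change,                                     # None = delete
            "pre": image_as_of(versions, ts, inclusive=False),   # image before
            "post": image_as_of(versions, ts, inclusive=True),   # image after
        }, sort_keys=True))
    return records
```

For example, a row upserted at 100, updated at 250, and deleted at 300 yields three records for that row; the delete record carries the full pre image and a null post image.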
> A CDC DDL syntax to create CDC on a (data) table can be as follows:
> Create CDC <CDC Table Name> on <Data Table Name> (PHOENIX_ROW_TIMESTAMP() |
> <Data Table Column>) INCLUDE (pre | post | latest | all) TTL = <Age in
> seconds> INDEX = <GLOBAL | LOCAL> SALT_BUCKETS=<salt bucket count>
> The above CDC DDL creates a virtual CDC table and an uncovered index. The CDC
> table PK columns start with the timestamp or user defined column and continue
> with the data table PK columns. The CDC table includes one non-PK column
> which is a JSON column. The change is expressed in this JSON column in
> one of several ways based on the CDC DDL or query statement. The change can
> be expressed as just the mutation for the change, the latest image of the
> row, the pre image of the row (the image before the change), the post image,
> or any combination of these. The CDC table is not a physical table on disk.
> It is just a virtual table to be used in a CDC query. Phoenix stores just the
> metadata for this virtual table.
> A CDC query can be as follows:
> Select * from <CDC Table Name> where PHOENIX_ROW_TIMESTAMP() >= TO_DATE( …)
> AND PHOENIX_ROW_TIMESTAMP() < TO_DATE( …)
> This query would return the rows of the CDC table, which are constructed on
> the server side by CDCUncoveredIndexRegionScanner by joining the uncovered
> index row versions with the corresponding data table row versions (using raw
> scans). The above select query can carry a new CDC hint to return
> just the actual change, the pre, post, or latest image of the row, or a
> combination of them, overriding the default JSON column format defined by
> the CDC DDL statement.
> The CDC application will run the above query in a loop. When the difference
> between the current time of the application and the upper limit of the time
> range of the query becomes less than s milliseconds, say x milliseconds, then
> the application thread needs to sleep for s - x milliseconds.
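
The loop and sleep rule above can be sketched as follows. This is only an illustration of the timing logic, not a Phoenix client: `run_query`, `poll_changes`, `sleep_needed_ms`, the fixed window size, and the injectable clock are hypothetical names and choices. It also assumes the sleep happens before a window is queried, which the description does not state explicitly, and it uses 1000 ms for s purely as an example value.

```python
import time


def sleep_needed_ms(now_ms, upper_ms, s_ms=1000):
    """Return how long to wait before querying a window ending at upper_ms.

    x is the gap between the application's current time and the window's
    upper limit. If x < s, wait s - x; otherwise no wait is needed.
    """
    x = now_ms - upper_ms
    return max(0, s_ms - x)


def poll_changes(run_query, start_ms, window_ms, iterations, s_ms=1000,
                 now_ms=lambda: int(time.time() * 1000), sleep=time.sleep):
    """Yield CDC rows window by window over half-open ranges [lower, upper).

    run_query(lower_ms, upper_ms) is a caller-supplied (hypothetical) function
    that executes the CDC select for that time range and returns its rows.
    """
    lower = start_ms
    for _ in range(iterations):
        upper = lower + window_ms
        wait = sleep_needed_ms(now_ms(), upper, s_ms)
        if wait > 0:
            sleep(wait / 1000.0)  # keep the window at least s behind "now"
        yield from run_query(lower, upper)
        lower = upper  # next window starts where this one ended
```

Because each window's lower bound is the previous window's upper bound and the query uses `>=` and `<`, no timestamp is read twice or skipped by the loop itself.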
> The value for s can be small, such as 1000 milliseconds. This is to make
> sure that time skew among the server wall clocks does not lead to data loss.
> A global time based index may cause hot spotting during writes. This is
> because the same region of the global index will keep getting updated. Since
> the global index would be uncovered, the size of the updates will usually be
> smaller than the data table updates. If we assume that index mutations are n
> times smaller than data table mutations, then a single index region will be
> able to sustain writes from n data table regions if the data table does not
> have any other indexes. When the data table has other indexes, the data table
> write can slow down by 3 times or so. This allows a single index region to
> keep up with about 3n data table regions. If the number of active data table
> regions is more than a single index region can sustain, then we need to
> distribute the load to multiple index regions using salting.
> A local time based index does not have the hot spotting issue but can result
> in slower CDC queries for tables with a large number of regions. That is why
> this proposal suggests using global indexes by default.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)