[
https://issues.apache.org/jira/browse/PHOENIX-7001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Hari Krishna Dara reopened PHOENIX-7001:
----------------------------------------
Resolved wrong item.
> Change Data Capture leveraging Max Lookback and Uncovered Indexes
> -----------------------------------------------------------------
>
> Key: PHOENIX-7001
> URL: https://issues.apache.org/jira/browse/PHOENIX-7001
> Project: Phoenix
> Issue Type: Improvement
> Reporter: Kadir Ozdemir
> Priority: Major
>
> The use cases for a Change Data Capture (CDC) feature are centered around
> capturing changes to a given table (or updatable view) as these changes
> happen in near real-time. A CDC application can retrieve changes in real-time
> or with some delay, or even retrieve the same set of changes multiple times.
> This means the CDC use case can be generalized as time range queries where
> the time range is typically short such as last x minutes or hours or
> expressed as a specific time range in the last n days where n is typically
> less than 7.
> A change is an update in a row. That is, a change is either updating one or
> more columns of a table for a given row or deleting a row. It is desirable to
> provide these changes in the order of their arrival. One can visualize the
> delivery of these changes through a stream from a Phoenix table to the
> application that is initiated by the application similar to the delivery of
> any other Phoenix query results. The difference is that a regular query
> result includes at most one result row for each row satisfying the query and
> the deleted rows are not visible to the query result while the CDC
> stream/result can include multiple result rows for each row and the result
> includes deleted rows. Some use cases need to also get the pre and/or post
> image of the row along with a change on the row.
> The design proposed here leverages Phoenix Max Lookback and Uncovered (Global
> or Local) Indexes. The max lookback feature retains recent changes to a
> table, that is, typically the changes made in the last x days.
> This means that the max lookback feature already captures the changes to a
> given table. Currently, the max lookback age is configurable at the cluster
> level. We need to extend this capability to be able to configure the max
> lookback age at the table level so that each table can have a different max
> lookback age based on its CDC application requirements.
> To deliver the changes in the order of their arrival, we need a time based
> index. This index should be uncovered as the changes are already retained in
> the table by the max lookback feature. The arrival time can be defined as the
> mutation timestamp generated by the server, or a user-specified timestamp (or
> any other long integer) column. An uncovered index would allow us to access
> the changes efficiently and in order. Changes to an index table are
> also preserved by the max lookback feature.
> A CDC feature can be composed of the following components:
> * {*}CDCUncoveredIndexRegionScanner{*}: This is a server side scanner on an
> uncovered index used for CDC. This can inherit UncoveredIndexRegionScanner.
> It goes through index table rows using a raw scan to identify data table rows
> and retrieves these rows using a raw scan. Using the time range, it forms a
> JSON blob to represent changes to the row including pre and/or post row
> images.
> * {*}CDC Query Compiler{*}: This is a client side component. It prepares the
> scan object based on the given CDC query statement.
> * {*}CDC DDL Compiler{*}: This is a client side component. It creates the
> time based uncovered (global/local) index based on the given CDC DDL
> statement and a virtual table of CDC type. CDC will be a new table type.
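The scanner component described above can be illustrated with a minimal in-memory sketch. This is not Phoenix code: the name `cdc_scan`, the list-of-tuples index, and the dictionary of row versions are hypothetical stand-ins for the uncovered index, the data table retained by max lookback, and the raw scans. The output record shape is also an assumption, since the proposal does not fix a JSON layout.

```python
def cdc_scan(index, data, start, end):
    """Return change records for index entries with start <= ts < end.

    index: iterable of (timestamp, row_key) pairs, standing in for the
           time-based uncovered index.
    data:  row_key -> list of (timestamp, image) pairs, oldest first, standing
           in for the row versions kept by max lookback. An image of None
           marks a deleted row.
    """
    changes = []
    for ts, key in sorted(index):  # walk the index in time order
        if not (start <= ts < end):
            continue
        pre = None
        post = None
        for version_ts, image in data[key]:  # "raw scan" over all row versions
            if version_ts < ts:
                pre = image   # newest image written before this change
            elif version_ts == ts:
                post = image  # image written by this change
        changes.append({"ts": ts, "key": key, "pre": pre, "post": post})
    return changes


# Hypothetical sample data: row r1 is written, updated, then deleted.
index = [(100, "r1"), (105, "r2"), (110, "r1"), (120, "r1")]
data = {
    "r1": [(100, {"v": 1}), (110, {"v": 2}), (120, None)],
    "r2": [(105, {"v": 9})],
}

changes = cdc_scan(index, data, 105, 121)
for change in changes:
    print(change)
```

In this sketch the delete at timestamp 120 shows up as a change whose pre image is the last written row and whose post image is empty, which is the kind of result a regular query would not return.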
> A CDC DDL syntax to create CDC on a (data) table can be as follows:
> Create CDC <CDC Table Name> on <Data Table Name> (PHOENIX_ROW_TIMESTAMP() |
> <Data Table Column>) INCLUDE (pre | post | latest | all) TTL = <Age in
> seconds> INDEX = <GLOBAL | LOCAL> SALT_BUCKETS=<salt bucket count>
> The above CDC DDL creates a virtual CDC table and an uncovered index. The CDC
> table PK columns start with the timestamp or user defined column and continue
> with the data table PK columns. The CDC table includes one non-PK column
> which is a JSON column. The change is expressed in this JSON column in
> multiple ways based on the CDC DDL or query statement. The change can be
> expressed as just the mutation for the change, the latest image of the row,
> the pre image of the row (the image before the change), the post image, or
> any combination of these. The CDC table is not a physical table on disk. It
> is just a virtual table to be used in a CDC query. Phoenix stores just the
> metadata for this virtual table.
> A CDC query can be as follows:
> Select * from <CDC Table Name> where PHOENIX_ROW_TIMESTAMP() >= TO_DATE( …)
> AND PHOENIX_ROW_TIMESTAMP() < TO_DATE( …)
> This query would return the rows of the CDC table, which are constructed on the
> server side by CDCUncoveredIndexRegionScanner by joining the uncovered index
> row versions with the corresponding data table row versions (using raw scans).
> The above select query can take a new CDC hint to return just the actual
> change, the pre, post, or latest image of the row, or a combination of them,
> overriding the default JSON column format defined by the CDC DDL statement.
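The query above uses a half-open time range (`>=` on the lower bound, `<` on the upper bound). One consequence is that consecutive queries can tile a longer period without gaps or duplicates. The sketch below only computes such windows; the function name `poll_windows` and the millisecond units are assumptions for illustration, and it does not build or run a Phoenix query.

```python
def poll_windows(start_ms, end_ms, step_ms):
    """Split [start_ms, end_ms) into consecutive half-open windows.

    Each window (lower, upper) maps to one CDC query with
    timestamp >= lower AND timestamp < upper.
    """
    if step_ms <= 0:
        raise ValueError("step_ms must be positive")
    windows = []
    lower = start_ms
    while lower < end_ms:
        upper = min(lower + step_ms, end_ms)
        windows.append((lower, upper))
        lower = upper  # the next window starts exactly where this one ended
    return windows


print(poll_windows(0, 2500, 1000))
# [(0, 1000), (1000, 2000), (2000, 2500)]
```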
> The CDC application will run the above query in a loop. When the difference
> between the current time of the application and the upper limit of the time
> range of the query, say x milliseconds, is less than s milliseconds, then
> the application thread needs to sleep s - x milliseconds. The value for s can
> be small such as 1000 milliseconds. This is to make sure that time skew among
> the server wall clocks does not lead to data loss.
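The sleep rule in the paragraph above can be written as a small helper. This is a minimal sketch assuming all times are in epoch milliseconds; the function name and parameter names are hypothetical, and the default of 1000 for s is taken from the example value in the text.

```python
def sleep_before_query_ms(now_ms, upper_ms, s_ms=1000):
    """Return how long to sleep before querying a range ending at upper_ms.

    x is the gap between the application's current time and the upper limit
    of the query's time range. If x is already at least s, no sleep is
    needed; otherwise the thread sleeps for s - x milliseconds.
    """
    x = now_ms - upper_ms
    if x >= s_ms:
        return 0
    return s_ms - x


print(sleep_before_query_ms(now_ms=10_000, upper_ms=9_700))  # 700
print(sleep_before_query_ms(now_ms=10_000, upper_ms=8_000))  # 0
```

A polling loop would call this before each query and pass the result, divided by 1000, to `time.sleep`.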
> A global time based index may create hot spotting during writes. This is
> because the same region of the global index will keep getting updated. Since
> the global index would be uncovered, the size of the updates will usually be
> smaller than the data table updates. If we assume that index mutations are n
> times smaller than data table mutations, then a single index region will be
> able to sustain writes from n data table regions if the data table does not
> have any other indexes. When the data table has other indexes, data table
> writes can slow down by 3 times or so. This allows a single index region to
> keep up with about 3n data table regions. If the number of active data table regions
> is more than a single index region can sustain then we need to distribute the
> load to multiple index regions using salting.
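The capacity argument above reduces to simple arithmetic, sketched below. The function name is hypothetical, the factor of 3 is the rough estimate quoted in the text rather than a measured value, and treating the result as a lower bound for the salt bucket count is an assumption of this sketch.

```python
import math


def index_regions_needed(active_data_regions, n, has_other_indexes):
    """Estimate how many index regions are needed to absorb the write load.

    n is the assumed ratio of data table mutation size to index mutation
    size. One index region is taken to sustain n data table regions, or
    about 3n when other indexes already slow down data table writes.
    """
    per_index_region = 3 * n if has_other_indexes else n
    return max(1, math.ceil(active_data_regions / per_index_region))


print(index_regions_needed(40, 10, has_other_indexes=False))  # 4
print(index_regions_needed(40, 10, has_other_indexes=True))   # 2
```

A result of 1 means a single unsalted index region would be expected to keep up under these assumptions.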
> A local time based index does not have the hot spotting issue but can result
> in slower CDC queries for tables with a large number of regions. That is why
> this proposal suggests using global indexes by default.