[ 
https://issues.apache.org/jira/browse/PHOENIX-7751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rahul Kumar updated PHOENIX-7751:
---------------------------------
    Description: 
The tool runs on the source cluster and gets the list of region boundaries for 
a table or table section from the source cluster. This list becomes the list of 
splits for the MR job. For checkpointing purposes, the tool records chunks and 
mapper regions in the output table as their processing completes.
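
As a rough sketch (not the tool's actual code), the splits can be derived from 
the source region boundaries via the standard HBase client API; the class and 
method names here are illustrative:

{code:java}
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.util.Pair;

// Illustrative only: one MR split per source table region.
public final class SplitBuilder {
    public static List<Pair<byte[], byte[]>> buildSplits(
            Connection sourceConn, TableName table) throws IOException {
        List<Pair<byte[], byte[]>> splits = new ArrayList<>();
        try (RegionLocator locator = sourceConn.getRegionLocator(table)) {
            // Parallel arrays of region start and end keys on the source cluster.
            Pair<byte[][], byte[][]> keys = locator.getStartEndKeys();
            for (int i = 0; i < keys.getFirst().length; i++) {
                splits.add(new Pair<>(keys.getFirst()[i], keys.getSecond()[i]));
            }
        }
        return splits;
    }
}
{code}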

 

The chunk formation is done on the server side by the Phoenix coprocessor 
UngroupedAggregateRegionObserver. A mapper opens a scan for its mapper region 
on both the source and target clusters. The mapper region boundaries are 
serialized into a scan attribute on these scans. These scans also include an 
attribute to signal the Phoenix coprocessor that they are for chunk formation. 
A scan returns one chunk at a time. A chunk can be either a full chunk or a 
partial chunk. A partial chunk is returned only when the table region ends 
before the mapper region does. This can happen on the source cluster if the 
table region boundaries change due to region splits and merges while the tool 
is running. Partial chunks are expected more often on the target cluster, as 
mapper regions are aligned with the table regions on the source cluster. In 
this case, the mapper opens another scan to continue from where the previous 
scan ended. This scan also includes a scan attribute for the partial chunk so 
that the scan can complete it.
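
A minimal sketch of how such a chunk-formation scan could be set up. The 
attribute keys and the boundary serialization below are made-up assumptions 
for illustration; the real contract lives in the Phoenix coprocessor code:

{code:java}
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public final class ChunkScanFactory {
    // Hypothetical attribute keys; the coprocessor's actual names will differ.
    static final String CHUNK_FORMATION_ATTR = "phoenix.synctable.chunk.formation";
    static final String MAPPER_REGION_ATTR = "phoenix.synctable.mapper.region";

    public static Scan newChunkScan(byte[] mapperStart, byte[] mapperEnd) {
        Scan scan = new Scan().withStartRow(mapperStart).withStopRow(mapperEnd);
        // Signal UngroupedAggregateRegionObserver that this scan forms chunks.
        scan.setAttribute(CHUNK_FORMATION_ATTR, Bytes.toBytes(true));
        // Serialize the mapper region boundaries so the server can detect when
        // the table region ends before the mapper region does (a partial chunk).
        scan.setAttribute(MAPPER_REGION_ATTR,
                Bytes.add(Bytes.toBytes(mapperStart.length), mapperStart, mapperEnd));
        return scan;
    }
}
{code}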

 

After receiving the two copies of a chunk, one from the source and the other 
from the target cluster, the tool compares them within its mappers. If the 
chunk copies are different, the tool optionally repairs the target copy (that 
is, if the current run of the tool is configured for repair). The repair 
operation requires scanning the rows of the chunk using a raw scan from both 
clusters. The repair operation is also done inline in the same mapper, on a 
best-effort basis, i.e., repairing as much as possible for a given mapper or 
chunk.
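
Sketched below is what the compare-then-repair decision might look like inside 
a mapper; RepairSink is a hypothetical callback standing in for the inline 
repair logic, not an interface from the tool:

{code:java}
import java.util.Arrays;
import org.apache.hadoop.hbase.client.Scan;

public final class ChunkVerifier {
    // Hypothetical callback standing in for the inline repair logic.
    public interface RepairSink {
        void repair(Scan rawScan);
    }

    public static void verifyChunk(byte[] sourceHash, byte[] targetHash,
            byte[] chunkStart, byte[] chunkEnd,
            boolean repairEnabled, RepairSink sink) {
        if (Arrays.equals(sourceHash, targetHash)) {
            return; // the two chunk copies match; nothing to do
        }
        if (repairEnabled) {
            // Raw scans expose delete markers and all cell versions, which the
            // repair needs in order to reconcile the target copy with the source.
            Scan rawScan = new Scan()
                    .withStartRow(chunkStart).withStopRow(chunkEnd)
                    .setRaw(true).readAllVersions();
            sink.repair(rawScan); // best effort: repair as much as possible
        }
    }
}
{code}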


A challenge arises when a source chunk boundary, for example [30, 80), resides 
within a single region on the source cluster but is split across multiple 
regions on the target cluster (e.g., [30, 40), [40, 50), and [50, 80)).
Standard hash comparisons fail in this scenario because common cryptographic 
hash functions (like MD5) are {*}not associative{*}. The target cluster would 
produce a nested hash MD5(MD5(row30..row39) + MD5(row40..row49) + 
MD5(row50..row79)), which will not match the flat hash MD5(row30..row79) 
generated by the source.
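
The mismatch is easy to reproduce with plain java.security.MessageDigest (a 
standalone demo, not tool code):

{code:java}
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HexFormat;

public final class Md5AssociativityDemo {
    public static void main(String[] args) throws NoSuchAlgorithmException {
        byte[] a = "row30..row39".getBytes();
        byte[] b = "row40..row79".getBytes();

        // Flat hash over the whole chunk, as the source computes it: MD5(a || b).
        MessageDigest md = MessageDigest.getInstance("MD5");
        md.update(a);
        md.update(b);
        byte[] flat = md.digest();

        // Nested hash over per-region pieces, as the target computes it:
        // MD5(MD5(a) || MD5(b)).
        md.update(MessageDigest.getInstance("MD5").digest(a));
        md.update(MessageDigest.getInstance("MD5").digest(b));
        byte[] nested = md.digest();

        HexFormat hex = HexFormat.of();
        System.out.println("flat   = " + hex.formatHex(flat));
        System.out.println("nested = " + hex.formatHex(nested));
        // The digests differ even though the underlying rows are identical.
    }
}
{code}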

 

To resolve this, the Mapper synchronizes the hashing logic by adopting the 
target's boundary structure:
 # *Source Chunk Identification:* The Mapper iterates over the source based on 
the defined chunk size or the region end point. For a given chunk, such as 
[30, 80), the server-side scanner holds the rows in a buffer.
 # *Target Boundary Discovery:* The Mapper probes the target cluster using the 
source boundary [30, 80). It identifies the sub-boundaries created by target 
region splits (e.g., [30, 40), [40, 50), and [50, 80)).
 # *Target Hash Computation:* The Mapper issues scans for each target 
sub-region and computes a "nested" associative hash.
 # *Source Hash Alignment:* Using the target boundaries discovered in Step 2, 
the Source Coprocessor iterates over its buffered rows to generate a matching 
nested hash. It hashes the specific subsets [30, 40), [40, 50), and [50, 80) 
individually before combining them into a final associative checksum (see the 
sketch after this list).
 # *Verification:* The client compares these two associative checksums. This 
allows for data validation across mismatched region boundaries without 
transferring the actual row data to the client.
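
A minimal sketch of Steps 3 and 4, assuming the buffered rows are keyed by row 
key and MD5 is the hash; the NestedHasher helper is illustrative, not the 
coprocessor's real code:

{code:java}
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.hadoop.hbase.util.Bytes;

public final class NestedHasher {
    /**
     * Hashes the buffered chunk rows per target sub-boundary, then combines
     * the sub-hashes into one checksum that matches the target's nested hash.
     *
     * @param rows       buffered (rowKey -> serialized row) entries for the chunk
     * @param boundaries target sub-boundary start keys, e.g. 30, 40, 50
     */
    public static byte[] nestedHash(NavigableMap<byte[], byte[]> rows,
            List<byte[]> boundaries) throws NoSuchAlgorithmException {
        MessageDigest outer = MessageDigest.getInstance("MD5");
        for (int i = 0; i < boundaries.size(); i++) {
            byte[] start = boundaries.get(i);
            byte[] stop = (i + 1 < boundaries.size()) ? boundaries.get(i + 1) : null;
            MessageDigest inner = MessageDigest.getInstance("MD5");
            // Hash only the buffered rows that fall inside [start, stop).
            NavigableMap<byte[], byte[]> slice =
                    (stop == null) ? rows.tailMap(start, true)
                                   : rows.subMap(start, true, stop, false);
            for (byte[] row : slice.values()) {
                inner.update(row);
            }
            outer.update(inner.digest()); // combine sub-hashes like the target does
        }
        return outer.digest();
    }

    public static void main(String[] args) throws NoSuchAlgorithmException {
        NavigableMap<byte[], byte[]> rows = new TreeMap<>(Bytes.BYTES_COMPARATOR);
        for (int k = 30; k < 80; k++) {
            rows.put(Bytes.toBytes(k), Bytes.toBytes("row" + k));
        }
        List<byte[]> bounds =
                List.of(Bytes.toBytes(30), Bytes.toBytes(40), Bytes.toBytes(50));
        // Both sides compute this same value when using the same boundaries.
        System.out.println(Bytes.toHex(nestedHash(rows, bounds)));
    }
}
{code}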

  was:
Start by implementing a tool which can first just validate data between the 
source and target clusters for a given table.

Generate mappers based on the region boundaries on the source cluster. We 
further chunk the mapper region boundary with a pre-defined chunkSize 
(bytes/rowCount), then fetch the target chunk based on the source chunk 
start/end key and compare hashes.

There will be cases where the determined source chunk boundary, e.g. [30,80), 
lies within one region on the source but is not located within one region on 
the target and can be split across multiple regions, e.g. [30,40), [40,50), 
[50,80). In such cases, we cannot rely on comparing hashes across source and 
target, because the hashes would look like MD5(MD5(row30..row39) + 
MD5(row40..row49) + MD5(row50..row79)) on the target vs MD5(row30 + row31 + 
... + row79) on the source, and checksums are not associative.

To handle such cases,
 - The Mapper iterates over the source chunk boundary based on the chunk size 
(or until the end of the source region is reached), holds the rows on the 
server scanner, and gets the source chunk boundary, e.g. [30,80).

 - The Mapper then scans the target based on the source chunk boundary [30,80) 
and identifies that it has to issue multiple scans, one per target chunk 
region, e.g. [30,40), [40,50), [50,80). The Mapper computes the associative 
hash MD5(MD5(row30..row39) + MD5(row40..row49) + MD5(row50..row79)) on the 
target.

 - The source looks at the target region boundaries, which could be multiple 
as defined above, i.e. [30,40), [40,50), [50,80), and, using the rows already 
held on the source region scanner coprocessor, it computes the associative 
hash just like the target, based on the target region boundaries for the 
chunk.

 - Now we can compare the associative checksums between source and target 
without even bringing the data to the client.


> Feature to validate table data using PhoenixSyncTable tool b/w source and 
> target cluster
> ----------------------------------------------------------------------------------------
>
>                 Key: PHOENIX-7751
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-7751
>             Project: Phoenix
>          Issue Type: Sub-task
>    Affects Versions: 5.2.0, 5.2.1, 5.3.0
>            Reporter: Rahul Kumar
>            Assignee: Rahul Kumar
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
