[ 
https://issues.apache.org/jira/browse/HBASE-30237?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Junegunn Choi resolved HBASE-30237.
-----------------------------------
    Resolution: Duplicate

> HBase bulkload replication causes duplicate HFile loading and compaction 
> storm when source RPC times out
> --------------------------------------------------------------------------------------------------------
>
>                 Key: HBASE-30237
>                 URL: https://issues.apache.org/jira/browse/HBASE-30237
>             Project: HBase
>          Issue Type: Bug
>          Components: Replication
>    Affects Versions: 2.5.15, 2.6.6
>            Reporter: Nikita Awasthi
>            Priority: Major
>
> h2. Problem
> When bulkload replication is enabled (A to B), the sink cluster (B) copies 
> HFiles from the source cluster's (A) HDFS and then performs a bulk load. If 
> this operation takes longer than {{replication.source.shipedits.timeout}} 
> (default 60s), the source retries the same WAL entry. Since 
> {{HFileReplicator}} creates a new random staging directory on every 
> invocation, the sink processes the same bulkload event multiple times -- 
> loading the same HFiles as distinct store files repeatedly.
> This leads to:
> * *Storefile size explosion*: the sink's store file size can grow to 3x or 
> more of the source's size.
> * *Compaction storm*: the duplicated store files trigger massive compaction 
> work.
> * *Replication lag*: the source accumulates WAL entries while the sink is 
> overwhelmed.
> The root cause is that HBase bulkload replication has at-least-once 
> semantics, but {{ReplicationSink}} has no deduplication mechanism to prevent 
> concurrent reprocessing of the same bulkload event.
> h2. Evidence (observed in production)
> In a production bulkload replication setup, the following sequence was 
> observed:
> * A single bulkload event on the source triggered repeated BulkloadRPC calls 
> on the sink -- more invocations than the source fired -- confirming multiple 
> retries of the same WAL entry.
> * Sink RS CPU spiked from baseline ~2% to over 40%.
> * Sink network inbound peaked at several hundred MB/s per node.
> * Sink cluster storefile size grew to over 3x the source within ~90 minutes.
> * Compaction queue on the sink peaked at tens of thousands, requiring hours 
> of major compaction to recover.
> * Source replication lag peaked at ~70 minutes, with WAL queue growing to 
> tens of files per RS.
> h2. Root Cause Analysis
> The execution path is:
> {code}
> ReplicationSourceShipper.shipEdits()
>   -> HBaseInterClusterReplicationEndpoint.replicate()   [RPC with shipEditsTimeout]
>     -> ReplicationSink.replicateEntries()
>       -> HFileReplicator.replicate()
>           copyHFilesToStagingDir()      <- copies HFiles from source HDFS (slow)
>           LoadIncrementalHFiles.load()  <- bulk loads into sink table
> {code}
> {{copyHFilesToStagingDir()}} runs inside the RPC handler thread. When copying 
> large HFiles takes longer than {{shipEditsTimeout}}, the source retries with 
> the same WAL entry. {{HFileReplicator}} creates a new random staging 
> directory each time, so there is no detection of an in-progress or 
> already-completed invocation.
> Additionally, there is no rate-limiting mechanism for the HFile copy 
> bandwidth, meaning a single large bulkload can saturate the sink cluster's 
> network and cause cascading degradation.
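
The retry behavior described above can be illustrated with a minimal Java sketch. It is not HBase code: the class name, the staging directory naming, and the in-memory list standing in for the sink's store files are all hypothetical. It only shows that when every invocation picks a fresh random staging directory, nothing ties a retry back to an earlier attempt, so the same HFile ends up loaded once per attempt.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Hypothetical stand-in for the sink side; not the real HFileReplicator.
final class RetryDuplicationSketch {
    // Stand-in for the store files that end up in the sink table.
    final List<String> storeFiles = new ArrayList<>();

    /** One sink-side invocation: fresh random staging dir, then a simulated bulk load. */
    String replicate(String hfileName) {
        // A new random directory per call, so a retry cannot see earlier attempts.
        String stagingDir = "staging-" + UUID.randomUUID();
        String storeFile = stagingDir + "/" + hfileName;
        storeFiles.add(storeFile); // every attempt adds another copy
        return storeFile;
    }
}
```

Calling `replicate("hfile-1")` three times, as a source would after two timeouts, leaves three distinct entries in `storeFiles` for one logical HFile.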
> h2. Fix
> This patch addresses both issues:
> *1. Deduplication of concurrent bulkload replication*
> {{ReplicationSink}} tracks in-progress bulkload events in a 
> {{ConcurrentHashSet}}. The key is:
> {code}
> replicationClusterId + "#" + encodedRegionName + "#" + bulkloadSeqNum
> {code}
> If a retry arrives while the same event is already being processed, it is 
> skipped with a WARN log. The key is removed in a {{finally}} block so that 
> subsequent retries (after the previous attempt completes or fails) are 
> processed normally.
> Note: This prevents duplicate processing under concurrent retry scenarios. It 
> does not change the at-least-once delivery guarantee of WAL replication.
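
A minimal sketch of the in-progress tracking described above, assuming a plain JDK concurrent set (`ConcurrentHashMap.newKeySet()`) in place of the {{ConcurrentHashSet}} the patch mentions. The class and method names are hypothetical and the copy-and-load work is passed in as a `Runnable`; only the key format and the add/skip/finally-remove pattern come from the description.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch; not the actual ReplicationSink change.
final class BulkLoadDedupSketch {
    // Keys of bulkload events currently being processed on this sink.
    private final Set<String> inProgress = ConcurrentHashMap.newKeySet();

    /** Builds the key in the format given in the issue description. */
    static String key(String replicationClusterId, String encodedRegionName, long bulkloadSeqNum) {
        return replicationClusterId + "#" + encodedRegionName + "#" + bulkloadSeqNum;
    }

    /** Runs the work unless the same event is already in flight; returns true if it ran. */
    boolean replicateOnce(String key, Runnable copyAndLoad) {
        // add() is atomic: only one caller per key gets true.
        if (!inProgress.add(key)) {
            System.err.println("WARN skipping bulkload event already in progress: " + key);
            return false;
        }
        try {
            copyAndLoad.run();
            return true;
        } finally {
            // Always release the key so a later retry is processed normally.
            inProgress.remove(key);
        }
    }
}
```

Because the key is released in `finally`, a retry that arrives after the first attempt has finished is processed again, which matches the note that at-least-once delivery is unchanged.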
> *2. Configurable bandwidth throttling for HFile copy*
> Introduces {{hbase.replication.bulkload.copy.bandwidth.mb}} (double, MB/s, 
> default 0 = unlimited). A Guava {{RateLimiter}} is shared across all copy 
> threads within a single {{HFileReplicator}} invocation, so the configured 
> bandwidth is a per-RS ceiling regardless of 
> {{hbase.replication.bulkload.copy.maxthreads}}.
> {{ReplicationSink}} implements {{ConfigurationObserver}} so the rate limit 
> can be updated dynamically via configuration reload without restarting 
> RegionServers.
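
A sketch of the throttling idea, under stated assumptions: the patch uses a Guava {{RateLimiter}}, while the block below substitutes a small hand-written limiter in plain Java so it has no dependencies. The class and method names are hypothetical. It shows a single limiter instance shared by all copy threads, a rate of 0 meaning unlimited, and a setter that a configuration reload could call.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a shared Guava RateLimiter; not the actual patch.
final class SharedCopyThrottle {
    private double bytesPerSecond; // 0 means unlimited; guarded by "this"
    private long nextFreeNanos;    // earliest start of the next reservation; guarded by "this"
    private boolean started;       // false until the first reservation

    SharedCopyThrottle(double mbPerSecond) {
        setRateMb(mbPerSecond);
    }

    /** Updates the rate at runtime, e.g. from a configuration reload hook. */
    synchronized void setRateMb(double mbPerSecond) {
        this.bytesPerSecond = mbPerSecond * 1024 * 1024;
    }

    /** Reserves bytes and returns how many nanoseconds the caller must wait first. */
    synchronized long reserve(long nowNanos, long bytes) {
        if (bytesPerSecond <= 0) {
            return 0L; // unlimited
        }
        if (!started) {
            nextFreeNanos = nowNanos;
            started = true;
        }
        long waitNanos = Math.max(0L, nextFreeNanos - nowNanos);
        long costNanos = (long) (bytes / bytesPerSecond * 1e9);
        nextFreeNanos = nowNanos + waitNanos + costNanos;
        return waitNanos;
    }

    /** Blocks the calling copy thread until its bytes fit under the shared rate. */
    void acquire(long bytes) throws InterruptedException {
        long waitNanos = reserve(System.nanoTime(), bytes);
        if (waitNanos > 0) {
            TimeUnit.NANOSECONDS.sleep(waitNanos); // sleep outside the lock
        }
    }
}
```

Each copy thread would call `acquire(chunk.length)` on the same instance before writing a chunk, so adding threads divides the budget instead of multiplying it.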
> h2. Steps to Reproduce
> # Set up bulkload replication: cluster A replicates to cluster B.
> # Set {{hbase.replication.bulkload.copy.maxthreads=1}} on cluster B (or 
> ensure HFile copy takes longer than {{replication.source.shipedits.timeout}}).
> # Perform a large bulkload on cluster A.
> # Observe that cluster B processes the same bulkload WAL entry multiple 
> times, resulting in duplicated store files.
> h2. Expected Behavior
> Each unique bulkload WAL entry (identified by {{replicationClusterId}}, 
> {{encodedRegionName}}, {{bulkloadSeqNum}}) should have at most one attempt 
> in progress at the sink at any time, even when the source retries due to an 
> RPC timeout.
> h2. Actual Behavior
> The same bulkload WAL entry is processed multiple times, creating duplicate 
> store files and triggering a compaction storm.
> h2. Additional Notes
> * Increasing {{replication.source.shipedits.timeout}} reduces the probability 
> of a retry but does not eliminate it, and it introduces WAL retention 
> overhead.
> * The deduplication fix and the bandwidth throttling fix are complementary: 
> throttling prevents the network saturation that causes the timeout in the 
> first place, while deduplication prevents the cascading failure when a 
> timeout does occur.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
