[ https://issues.apache.org/jira/browse/HBASE-30237?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Junegunn Choi resolved HBASE-30237.
-----------------------------------
Resolution: Duplicate
> HBase bulkload replication causes duplicate HFile loading and compaction
> storm when source RPC times out
> --------------------------------------------------------------------------------------------------------
>
> Key: HBASE-30237
> URL: https://issues.apache.org/jira/browse/HBASE-30237
> Project: HBase
> Issue Type: Bug
> Components: Replication
> Affects Versions: 2.5.15, 2.6.6
> Reporter: Nikita Awasthi
> Priority: Major
>
> h2. Problem
> When bulkload replication is enabled (A to B), the sink cluster (B) copies
> HFiles from the source cluster's (A) HDFS and then performs a bulk load. If
> this operation takes longer than {{replication.source.shipedits.timeout}}
> (default 60s), the source retries the same WAL entry. Since
> {{HFileReplicator}} creates a new random staging directory on every
> invocation, nothing links a retry to the earlier attempt. The sink therefore
> processes the same bulkload event multiple times, loading the same HFiles
> repeatedly as distinct store files.
> This leads to:
> * *Storefile size explosion*: the sink's store file size can grow to 3x or
> more of the source's size.
> * *Compaction storm*: the duplicated store files trigger massive compaction
> work.
> * *Replication lag*: the source accumulates WAL entries while the sink is
> overwhelmed.
> The root cause is that HBase bulkload replication has at-least-once
> semantics, but {{ReplicationSink}} has no deduplication mechanism to prevent
> concurrent reprocessing of the same bulkload event.
> h2. Evidence (observed in production)
> In a production bulkload replication setup, the following sequence was
> observed:
> * A single bulkload event on the source triggered repeated BulkloadRPC calls
> on the sink (more invocations than bulkload events on the source),
> confirming multiple retries of the same WAL entry.
> * Sink RS CPU spiked from baseline ~2% to over 40%.
> * Sink network inbound peaked at several hundred MB/s per node.
> * Sink cluster storefile size grew to over 3x the source within ~90 minutes.
> * Compaction queue on the sink peaked at tens of thousands, requiring hours
> of major compaction to recover.
> * Source replication lag peaked at ~70 minutes, with WAL queue growing to
> tens of files per RS.
> h2. Root Cause Analysis
> The execution path is:
> {code}
> ReplicationSourceShipper.shipEdits()
>   -> HBaseInterClusterReplicationEndpoint.replicate()   [RPC with shipEditsTimeout]
>     -> ReplicationSink.replicateEntries()
>       -> HFileReplicator.replicate()
>            copyHFilesToStagingDir()       <- copies HFiles from source HDFS (slow)
>            LoadIncrementalHFiles.load()   <- bulk loads into sink table
> {code}
> {{copyHFilesToStagingDir()}} runs on the sink's RPC handler thread. When copying
> large HFiles takes longer than {{shipEditsTimeout}}, the source's RPC times out
> and the source resends the same WAL entry, while the original invocation keeps
> running on the sink. {{HFileReplicator}} creates a new random staging directory
> each time, so nothing detects an in-progress or already-completed invocation of
> the same event.
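> The following is a minimal simulation of this failure mode in plain Java. It is not HBase code.
> {{NaiveSink}} and {{ShippingSource}} are hypothetical names. A sleep models the slow HFile
> copy, and {{Future.get(timeout)}} models the RPC timeout. A timed-out call keeps running on the
> sink, and each call uses a fresh random staging directory. As a result, three timed-out attempts
> leave three store files for a single bulkload event.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical simulation: NaiveSink and ShippingSource are illustrative, not HBase classes.
class RetryDuplicationDemo {
  public static void main(String[] args) throws Exception {
    NaiveSink sink = new NaiveSink(300); // each bulkload takes ~300 ms (the slow HFile copy)
    int attempts = ShippingSource.ship(sink, "clusterA#region1#42", 100, 3); // 100 ms timeout
    sink.awaitIdle();
    System.out.println("attempts=" + attempts + " storeFiles=" + sink.loadedStoreFiles());
  }
}

class NaiveSink {
  private final long copyMillis;
  private final List<String> storeFiles = new CopyOnWriteArrayList<>();
  // Stands in for the sink RegionServer's RPC handler threads.
  private final ExecutorService handlers = Executors.newCachedThreadPool();

  NaiveSink(long copyMillis) {
    this.copyMillis = copyMillis;
  }

  Future<?> replicate(String bulkloadEvent) {
    return handlers.submit(() -> {
      // A fresh random staging dir per call: nothing links this call to an earlier attempt.
      String stagingDir = "staging-" + UUID.randomUUID();
      Thread.sleep(copyMillis); // slow copy; keeps running after the caller stops waiting
      storeFiles.add(stagingDir + "/" + bulkloadEvent); // loaded as a distinct store file
      return null;
    });
  }

  List<String> loadedStoreFiles() {
    return new ArrayList<>(storeFiles);
  }

  void awaitIdle() throws InterruptedException {
    handlers.shutdown();
    handlers.awaitTermination(5, TimeUnit.SECONDS);
  }
}

class ShippingSource {
  /** Ships one entry, resending it on timeout; returns the number of attempts made. */
  static int ship(NaiveSink sink, String entry, long timeoutMillis, int maxAttempts)
      throws InterruptedException, ExecutionException {
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      Future<?> call = sink.replicate(entry);
      try {
        call.get(timeoutMillis, TimeUnit.MILLISECONDS);
        return attempt;
      } catch (TimeoutException e) {
        // The caller gives up and resends the same entry; the sink-side work is not cancelled.
      }
    }
    return maxAttempts;
  }
}
```

> If the timeout exceeds the copy time (for example, {{new NaiveSink(0)}} with a 1000 ms
> timeout), the first attempt succeeds and only one store file is loaded.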
> Additionally, there is no rate-limiting mechanism for the HFile copy
> bandwidth, meaning a single large bulkload can saturate the sink cluster's
> network and cause cascading degradation.
> h2. Fix
> This patch addresses both issues:
> *1. Deduplication of concurrent bulkload replication*
> {{ReplicationSink}} tracks in-progress bulkload events in a
> {{ConcurrentHashSet}}. The key is:
> {code}
> replicationClusterId + "#" + encodedRegionName + "#" + bulkloadSeqNum
> {code}
> If a retry arrives while the same event is already being processed, it is
> skipped with a WARN log. The key is removed in a {{finally}} block so that
> subsequent retries (after the previous attempt completes or fails) are
> processed normally.
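> The guard can be sketched in plain Java as follows. This is a hypothetical illustration, not
> the patch itself. {{BulkloadDedupGuard}} and {{runOnce}} are made-up names, and
> {{ConcurrentHashMap.newKeySet()}} stands in for the {{ConcurrentHashSet}} named above. The
> atomic {{add}} lets only one of two racing attempts proceed. The {{finally}} block releases the
> key whether the load succeeds or throws, so a later retry still runs.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class DedupDemo {
  public static void main(String[] args) throws InterruptedException {
    BulkloadDedupGuard guard = new BulkloadDedupGuard();
    String key = BulkloadDedupGuard.key("clusterA", "region1", 42L);
    AtomicInteger loads = new AtomicInteger();
    CountDownLatch started = new CountDownLatch(1);
    CountDownLatch release = new CountDownLatch(1);

    // First attempt: stays "in progress" until released, like a slow HFile copy.
    Thread first = new Thread(() -> guard.runOnce(key, () -> {
      loads.incrementAndGet();
      started.countDown();
      awaitQuietly(release);
    }));
    first.start();
    started.await();

    boolean concurrentRetryRan = guard.runOnce(key, loads::incrementAndGet); // skipped
    release.countDown();
    first.join();
    boolean laterRetryRan = guard.runOnce(key, loads::incrementAndGet); // key was released
    System.out.println(concurrentRetryRan + " " + laterRetryRan + " loads=" + loads.get());
    // prints "false true loads=2"
  }

  static void awaitQuietly(CountDownLatch latch) {
    try {
      latch.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}

/** Hypothetical in-progress guard keyed by clusterId#encodedRegionName#bulkloadSeqNum. */
final class BulkloadDedupGuard {
  // JDK concurrent set used in place of the ConcurrentHashSet mentioned in the issue.
  private final Set<String> inProgress = ConcurrentHashMap.newKeySet();

  static String key(String replicationClusterId, String encodedRegionName, long bulkloadSeqNum) {
    return replicationClusterId + "#" + encodedRegionName + "#" + bulkloadSeqNum;
  }

  /** Runs {@code load} unless the same event is already in flight; returns true if it ran. */
  boolean runOnce(String key, Runnable load) {
    if (!inProgress.add(key)) {
      // Concurrent retry of the same event: skip it (the patch logs a WARN here).
      return false;
    }
    try {
      load.run();
      return true;
    } finally {
      inProgress.remove(key); // release on success or failure so later retries are processed
    }
  }
}
```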
> Note: This prevents duplicate processing under concurrent retry scenarios. It
> does not change the at-least-once delivery guarantee of WAL replication.
> *2. Configurable bandwidth throttling for HFile copy*
> Introduces {{hbase.replication.bulkload.copy.bandwidth.mb}} (double, MB/s,
> default 0 = unlimited). A Guava {{RateLimiter}} is shared across all copy
> threads within a single {{HFileReplicator}} invocation, so the configured
> bandwidth is a per-RS ceiling regardless of
> {{hbase.replication.bulkload.copy.maxthreads}}.
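> Sharing one limiter across all copy threads is what makes the cap independent of the thread
> count. The sketch below uses a small JDK-only limiter in place of Guava's {{RateLimiter}}.
> {{SharedBandwidthLimiter}} is a hypothetical name: bytes serve as the permits, and a rate of
> 0 or less means unlimited. Like Guava's limiter, it lets a request proceed immediately and
> makes the next caller wait for the bandwidth the previous request consumed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

class ThrottledCopyDemo {
  public static void main(String[] args) throws InterruptedException {
    // One limiter shared by every copy thread: 1 MiB/s in total, however many threads run.
    SharedBandwidthLimiter limiter = new SharedBandwidthLimiter(1024 * 1024, System::nanoTime);
    long chunk = 64 * 1024, chunksPerThread = 4; // 4 threads x 4 x 64 KiB = 1 MiB
    List<Thread> copiers = new ArrayList<>();
    long start = System.nanoTime();
    for (int i = 0; i < 4; i++) {
      Thread t = new Thread(() -> {
        try {
          for (int c = 0; c < chunksPerThread; c++) {
            limiter.acquire(chunk); // a real copier would write `chunk` bytes after this
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
      copiers.add(t);
      t.start();
    }
    for (Thread t : copiers) {
      t.join();
    }
    System.out.printf("copied 1 MiB in %.2f s%n", (System.nanoTime() - start) / 1e9);
  }
}

/** Hypothetical JDK-only stand-in for Guava's RateLimiter, with bytes as permits. */
final class SharedBandwidthLimiter {
  private final LongSupplier clockNanos;
  private double bytesPerSecond; // <= 0 means unlimited
  private long nextFreeNanos;    // earliest time the next request may start

  SharedBandwidthLimiter(double bytesPerSecond, LongSupplier clockNanos) {
    this.bytesPerSecond = bytesPerSecond;
    this.clockNanos = clockNanos;
    this.nextFreeNanos = clockNanos.getAsLong();
  }

  synchronized void setRate(double bytesPerSecond) {
    this.bytesPerSecond = bytesPerSecond;
  }

  /** Books {@code bytes} of bandwidth and returns how long the caller must wait, in nanos. */
  synchronized long reserve(long bytes) {
    if (bytesPerSecond <= 0) {
      return 0L;
    }
    long now = clockNanos.getAsLong();
    long start = Math.max(now, nextFreeNanos);
    // This request pays for its bytes by pushing back whoever comes next.
    nextFreeNanos = start + (long) (bytes * 1_000_000_000d / bytesPerSecond);
    return start - now;
  }

  /** Blocks until the shared budget allows {@code bytes} more to be copied. */
  void acquire(long bytes) throws InterruptedException {
    long waitNanos = reserve(bytes);
    if (waitNanos > 0) {
      TimeUnit.NANOSECONDS.sleep(waitNanos);
    }
  }
}
```

> Four threads share a 1 MiB/s limit and copy 1 MiB in total, so the run should take a little
> under one second. A single thread copying the same amount would take about as long.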
> {{ReplicationSink}} implements {{ConfigurationObserver}} so the rate limit
> can be updated dynamically via configuration reload without restarting
> RegionServers.
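> The reload path can be sketched with the JDK alone. {{ConfigChangeListener}} is a hypothetical
> stand-in for HBase's {{ConfigurationObserver}}, whose {{onConfigurationChange}} receives a
> Hadoop {{Configuration}}; here a {{Map}} replaces that configuration object. Two behaviors are
> assumptions rather than documented behavior of the patch: 1 MB is treated as 1024 * 1024
> bytes, and a malformed value leaves the current limit unchanged.

```java
import java.util.Map;

class ReloadDemo {
  public static void main(String[] args) {
    ReloadableCopyThrottle throttle = new ReloadableCopyThrottle(Map.of());
    System.out.println("unlimited=" + throttle.isUnlimited());
    // Simulates a configuration reload that sets the bandwidth to 50 MB/s.
    throttle.onConfigurationChange(Map.of(ReloadableCopyThrottle.BANDWIDTH_KEY, "50"));
    System.out.println("bytes/s=" + throttle.bytesPerSecond());
  }
}

/** Hypothetical stand-in for ConfigurationObserver#onConfigurationChange(Configuration). */
interface ConfigChangeListener {
  void onConfigurationChange(Map<String, String> conf);
}

/** Hypothetical holder for the copy bandwidth; in the patch this state lives in ReplicationSink. */
final class ReloadableCopyThrottle implements ConfigChangeListener {
  static final String BANDWIDTH_KEY = "hbase.replication.bulkload.copy.bandwidth.mb";
  private static final double BYTES_PER_MB = 1024 * 1024; // assumption: MB means MiB

  private volatile double bytesPerSecond; // 0 = unlimited

  ReloadableCopyThrottle(Map<String, String> conf) {
    onConfigurationChange(conf);
  }

  @Override
  public void onConfigurationChange(Map<String, String> conf) {
    String raw = conf.getOrDefault(BANDWIDTH_KEY, "0").trim();
    double mbPerSecond;
    try {
      mbPerSecond = Double.parseDouble(raw);
    } catch (NumberFormatException e) {
      return; // keep the current limit if the new value is malformed
    }
    // Non-positive values disable throttling, matching the "0 = unlimited" default.
    bytesPerSecond = mbPerSecond > 0 ? mbPerSecond * BYTES_PER_MB : 0;
    // A real implementation would push the new rate into the shared limiter here.
  }

  double bytesPerSecond() {
    return bytesPerSecond;
  }

  boolean isUnlimited() {
    return bytesPerSecond == 0;
  }
}
```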
> h2. Steps to Reproduce
> # Set up bulkload replication: cluster A replicates to cluster B.
> # Set {{hbase.replication.bulkload.copy.maxthreads=1}} on cluster B (or
> ensure HFile copy takes longer than {{replication.source.shipedits.timeout}}).
> # Perform a large bulkload on cluster A.
> # Observe that cluster B processes the same bulkload WAL entry multiple
> times, resulting in duplicated store files.
> h2. Expected Behavior
> Each unique bulkload WAL entry (identified by {{replicationClusterId}},
> {{encodedRegionName}}, {{bulkloadSeqNum}}) should be processed at most once
> concurrently at the sink, even when the source retries due to RPC timeout.
> h2. Actual Behavior
> The same bulkload WAL entry is processed multiple times, creating duplicate
> store files and triggering a compaction storm.
> h2. Additional Notes
> * Increasing {{replication.source.shipedits.timeout}} reduces the probability
> of a retry but does not eliminate it, and it adds WAL retention overhead.
> * The deduplication fix and the bandwidth throttling fix are complementary:
> throttling prevents the network saturation that causes the timeout in the
> first place, while deduplication prevents the cascading failure when a
> timeout does occur.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)