Nikita Awasthi created HBASE-30237:
--------------------------------------

             Summary: HBase bulkload replication causes duplicate HFile loading 
and compaction storm when source RPC times out
                 Key: HBASE-30237
                 URL: https://issues.apache.org/jira/browse/HBASE-30237
             Project: HBase
          Issue Type: Bug
          Components: Replication
    Affects Versions: 2.5.15, 2.6.6
            Reporter: Nikita Awasthi


h2. Problem

When bulkload replication is enabled (A to B), the sink cluster (B) copies 
HFiles from the source cluster's (A) HDFS and then performs a bulk load. If 
this operation takes longer than {{replication.source.shipedits.timeout}} 
(default 60s), the source retries the same WAL entry. Since {{HFileReplicator}} 
creates a new random staging directory on every invocation, the sink processes 
the same bulkload event multiple times -- loading the same HFiles as distinct 
store files repeatedly.

This leads to:
* *Storefile size explosion*: the sink's store file size can grow to 3x or more 
of the source's size.
* *Compaction storm*: the duplicated store files trigger massive compaction 
work.
* *Replication lag*: the source accumulates WAL entries while the sink is 
overwhelmed.

The root cause is that HBase bulkload replication has at-least-once semantics, 
but {{ReplicationSink}} has no deduplication mechanism to prevent concurrent 
reprocessing of the same bulkload event.

h2. Evidence (observed in production)

In a production bulkload replication setup, the following sequence was observed:

* A single bulkload event on the source triggered repeated BulkloadRPC calls on 
the sink -- more sink-side invocations than bulkload events issued on the 
source -- confirming multiple retries of the same WAL entry.
* Sink RS CPU spiked from baseline ~2% to over 40%.
* Sink network inbound peaked at several hundred MB/s per node.
* Sink cluster storefile size grew to over 3x the source within ~90 minutes.
* Compaction queue on the sink peaked at tens of thousands, requiring hours of 
major compaction to recover.
* Source replication lag peaked at ~70 minutes, with WAL queue growing to tens 
of files per RS.

h2. Root Cause Analysis

The execution path is:

{code}
ReplicationSourceShipper.shipEdits()
  -> HBaseInterClusterReplicationEndpoint.replicate()   [RPC with shipEditsTimeout]
    -> ReplicationSink.replicateEntries()
      -> HFileReplicator.replicate()
          copyHFilesToStagingDir()      <- copies HFiles from source HDFS (slow)
          LoadIncrementalHFiles.load()  <- bulk loads into sink table
{code}

{{copyHFilesToStagingDir()}} runs inside the RPC handler thread. When copying 
large HFiles takes longer than {{shipEditsTimeout}}, the source retries with 
the same WAL entry. {{HFileReplicator}} creates a new random staging directory 
each time, so there is no detection of an in-progress or already-completed 
invocation.
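
The retry pattern can be illustrated with a small standalone simulation. It 
uses no HBase classes: {{RetrySimulation}}, the thread pool standing in for 
sink RPC handlers, and the latch standing in for the slow HFile copy are all 
assumptions made for illustration. Each simulated source attempt times out 
while the sink-side work keeps running, and every attempt gets its own random 
staging directory.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Standalone simulation only; none of these names are HBase APIs.
class RetrySimulation {

  /** Returns the staging directories the simulated sink created for ONE WAL entry. */
  static List<String> run(int sourceAttempts, long rpcTimeoutMillis) {
    List<String> stagingDirs = new CopyOnWriteArrayList<>();
    CountDownLatch copyMayFinish = new CountDownLatch(1); // models the slow HFile copy
    ExecutorService sinkHandlers = Executors.newFixedThreadPool(sourceAttempts);
    List<Future<?>> inFlight = new ArrayList<>();
    try {
      for (int attempt = 0; attempt < sourceAttempts; attempt++) {
        Future<?> rpc = sinkHandlers.submit(() -> {
          // A fresh random staging directory per invocation: nothing ties this
          // attempt to an earlier attempt for the same bulkload event.
          stagingDirs.add("staging-" + UUID.randomUUID());
          copyMayFinish.await(); // the copy outlasts the source's RPC timeout
          return null;
        });
        inFlight.add(rpc);
        try {
          rpc.get(rpcTimeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
          // The source gives up waiting and resends the same WAL entry,
          // while the sink-side handler keeps working on the old attempt.
        }
      }
      copyMayFinish.countDown(); // let every in-flight attempt finish its "load"
      for (Future<?> rpc : inFlight) {
        rpc.get();
      }
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      sinkHandlers.shutdownNow();
    }
    return stagingDirs;
  }
}
```

Calling {{RetrySimulation.run(3, 20)}} yields three distinct staging 
directories for what is logically one bulkload event.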

Additionally, there is no rate-limiting mechanism for the HFile copy bandwidth, 
meaning a single large bulkload can saturate the sink cluster's network and 
cause cascading degradation.

h2. Fix

This patch addresses both issues:

*1. Deduplication of concurrent bulkload replication*

{{ReplicationSink}} tracks in-progress bulkload events in a 
{{ConcurrentHashSet}}. The key is:

{code}
replicationClusterId + "#" + encodedRegionName + "#" + bulkloadSeqNum
{code}

If a retry arrives while the same event is already being processed, it is 
skipped with a WARN log. The key is removed in a {{finally}} block so that 
subsequent retries (after the previous attempt completes or fails) are 
processed normally.
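
A minimal sketch of this guard, assuming a JDK concurrent set 
({{ConcurrentHashMap.newKeySet()}}) in place of the {{ConcurrentHashSet}} 
named above. The class name {{BulkloadDedupSketch}} and the method 
{{replicateOnce}} are hypothetical and not taken from the patch; the 
{{Runnable}} stands in for the copy-and-load work.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Sketch only: class and method names are hypothetical, not taken from the patch.
class BulkloadDedupSketch {

  // Keys of bulkload events that some handler thread is currently processing.
  private final Set<String> inProgress = ConcurrentHashMap.newKeySet();

  /** Builds the dedup key in the format described above. */
  static String key(String replicationClusterId, String encodedRegionName,
      long bulkloadSeqNum) {
    return replicationClusterId + "#" + encodedRegionName + "#" + bulkloadSeqNum;
  }

  /**
   * Runs copyAndLoad unless the same event is already in flight.
   * Returns true if the work ran, false if this call was skipped as a duplicate.
   */
  boolean replicateOnce(String key, Runnable copyAndLoad) {
    // Set.add is atomic here: exactly one concurrent caller per key gets true.
    if (!inProgress.add(key)) {
      System.err.println("WARN: bulkload event already in progress, skipping: " + key);
      return false;
    }
    try {
      copyAndLoad.run();
      return true;
    } finally {
      // Always release the key so a later retry (after success or failure)
      // is processed normally.
      inProgress.remove(key);
    }
  }
}
```

The sketch only reports that a retry was skipped; what the sink returns to the 
source in that case is not covered here.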

Note: This prevents duplicate processing under concurrent retry scenarios. It 
does not change the at-least-once delivery guarantee of WAL replication.

*2. Configurable bandwidth throttling for HFile copy*

Introduces {{hbase.replication.bulkload.copy.bandwidth.mb}} (double, MB/s, 
default 0 = unlimited). A Guava {{RateLimiter}} is shared across all copy 
threads within a single {{HFileReplicator}} invocation, so the configured 
bandwidth is a per-RS ceiling regardless of 
{{hbase.replication.bulkload.copy.maxthreads}}.

{{ReplicationSink}} implements {{ConfigurationObserver}} so the rate limit can 
be updated dynamically via configuration reload without restarting 
RegionServers.
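
The throttling idea can be sketched without external dependencies. The class 
below is a hand-rolled, JDK-only stand-in for Guava's {{RateLimiter}}, not the 
patch's code; {{CopyThrottleSketch}}, {{reserve}}, {{acquire}} and 
{{setBandwidthMb}} are hypothetical names. One instance is meant to be shared 
by all copy threads, and {{setBandwidthMb}} is what a configuration-change 
callback would invoke.

```java
import java.util.concurrent.TimeUnit;

// Sketch only: a JDK-only stand-in for Guava's RateLimiter; all names are hypothetical.
class CopyThrottleSketch {

  private volatile double bytesPerSecond;          // <= 0 means unlimited
  private long nextFreeNanos = Long.MIN_VALUE;     // guarded by "this"

  CopyThrottleSketch(double bandwidthMb) {
    setBandwidthMb(bandwidthMb);
  }

  /** Applies a new limit in MB/s; takes effect on the next reservation. */
  void setBandwidthMb(double bandwidthMb) {
    this.bytesPerSecond = bandwidthMb * 1024 * 1024;
  }

  /**
   * Reserves capacity for a chunk of the given size and returns how many
   * nanoseconds the caller should wait before copying it. The cost of a chunk
   * is charged to whoever reserves next, so the first chunk never waits.
   */
  synchronized long reserve(long bytes, long nowNanos) {
    double rate = bytesPerSecond;
    if (rate <= 0) {
      return 0L;
    }
    long start = Math.max(nowNanos, nextFreeNanos);
    nextFreeNanos = start + (long) (bytes / rate * 1_000_000_000L);
    return start - nowNanos;
  }

  /** Blocking variant that a copy thread would call before writing each chunk. */
  void acquire(long bytes) throws InterruptedException {
    long waitNanos = reserve(bytes, System.nanoTime());
    if (waitNanos > 0) {
      TimeUnit.NANOSECONDS.sleep(waitNanos);
    }
  }
}
```

Because all threads draw from the same instance, adding copy threads does not 
raise the aggregate rate. Whether that aggregate acts as a per-invocation or a 
per-RS ceiling depends on where the single instance lives; the sketch leaves 
that to the caller.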

h2. Steps to Reproduce

# Set up bulkload replication: cluster A replicates to cluster B.
# Set {{hbase.replication.bulkload.copy.maxthreads=1}} on cluster B (or ensure 
HFile copy takes longer than {{replication.source.shipedits.timeout}}).
# Perform a large bulkload on cluster A.
# Observe that cluster B processes the same bulkload WAL entry multiple times, 
resulting in duplicated store files.

h2. Expected Behavior

Each unique bulkload WAL entry (identified by {{replicationClusterId}}, 
{{encodedRegionName}}, {{bulkloadSeqNum}}) should have at most one processing 
attempt in flight at the sink at any time, even when the source retries due to 
an RPC timeout.

h2. Actual Behavior

The same bulkload WAL entry is processed multiple times, creating duplicate 
store files and triggering a compaction storm.

h2. Additional Notes

* Increasing {{replication.source.shipedits.timeout}} reduces the probability 
of a retry but does not eliminate it, and it introduces WAL retention overhead.
* The deduplication fix and the bandwidth throttling fix are complementary: 
throttling prevents the network saturation that causes the timeout in the 
first place, while deduplication prevents the cascading failure when a timeout 
does occur.




--
This message was sent by Atlassian Jira
(v8.20.10#820010)
