JeongMin Ju created HBASE-30238:
-----------------------------------
Summary: HBase bulkload replication causes duplicate HFile loading and compaction storm when source RPC times out
Key: HBASE-30238
URL: https://issues.apache.org/jira/browse/HBASE-30238
Project: HBase
Issue Type: Bug
Components: Replication
Affects Versions: 2.5.15, 2.6.6
Reporter: JeongMin Ju
### Problem
When bulkload replication is enabled (A → B), the sink cluster (B) copies HFiles from the source cluster's (A) HDFS and then performs a bulk load. If this operation takes longer than `replication.source.shipedits.timeout` (default 60s), the source retries the same WAL entry. Since `HFileReplicator` creates a new random staging directory on every invocation, the sink processes the same bulkload event multiple times, repeatedly loading the same HFiles as distinct store files.
This leads to:
- **Storefile size explosion**: the sink's store file size can grow to three or more times the source's size.
- **Compaction storm**: the duplicated store files trigger massive compaction work.
- **Replication lag**: the source accumulates WAL entries while the sink is overwhelmed.
The root cause is that HBase bulkload replication has at-least-once semantics, but `ReplicationSink` has no deduplication mechanism to prevent concurrent reprocessing of the same bulkload event.
### Evidence (observed in production)
In a production bulkload replication setup, the following sequence was observed:
- A single bulkload event on the source triggered repeated BulkloadRPC calls on the sink (**more invocations than the source fired**), confirming that the same WAL entry was retried multiple times.
- Sink RS CPU spiked from a baseline of ~2% to over **40%**.
- Sink inbound network traffic peaked at several hundred MB/s per node.
- Sink cluster storefile size grew to **over 3x** that of the source within ~90 minutes.
- The compaction queue on the sink peaked at **tens of thousands**, requiring hours of major compaction to recover.
- Source replication lag peaked at **~70 minutes**, with the WAL queue growing to tens of files per RS.
### Root Cause Analysis
The execution path is:
```
ReplicationSourceShipper.shipEdits()
  → HBaseInterClusterReplicationEndpoint.replicate()   [RPC with shipEditsTimeout]
    → ReplicationSink.replicateEntries()
      → HFileReplicator.replicate()
          copyHFilesToStagingDir()       ← copies HFiles from source HDFS (slow)
          LoadIncrementalHFiles.load()   ← bulk loads into sink table
```
`copyHFilesToStagingDir()` runs inside the RPC handler thread. When copying large HFiles takes longer than `shipEditsTimeout`, the source retries with the **same WAL entry**. `HFileReplicator` creates a new random staging directory each time, so nothing detects an in-progress or already-completed invocation of the same event.
Additionally, there is no rate-limiting mechanism for the HFile copy bandwidth, so a single large bulkload can saturate the sink cluster's network and cause cascading degradation.
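To make the failure mode concrete, here is a minimal Java model of the pre-fix sink, assuming only what is described above: each call picks a random staging directory and loads every HFile it copies. `NaiveBulkloadSink` and its members are hypothetical names for illustration, not HBase classes, and the HDFS copy and bulk load are reduced to list appends.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/**
 * Hypothetical model of the pre-fix sink: every call uses a fresh random
 * staging directory, so nothing ties a retry to an earlier attempt.
 */
final class NaiveBulkloadSink {
    // Store files "committed" to the sink store (real code would hold HFiles).
    final List<String> storeFiles = new ArrayList<>();
    // One staging directory per invocation.
    final List<String> stagingDirs = new ArrayList<>();

    /** Handles one delivery of a bulkload WAL entry listing the given HFiles. */
    void replicate(String encodedRegionName, long bulkloadSeqNum, List<String> hfiles) {
        // encodedRegionName and bulkloadSeqNum identify the event but are never
        // consulted here, which is exactly the gap described in the issue.
        String staging = "/staging/" + UUID.randomUUID();
        stagingDirs.add(staging);
        for (String hfile : hfiles) {
            // "Copy" into staging, then "bulk load" it as a new, distinct store file.
            storeFiles.add(staging + "/" + hfile);
        }
    }
}
```

Delivering the same event three times with two HFiles leaves six store files behind under three distinct staging paths: the same kind of multiplication as the storefile growth reported in the evidence.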
### Fix
This patch addresses both issues:
**1. Deduplication of concurrent bulkload replication**
`ReplicationSink` tracks in-progress bulkload events in a `ConcurrentHashSet`.
The key is:
```
replicationClusterId + "#" + encodedRegionName + "#" + bulkloadSeqNum
```
If a retry arrives while the same event is already being processed, it is skipped with a WARN log. The key is removed in a `finally` block so that subsequent retries (after the previous attempt completes or fails) are processed normally.
Note: This prevents duplicate processing under concurrent retry scenarios.
It does not change the at-least-once delivery guarantee of WAL replication.
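The deduplication rule can be sketched as follows. This is a minimal sketch, not the patch itself: it assumes the JDK's `ConcurrentHashMap.newKeySet()` as the concurrent set, prints the warning to stderr instead of using HBase logging, and the names `BulkloadDedupGuard`, `key`, `runOnce` and `isInProgress` are hypothetical.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of in-flight deduplication for bulkload replication events. */
final class BulkloadDedupGuard {
    // Thread-safe set of event keys currently being processed.
    private final Set<String> inProgress = ConcurrentHashMap.newKeySet();

    /** Builds the key replicationClusterId#encodedRegionName#bulkloadSeqNum. */
    static String key(String replicationClusterId, String encodedRegionName, long bulkloadSeqNum) {
        return replicationClusterId + "#" + encodedRegionName + "#" + bulkloadSeqNum;
    }

    /**
     * Runs the work (copy HFiles + bulk load) unless the same event is already in flight.
     *
     * @return true if the work ran, false if it was skipped as a concurrent duplicate
     */
    boolean runOnce(String key, Runnable work) {
        // Set.add is atomic: only one caller can insert a given key at a time.
        if (!inProgress.add(key)) {
            // Stand-in for a WARN log line.
            System.err.println("WARN skipping concurrent duplicate bulkload replication: " + key);
            return false;
        }
        try {
            work.run();
            return true;
        } finally {
            // Always release the key so a later retry (after success or failure) is processed.
            inProgress.remove(key);
        }
    }

    boolean isInProgress(String key) {
        return inProgress.contains(key);
    }
}
```

A retry that arrives while `work` is still running gets `false` and is skipped; the `finally` block releases the key even when the copy or load throws. As the note says, the guard only suppresses concurrent duplicates: a retry that arrives after the first attempt has finished is still processed.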
**2. Configurable bandwidth throttling for HFile copy**
Introduces `hbase.replication.bulkload.copy.bandwidth.mb` (double, MB/s,
default 0 = unlimited). A Guava `RateLimiter` is shared across all copy threads
within a single `HFileReplicator` invocation, so the configured bandwidth is a
per-RS ceiling regardless of `hbase.replication.bulkload.copy.maxthreads`.
`ReplicationSink` implements `ConfigurationObserver`, so the rate limit can be updated dynamically via a configuration reload without restarting RegionServers.
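The shared-limiter idea can be illustrated with a small pay-ahead limiter. The patch uses Guava's `RateLimiter`; to keep this sketch dependency-free it substitutes a hand-written stand-in with similar pay-ahead behaviour, so `CopyBandwidthLimiter`, `reserve`, `acquire` and `setBandwidthMb` are hypothetical names. It assumes 1 MB = 1024 * 1024 bytes and takes an injectable clock so the arithmetic can be checked without sleeping.

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical stdlib stand-in for a shared Guava RateLimiter: one instance is
 * used by every copy thread, so total throughput is capped regardless of thread
 * count. A rate of 0 means unlimited, mirroring the documented default.
 */
final class CopyBandwidthLimiter {
    // Assumption: 1 MB = 1024 * 1024 bytes.
    private static final double BYTES_PER_MB = 1024.0 * 1024.0;

    private final LongSupplier nanoClock;
    private double bytesPerSecond; // 0 = unlimited
    private long nextFreeNanos;    // earliest time the next chunk may start

    CopyBandwidthLimiter(double mbPerSecond, LongSupplier nanoClock) {
        this.nanoClock = nanoClock;
        setBandwidthMb(mbPerSecond);
        this.nextFreeNanos = nanoClock.getAsLong();
    }

    /** Hook for a dynamic configuration reload. */
    synchronized void setBandwidthMb(double mbPerSecond) {
        if (mbPerSecond < 0) {
            throw new IllegalArgumentException("bandwidth must be >= 0");
        }
        this.bytesPerSecond = mbPerSecond * BYTES_PER_MB;
    }

    /** Reserves bandwidth for a chunk; returns how many ns the caller must wait first. */
    synchronized long reserve(long bytes) {
        if (bytesPerSecond <= 0) {
            return 0L; // unlimited
        }
        long now = nanoClock.getAsLong();
        // Idle time is not banked: the earliest start is "now".
        long start = Math.max(now, nextFreeNanos);
        long costNanos = (long) (bytes / bytesPerSecond * 1_000_000_000L);
        nextFreeNanos = start + costNanos;
        return start - now;
    }

    /** Blocking variant a copy thread would call before writing each chunk. */
    void acquire(long bytes) throws InterruptedException {
        long waitNanos = reserve(bytes);
        if (waitNanos > 0) {
            Thread.sleep(waitNanos / 1_000_000L, (int) (waitNanos % 1_000_000L));
        }
    }
}
```

Copy threads would call `acquire(chunkBytes)` on the one shared instance before writing each chunk, so adding threads does not raise the ceiling. In a RegionServer, `setBandwidthMb` would be driven from `ConfigurationObserver.onConfigurationChange(Configuration)`, for example by reading `conf.getDouble("hbase.replication.bulkload.copy.bandwidth.mb", 0)`. Unlike Guava's default limiter, this sketch does not bank unused permits for bursts.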
---
## Steps to Reproduce
1. Set up bulkload replication: cluster A replicates to cluster B.
2. Set `hbase.replication.bulkload.copy.maxthreads=1` on cluster B (or otherwise ensure the HFile copy takes longer than `replication.source.shipedits.timeout`).
3. Perform a large bulkload on cluster A.
4. Observe that cluster B processes the same bulkload WAL entry multiple times, resulting in duplicated store files.
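The reproduction steps can also be mimicked in-process. This hypothetical harness (none of it is HBase code) models `shipEditsTimeout` as a `Future.get` timeout and assumes a fixed number of retries, whereas the real source keeps retrying; the sink has no deduplication, so every retry starts another copy and load.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical harness: a source that re-sends on timeout against a slow, non-deduplicating sink. */
final class RetryStormHarness {
    final AtomicInteger sinkInvocations = new AtomicInteger();
    final AtomicInteger storeFilesLoaded = new AtomicInteger();

    /** Naive sink handler: every delivery copies and loads again. */
    void sinkReplicate(long copyMillis) {
        sinkInvocations.incrementAndGet();
        try {
            Thread.sleep(copyMillis); // stands in for copyHFilesToStagingDir()
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        storeFilesLoaded.incrementAndGet(); // stands in for the bulk load
    }

    /** Source side: ship the same entry, re-sending after each timeout, up to maxAttempts. */
    int shipWithRetries(int maxAttempts, long shipEditsTimeoutMs, long copyMillis) {
        // One thread per attempt, like independent RPC handler threads on the sink.
        ExecutorService rpcHandlers = Executors.newFixedThreadPool(maxAttempts);
        int attempts = 0;
        try {
            while (attempts < maxAttempts) {
                attempts++;
                Future<?> call = rpcHandlers.submit(() -> sinkReplicate(copyMillis));
                try {
                    call.get(shipEditsTimeoutMs, TimeUnit.MILLISECONDS);
                    break; // acknowledged before the timeout
                } catch (TimeoutException e) {
                    // The source stops waiting and re-sends; the sink handler keeps running.
                }
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            rpcHandlers.shutdown(); // let in-flight sink work finish
            try {
                rpcHandlers.awaitTermination(1, TimeUnit.MINUTES);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return attempts;
    }
}
```

With a 300 ms copy and a 50 ms timeout, `shipWithRetries(3, 50, 300)` starts three overlapping sink invocations that all end up loading, while a copy that finishes inside the timeout is acknowledged on the first attempt.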
---
## Expected Behavior
Each unique bulkload WAL entry (identified by `replicationClusterId`, `encodedRegionName`, and `bulkloadSeqNum`) should have at most one in-flight processing attempt at the sink, even when the source retries due to an RPC timeout.
---
## Actual Behavior
The same bulkload WAL entry is processed multiple times, creating duplicate store files and triggering a compaction storm.
---
## Additional Notes
- Increasing `replication.source.shipedits.timeout` reduces the probability of a retry but does not eliminate it, and it introduces WAL retention overhead.
- The deduplication fix and the bandwidth throttling fix are complementary: throttling prevents the network saturation that causes the timeout in the first place, while deduplication prevents the cascading failure when a timeout does occur.