JeongMin Ju created HBASE-30238:
-----------------------------------

             Summary: HBase bulkload replication causes duplicate HFile loading and compaction storm when source RPC times out
                 Key: HBASE-30238
                 URL: https://issues.apache.org/jira/browse/HBASE-30238
             Project: HBase
          Issue Type: Bug
          Components: Replication
    Affects Versions: 2.5.15, 2.6.6
            Reporter: JeongMin Ju



### Problem

When bulkload replication is enabled (A → B), the sink cluster (B) copies HFiles
from the source cluster's (A) HDFS and then performs a bulk load. If this
operation takes longer than `replication.source.shipedits.timeout` (default
60s), the source retries the same WAL entry. Since `HFileReplicator` creates a
new random staging directory on every invocation, the sink processes the same
bulkload event multiple times, loading the same HFiles repeatedly as distinct
store files.
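As a rough back-of-the-envelope check, a retry happens once `size / throughput` exceeds the shipEdits timeout. The sketch below assumes copy time is dominated by transfer bandwidth and ignores the bulk load step itself, so it underestimates the real duration. `ShipEditsTimeoutEstimate` and its methods are hypothetical illustration helpers, not HBase code.

```java
/**
 * Hypothetical helper (not part of HBase): estimates whether copying a
 * bulkload's HFiles at a given throughput outlasts the source's shipEdits
 * RPC timeout. Ignores the bulk load step, so it underestimates.
 */
final class ShipEditsTimeoutEstimate {
  // Default of replication.source.shipedits.timeout as stated in the issue (60s).
  static final long DEFAULT_TIMEOUT_MS = 60_000L;

  private ShipEditsTimeoutEstimate() {}

  /** Estimated copy time in ms for totalBytes at mbPerSecond (1 MB = 1024 * 1024 bytes). */
  static long estimatedCopyMillis(long totalBytes, double mbPerSecond) {
    if (!(mbPerSecond > 0)) {
      throw new IllegalArgumentException("throughput must be positive");
    }
    double bytesPerSecond = mbPerSecond * 1024 * 1024;
    return (long) Math.ceil(totalBytes / bytesPerSecond * 1000.0);
  }

  /** True if the source would time out the RPC and retry the same WAL entry. */
  static boolean sourceWillRetry(long totalBytes, double mbPerSecond, long timeoutMs) {
    return estimatedCopyMillis(totalBytes, mbPerSecond) > timeoutMs;
  }
}
```

For example, a 10 GiB bulkload copied at 100 MB/s needs roughly 102 seconds, well past the 60s default, so the source retries while the first copy is still running; 1 GiB at the same rate (about 10 seconds) would not.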

This leads to:
- **Storefile size explosion**: the sink's store file size can reach 3x the
  source's size or more.
- **Compaction storm**: the duplicated store files trigger massive compaction
  work.
- **Replication lag**: the source accumulates WAL entries while the sink is
  overwhelmed.

The root cause is that HBase bulkload replication has at-least-once semantics,
but `ReplicationSink` has no deduplication mechanism to prevent concurrent
reprocessing of the same bulkload event.

### Evidence (observed in production)

In a production bulkload replication setup, the following sequence was observed:

- A single bulkload event on the source triggered repeated BulkloadRPC calls on
  the sink (**more invocations than the source fired**), confirming multiple
  retries of the same WAL entry.
- Sink RS CPU spiked from baseline ~2% to over **40%**.
- Sink network inbound peaked at several hundred MB/s per node.
- Sink cluster storefile size grew to **over 3x** the source within ~90 minutes.
- Compaction queue on the sink peaked at **tens of thousands**, requiring hours
  of major compaction to recover.
- Source replication lag peaked at **~70 minutes**, with the WAL queue growing
  to tens of files per RS.

### Root Cause Analysis

The execution path is:

```
ReplicationSourceShipper.shipEdits()
  → HBaseInterClusterReplicationEndpoint.replicate()   [RPC with shipEditsTimeout]
    → ReplicationSink.replicateEntries()
      → HFileReplicator.replicate()
          copyHFilesToStagingDir()      ← copies HFiles from source HDFS (slow)
          LoadIncrementalHFiles.load()  ← bulk loads into sink table
```

`copyHFilesToStagingDir()` runs inside the RPC handler thread. When copying
large HFiles takes longer than `shipEditsTimeout`, the source retries with the
**same WAL entry**. `HFileReplicator` creates a new random staging directory
each time, so nothing detects an in-progress or already-completed invocation.
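The toy model below illustrates why a random staging directory defeats retry detection. It is a hypothetical simplification, not `HFileReplicator`'s actual code: each call picks a fresh `UUID`-named staging path, so replaying the same event yields new, distinct store files instead of being recognized as a repeat.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/**
 * Hypothetical toy model of the pre-fix sink behavior: every invocation uses
 * a fresh random staging directory and loads whatever it copied, so nothing
 * ties a retry back to an earlier attempt of the same bulkload event.
 */
final class NaiveBulkLoadSink {
  // Store files currently visible in a single simulated sink store.
  final List<String> storeFiles = new ArrayList<>();

  /** Simulates one replicate() call for a bulkload event listing source HFiles. */
  void replicate(List<String> sourceHFiles) {
    // New random staging dir on every call, as described for HFileReplicator.
    String stagingDir = "/staging/" + UUID.randomUUID();
    for (String hfile : sourceHFiles) {
      // Each copy lands under a new path and is loaded as a distinct store file.
      storeFiles.add(stagingDir + "/" + hfile);
    }
  }
}
```

Calling `replicate` twice with the same two-file event leaves four store files, which is the duplication pattern described above.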

Additionally, there is no rate-limiting mechanism for the HFile copy bandwidth,
meaning a single large bulkload can saturate the sink cluster's network and
cause cascading degradation.

### Fix

This patch addresses both issues:

**1. Deduplication of concurrent bulkload replication**

`ReplicationSink` tracks in-progress bulkload events in a `ConcurrentHashSet`.
The key is:

```
replicationClusterId + "#" + encodedRegionName + "#" + bulkloadSeqNum
```

If a retry arrives while the same event is already being processed, it is
skipped with a WARN log. The key is removed in a `finally` block so that
subsequent retries (after the previous attempt completes or fails) are
processed normally.
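A minimal sketch of the in-progress guard, assuming the key format above. The class and method names are hypothetical; the JDK's `ConcurrentHashMap.newKeySet()` stands in for the patch's concurrent set, and `System.err` stands in for the WARN log. `Set.add` returning `false` is what lets exactly one of several overlapping attempts proceed.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical sketch of the in-progress bulkload guard described above.
 * Not the patch's actual code.
 */
final class InProgressBulkLoadGuard {
  private final Set<String> inProgress = ConcurrentHashMap.newKeySet();

  /** Builds the dedup key in the format replicationClusterId#encodedRegionName#bulkloadSeqNum. */
  static String key(String replicationClusterId, String encodedRegionName, long bulkloadSeqNum) {
    return replicationClusterId + "#" + encodedRegionName + "#" + bulkloadSeqNum;
  }

  /**
   * Runs the load unless the same event is already in flight.
   * Returns true if the load ran, false if this call was skipped.
   */
  boolean runOnce(String key, Runnable load) {
    // add() is atomic: only one concurrent caller gets true for a given key.
    if (!inProgress.add(key)) {
      System.err.println("WARN skipping in-progress bulkload replication: " + key);
      return false;
    }
    try {
      load.run();
      return true;
    } finally {
      // Always release, so a later retry (after success or failure) runs again.
      inProgress.remove(key);
    }
  }
}
```

Because the key is released in `finally`, the guard only collapses attempts that overlap in time; a retry that arrives after the first attempt has finished passes the guard and loads the files again.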

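Note: This prevents duplicate processing only when a retry overlaps an attempt
that is still in flight. It does not change the at-least-once delivery
guarantee of WAL replication: a retry that arrives after the first attempt has
finished is still processed again.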

**2. Configurable bandwidth throttling for HFile copy**

Introduces `hbase.replication.bulkload.copy.bandwidth.mb` (double, MB/s,
default 0 = unlimited). A Guava `RateLimiter` is shared across all copy threads
within a single `HFileReplicator` invocation, so the configured bandwidth is a
per-RS ceiling regardless of `hbase.replication.bulkload.copy.maxthreads`.
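The following JDK-only sketch illustrates a shared byte-rate limiter. It is a hypothetical stand-in for the Guava `RateLimiter` the patch uses (with Guava, one would presumably create it with `RateLimiter.create(mb * 1024 * 1024)` and call `acquire(bytesRead)` per buffer, treating one permit as one byte; that mapping is an assumption). One instance is shared by every copy thread, so their combined throughput stays under the configured MB/s no matter how many threads run.

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical JDK-only stand-in for a shared Guava RateLimiter whose
 * permits are bytes. Share one instance across all copy threads.
 */
final class CopyBandwidthLimiter {
  private final double bytesPerSecond; // <= 0 means unlimited
  private long nextFreeNanos = System.nanoTime(); // guarded by this

  CopyBandwidthLimiter(double bandwidthMb) {
    this.bytesPerSecond = bandwidthMb * 1024 * 1024;
  }

  /** Blocks until the caller may copy the given number of bytes. */
  void acquire(long bytes) {
    if (bytesPerSecond <= 0 || bytes <= 0) {
      return; // 0 = unlimited, matching the documented default
    }
    long waitNanos;
    synchronized (this) {
      long now = System.nanoTime();
      // Reserve a time slot: this request is charged to the future, so the
      // *next* caller waits for it (similar to Guava's smooth limiter).
      long start = Math.max(nextFreeNanos, now);
      waitNanos = start - now;
      nextFreeNanos = start + (long) (bytes / bytesPerSecond * TimeUnit.SECONDS.toNanos(1));
    }
    // Sleep outside the lock so other threads can reserve their own slots.
    if (waitNanos > 0) {
      try {
        TimeUnit.NANOSECONDS.sleep(waitNanos);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve interrupt for the copy loop
      }
    }
  }
}
```

Copy threads would call `acquire(n)` before writing each `n`-byte buffer. At 1 MB/s, three 512 KiB chunks take about one second in total, because the first chunk passes immediately and each later one waits for the previous reservation.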

`ReplicationSink` implements `ConfigurationObserver` so the rate limit can be
updated dynamically via configuration reload without restarting RegionServers.
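One plausible shape for the reload hook is sketched below. It is hypothetical: a plain `Map` stands in for Hadoop's `Configuration`, and `onConfigurationChange` only mirrors the shape of HBase's `ConfigurationObserver#onConfigurationChange(Configuration)`. The observer updates a `volatile` value that each new copy invocation reads when it builds its limiter. One reason to rebuild rather than mutate a live limiter is that Guava's `RateLimiter.setRate` rejects non-positive rates, so "0 = unlimited" cannot be applied in place.

```java
import java.util.Map;

/**
 * Hypothetical sketch of a reloadable bandwidth setting. A Map stands in for
 * Hadoop's Configuration; not the patch's actual code.
 */
final class ReloadableCopyBandwidth {
  static final String KEY = "hbase.replication.bulkload.copy.bandwidth.mb";

  // Read by handler threads when they start a new HFile copy; 0 = unlimited.
  private volatile double bandwidthMb;

  ReloadableCopyBandwidth(Map<String, String> conf) {
    this.bandwidthMb = parse(conf.get(KEY), 0.0);
  }

  /** Mirrors the shape of ConfigurationObserver#onConfigurationChange. */
  void onConfigurationChange(Map<String, String> conf) {
    double previous = bandwidthMb;
    double updated = parse(conf.get(KEY), previous);
    if (updated != previous) {
      bandwidthMb = updated;
      System.err.println("INFO " + KEY + " changed from " + previous + " to " + updated);
    }
  }

  double currentMb() {
    return bandwidthMb;
  }

  // Absent key -> default 0 (unlimited); malformed, negative, NaN, or
  // infinite values keep the value passed as onInvalid.
  private static double parse(String raw, double onInvalid) {
    if (raw == null) {
      return 0.0;
    }
    try {
      double v = Double.parseDouble(raw.trim());
      return (v >= 0 && !Double.isInfinite(v)) ? v : onInvalid;
    } catch (NumberFormatException e) {
      return onInvalid;
    }
  }
}
```

With this design a reload takes effect from the next bulkload onward; copies already in progress keep the limiter they started with.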

---

## Steps to Reproduce

1. Set up bulkload replication: cluster A replicates to cluster B.
2. Set `hbase.replication.bulkload.copy.maxthreads=1` on cluster B
   (or otherwise ensure the HFile copy takes longer than
   `replication.source.shipedits.timeout`).
3. Perform a large bulkload on cluster A.
4. Observe that cluster B processes the same bulkload WAL entry multiple times,
   resulting in duplicated store files.

---

## Expected Behavior

Each unique bulkload WAL entry (identified by `replicationClusterId`,
`encodedRegionName`, `bulkloadSeqNum`) should be processed at most once
concurrently at the sink, even when the source retries due to RPC timeout.

---

## Actual Behavior

The same bulkload WAL entry is processed multiple times, creating duplicate
store files and triggering a compaction storm.

---

## Additional Notes

- Increasing `replication.source.shipedits.timeout` reduces the probability of
  retry but does not eliminate it and introduces WAL retention overhead.
- The deduplication fix and the bandwidth throttling fix are complementary:
  throttling prevents the network saturation that causes the timeout in the
  first place, while deduplication prevents the cascading failure when a
  timeout does occur.



