[
https://issues.apache.org/jira/browse/HBASE-30238?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
JeongMin Ju updated HBASE-30238:
--------------------------------
Description:
h2. Problem
When bulkload replication is enabled (A to B), the sink cluster (B) copies
HFiles from the source cluster's (A) HDFS and then performs a bulk load. If
this operation takes longer than {{replication.source.shipedits.timeout}}
(default 60s), the source retries the same WAL entry. Since {{HFileReplicator}}
creates a new random staging directory on every invocation, the sink processes
the same bulkload event multiple times -- loading the same HFiles as distinct
store files repeatedly.
This leads to:
* *Storefile size explosion*: the sink's store file size can grow to 3x the
source's size or more.
* *Compaction storm*: the duplicated store files trigger massive compaction
work.
* *Replication lag*: the source accumulates WAL entries while the sink is
overwhelmed.
The root cause is that HBase bulkload replication has at-least-once semantics,
but {{ReplicationSink}} has no deduplication mechanism to prevent concurrent
reprocessing of the same bulkload event.
h2. Evidence (observed in production)
In a production bulkload replication setup, the following sequence was observed:
* A single bulkload event on the source triggered repeated BulkloadRPC calls on
the sink -- more invocations than the source fired -- confirming multiple
retries of the same WAL entry.
* Sink RS CPU spiked from baseline ~2% to over 40%.
* Sink network inbound peaked at several hundred MB/s per node.
* Sink cluster storefile size grew to over 3x the source within ~90 minutes.
* Compaction queue on the sink peaked at tens of thousands, requiring hours of
major compaction to recover.
* Source replication lag peaked at ~70 minutes, with WAL queue growing to tens
of files per RS.
h2. Root Cause Analysis
The execution path is:
{code}
ReplicationSourceShipper.shipEdits()
  -> HBaseInterClusterReplicationEndpoint.replicate()   [RPC with shipEditsTimeout]
    -> ReplicationSink.replicateEntries()
      -> HFileReplicator.replicate()
           copyHFilesToStagingDir()       <- copies HFiles from source HDFS (slow)
           LoadIncrementalHFiles.load()   <- bulk loads into sink table
{code}
{{copyHFilesToStagingDir()}} runs inside the RPC handler thread. When copying
large HFiles takes longer than {{shipEditsTimeout}}, the source retries with
the same WAL entry. {{HFileReplicator}} creates a new random staging directory
each time, so there is no detection of an in-progress or already-completed
invocation.
Additionally, there is no rate-limiting mechanism for the HFile copy bandwidth,
meaning a single large bulkload can saturate the sink cluster's network and
cause cascading degradation.
h2. Fix
This patch addresses both issues:
*1. Deduplication of concurrent bulkload replication*
{{ReplicationSink}} tracks in-progress bulkload events in a
{{ConcurrentHashSet}}. The key is:
{code}
replicationClusterId + "#" + encodedRegionName + "#" + bulkloadSeqNum
{code}
If a retry arrives while the same event is already being processed, it is
skipped with a WARN log. The key is removed in a {{finally}} block so that
subsequent retries (after the previous attempt completes or fails) are
processed normally.
Note: This prevents duplicate processing under concurrent retry scenarios. It
does not change the at-least-once delivery guarantee of WAL replication.
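The deduplication idea can be sketched as follows. This is a minimal illustration under assumptions, not the code in the patch: the class and method names ({{BulkLoadDedupSketch}}, {{runOnce}}) are hypothetical, the set is built with the JDK's {{ConcurrentHashMap.newKeySet()}}, and what the sink returns to the source for a skipped retry is not modeled.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical illustration of in-progress tracking; not the actual ReplicationSink code. */
class BulkLoadDedupSketch {
  // Keys of bulkload events that are currently being processed on this sink.
  private final Set<String> inProgress = ConcurrentHashMap.newKeySet();

  /** Builds the key in the format given in the description. */
  static String key(String replicationClusterId, String encodedRegionName, long bulkloadSeqNum) {
    return replicationClusterId + "#" + encodedRegionName + "#" + bulkloadSeqNum;
  }

  /**
   * Runs the work unless the same event is already in progress.
   * Returns true if the work ran, false if it was skipped as a concurrent duplicate.
   */
  boolean runOnce(String key, Runnable work) {
    // add() is atomic: only one caller per key gets true until the key is removed.
    if (!inProgress.add(key)) {
      System.err.println("WARN: bulkload event " + key + " is already in progress, skipping");
      return false;
    }
    try {
      work.run();
      return true;
    } finally {
      // Always release the key so a later retry (after success or failure) is processed.
      inProgress.remove(key);
    }
  }

  public static void main(String[] args) {
    BulkLoadDedupSketch sink = new BulkLoadDedupSketch();
    String k = key("clusterA", "regionX", 42L);
    // A retry that arrives while the first attempt is still running is skipped.
    sink.runOnce(k, () -> System.out.println("nested retry ran: " + sink.runOnce(k, () -> { })));
    // After the first attempt has finished, a later retry is processed again.
    System.out.println("later retry ran: " + sink.runOnce(k, () -> { }));
  }
}
```

Because the key is released in {{finally}}, the set only guards against overlapping attempts; a retry that arrives after an attempt has completed is still processed, which matches the note above about at-least-once delivery.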
*2. Configurable bandwidth throttling for HFile copy*
Introduces {{hbase.replication.bulkload.copy.bandwidth.mb}} (double, MB/s,
default 0 = unlimited). A Guava {{RateLimiter}} is shared across all copy
threads within a single {{HFileReplicator}} invocation, so the configured
bandwidth is a per-RS ceiling regardless of
{{hbase.replication.bulkload.copy.maxthreads}}.
{{ReplicationSink}} implements {{ConfigurationObserver}} so the rate limit can
be updated dynamically via configuration reload without restarting
RegionServers.
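To illustrate how one limiter shared by several copy threads caps their aggregate rate, here is a minimal sketch. It is a JDK-only stand-in written for this example, not Guava's {{RateLimiter}} and not the code in the patch. The class name {{CopyThrottleSketch}} and its methods are hypothetical, and 1 MB is assumed to mean 1024 * 1024 bytes.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical JDK-only throttle shared by copy threads; a stand-in, not Guava's RateLimiter. */
class CopyThrottleSketch {
  private final double bytesPerSec;             // <= 0 means unlimited
  private long nextFreeNanos = Long.MIN_VALUE;  // earliest time the next reservation may start

  CopyThrottleSketch(double mbPerSec) {
    // Assumption: 1 MB = 1024 * 1024 bytes.
    this.bytesPerSec = mbPerSec * 1024 * 1024;
  }

  /**
   * Reserves capacity for the given number of bytes and returns how many nanoseconds
   * the caller should wait before copying. The cost of a reservation is paid by the
   * callers that come after it, so the first caller never waits.
   */
  synchronized long reserve(long bytes, long nowNanos) {
    if (bytesPerSec <= 0) {
      return 0L; // throttling disabled
    }
    long start = Math.max(nextFreeNanos, nowNanos);
    nextFreeNanos = start + (long) (bytes / bytesPerSec * 1e9);
    return start - nowNanos;
  }

  /** Blocks the calling copy thread until its reservation may proceed. */
  void acquire(long bytes) throws InterruptedException {
    long waitNanos = reserve(bytes, System.nanoTime());
    if (waitNanos > 0) {
      TimeUnit.NANOSECONDS.sleep(waitNanos);
    }
  }

  public static void main(String[] args) throws InterruptedException {
    CopyThrottleSketch throttle = new CopyThrottleSketch(64.0); // 64 MB/s ceiling
    byte[] chunk = new byte[4 * 1024 * 1024];
    for (int i = 0; i < 4; i++) {
      // Every thread holding the same instance draws from the same budget.
      throttle.acquire(chunk.length);
    }
  }
}
```

Since all reservations go through one synchronized instance, adding more copy threads changes how the budget is divided but not its total, which is the property the description relies on.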
h2. Steps to Reproduce
# Set up bulkload replication: cluster A replicates to cluster B.
# Set {{hbase.replication.bulkload.copy.maxthreads=1}} on cluster B (or ensure
HFile copy takes longer than {{replication.source.shipedits.timeout}}).
# Perform a large bulkload on cluster A.
# Observe that cluster B processes the same bulkload WAL entry multiple times,
resulting in duplicated store files.
h2. Expected Behavior
Each unique bulkload WAL entry (identified by {{replicationClusterId}},
{{encodedRegionName}}, {{bulkloadSeqNum}}) should have at most one processing
attempt in flight at the sink at any time, even when the source retries due to
an RPC timeout.
h2. Actual Behavior
The same bulkload WAL entry is processed multiple times, creating duplicate
store files and triggering a compaction storm.
h2. Additional Notes
* Increasing {{replication.source.shipedits.timeout}} reduces the probability
of a retry but does not eliminate it, and it introduces WAL retention overhead.
* The deduplication fix and the bandwidth throttling fix are complementary:
throttling prevents the network saturation that causes the timeout in the first
place, while deduplication prevents the cascading failure when a timeout does
occur.
> HBase bulkload replication causes duplicate HFile loading and compaction
> storm when source RPC times out
> --------------------------------------------------------------------------------------------------------
>
> Key: HBASE-30238
> URL: https://issues.apache.org/jira/browse/HBASE-30238
> Project: HBase
> Issue Type: Bug
> Components: Replication
> Affects Versions: 2.5.15, 2.6.6
> Reporter: JeongMin Ju
> Priority: Major