[ 
https://issues.apache.org/jira/browse/HBASE-30238?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

JeongMin Ju updated HBASE-30238:
--------------------------------
    Description: 
h2. Problem

When bulkload replication is enabled (A to B), the sink cluster (B) copies 
HFiles from the source cluster's (A) HDFS and then performs a bulk load. If 
this operation takes longer than {{replication.source.shipedits.timeout}} 
(default 60s), the source retries the same WAL entry. Since {{HFileReplicator}} 
creates a new random staging directory on every invocation, the sink processes 
the same bulkload event multiple times -- loading the same HFiles as distinct 
store files repeatedly.

This leads to:
* *Storefile size explosion*: the sink's store file size can grow to 3x or more 
of the source's size.
* *Compaction storm*: the duplicated store files trigger massive compaction 
work.
* *Replication lag*: the source accumulates WAL entries while the sink is 
overwhelmed.

The root cause is that HBase bulkload replication has at-least-once semantics, 
but {{ReplicationSink}} has no deduplication mechanism to prevent concurrent 
reprocessing of the same bulkload event.

h2. Evidence (observed in production)

In a production bulkload replication setup, the following sequence was observed:

* A single bulkload event on the source triggered repeated BulkloadRPC calls on 
the sink -- more invocations than the source fired -- confirming multiple 
retries of the same WAL entry.
* Sink RS CPU spiked from baseline ~2% to over 40%.
* Sink network inbound peaked at several hundred MB/s per node.
* Sink cluster storefile size grew to over 3x the source within ~90 minutes.
* Compaction queue on the sink peaked at tens of thousands, requiring hours of 
major compaction to recover.
* Source replication lag peaked at ~70 minutes, with WAL queue growing to tens 
of files per RS.

h2. Root Cause Analysis

The execution path is:

{code}
ReplicationSourceShipper.shipEdits()
  -> HBaseInterClusterReplicationEndpoint.replicate()   [RPC with shipEditsTimeout]
    -> ReplicationSink.replicateEntries()
      -> HFileReplicator.replicate()
          copyHFilesToStagingDir()      <- copies HFiles from source HDFS (slow)
          LoadIncrementalHFiles.load()  <- bulk loads into sink table
{code}

{{copyHFilesToStagingDir()}} runs inside the RPC handler thread. When copying 
large HFiles takes longer than {{shipEditsTimeout}}, the source retries with 
the same WAL entry. {{HFileReplicator}} creates a new random staging directory 
each time, so there is no detection of an in-progress or already-completed 
invocation.
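
The effect of the random staging directory can be illustrated with a toy model. This is not HBase code: {{NaiveSinkModel}}, its {{replicate}} method, and the {{/staging/}} path are hypothetical names chosen for illustration. It only shows that two invocations for the same HFile end up as two distinct store files because nothing ties the second call to the first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/** Toy model of a sink without deduplication (hypothetical, not the HBase implementation). */
class NaiveSinkModel {
    final List<String> storeFiles = new ArrayList<>();

    /** Models one sink invocation: copy into a fresh random staging dir, then load. */
    void replicate(String hfileName) {
        // A new directory is chosen on every call, so a retry cannot see earlier work.
        String stagingDir = "/staging/" + UUID.randomUUID();
        // The copied file is loaded as a new store file each time.
        storeFiles.add(stagingDir + "/" + hfileName);
    }
}
```

Calling {{replicate}} twice with the same file name, as a retried WAL entry would, leaves two entries in {{storeFiles}}.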

Additionally, there is no rate-limiting mechanism for the HFile copy bandwidth, 
meaning a single large bulkload can saturate the sink cluster's network and 
cause cascading degradation.

h2. Fix

This patch addresses both issues:

*1. Deduplication of concurrent bulkload replication*

{{ReplicationSink}} tracks in-progress bulkload events in a 
{{ConcurrentHashSet}}. The key is:

{code}
replicationClusterId + "#" + encodedRegionName + "#" + bulkloadSeqNum
{code}

If a retry arrives while the same event is already being processed, it is 
skipped with a WARN log. The key is removed in a {{finally}} block so that 
subsequent retries (after the previous attempt completes or fails) are 
processed normally.

Note: This prevents duplicate processing under concurrent retry scenarios. It 
does not change the at-least-once delivery guarantee of WAL replication.
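
A minimal sketch of this in-flight guard, under stated assumptions: the class name {{InFlightBulkLoadGuard}} and the method {{runOnce}} are hypothetical, and the JDK's {{ConcurrentHashMap.newKeySet()}} stands in for the {{ConcurrentHashSet}} mentioned above. Only the key format is taken from the description; the actual patch may be structured differently.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Sketch of in-flight deduplication; class and method names are hypothetical. */
class InFlightBulkLoadGuard {
    // JDK concurrent set used here as a stand-in for the ConcurrentHashSet in the description.
    private final Set<String> inFlight = ConcurrentHashMap.newKeySet();

    /** Builds the key in the format given in the description. */
    static String key(String replicationClusterId, String encodedRegionName, long bulkloadSeqNum) {
        return replicationClusterId + "#" + encodedRegionName + "#" + bulkloadSeqNum;
    }

    /**
     * Runs the load unless the same event is already in flight.
     * Returns true if the load ran, false if it was skipped as a concurrent duplicate.
     */
    boolean runOnce(String key, Runnable load) {
        if (!inFlight.add(key)) {
            // A real sink would log at WARN level here.
            return false;
        }
        try {
            load.run();
            return true;
        } finally {
            // Always release the key so a later retry is processed normally.
            inFlight.remove(key);
        }
    }
}
```

Because the key is released in {{finally}}, the guard only suppresses overlapping attempts; a retry that arrives after the first attempt has finished runs again, which matches the note above about at-least-once delivery.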

*2. Configurable bandwidth throttling for HFile copy*

Introduces {{hbase.replication.bulkload.copy.bandwidth.mb}} (double, MB/s, 
default 0 = unlimited). A Guava {{RateLimiter}} is shared across all copy 
threads within a single {{HFileReplicator}} invocation, so the configured 
bandwidth is a per-RS ceiling regardless of 
{{hbase.replication.bulkload.copy.maxthreads}}.

{{ReplicationSink}} implements {{ConfigurationObserver}} so the rate limit can 
be updated dynamically via configuration reload without restarting 
RegionServers.
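
The throttling idea can be sketched with plain JDK classes. This is an illustration under assumptions, not the patch itself: {{CopyThrottle}}, {{reserve}}, {{acquire}}, and {{setRateMb}} are hypothetical names, and the class is a simplified stand-in for Guava's {{RateLimiter}} that counts bytes instead of permits. A rate of 0 means unlimited, as in the description, and {{setRateMb}} models what a configuration reload might call.

```java
import java.util.concurrent.TimeUnit;

/** Simplified byte-rate throttle shared by copy threads (hypothetical sketch). */
class CopyThrottle {
    private volatile double bytesPerSec;            // 0 means unlimited
    private long nextFreeNanos = Long.MIN_VALUE;    // guarded by "this"

    CopyThrottle(double mbPerSec) {
        setRateMb(mbPerSec);
    }

    /** Updates the limit at runtime, e.g. from a configuration-reload callback. */
    void setRateMb(double mbPerSec) {
        bytesPerSec = mbPerSec <= 0 ? 0 : mbPerSec * 1024 * 1024;
    }

    /** Reserves budget for the given bytes and returns the wait the caller owes, in nanos. */
    synchronized long reserve(long bytes, long nowNanos) {
        double rate = bytesPerSec;
        if (rate <= 0) {
            return 0L; // unlimited
        }
        long start = Math.max(nextFreeNanos, nowNanos);
        // The cost of this chunk is charged to whoever reserves next.
        nextFreeNanos = start + (long) (bytes / rate * 1e9);
        return start - nowNanos;
    }

    /** Blocks the calling copy thread until its chunk fits the shared budget. */
    void acquire(long bytes) throws InterruptedException {
        long waitNanos = reserve(bytes, System.nanoTime());
        if (waitNanos > 0) {
            TimeUnit.NANOSECONDS.sleep(waitNanos);
        }
    }
}
```

Since {{reserve}} is synchronized on one shared instance, adding copy threads does not raise the aggregate rate; the threads simply take turns within the same budget.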

h2. Steps to Reproduce

# Set up bulkload replication: cluster A replicates to cluster B.
# Set {{hbase.replication.bulkload.copy.maxthreads=1}} on cluster B (or ensure 
HFile copy takes longer than {{replication.source.shipedits.timeout}}).
# Perform a large bulkload on cluster A.
# Observe that cluster B processes the same bulkload WAL entry multiple times, 
resulting in duplicated store files.

h2. Expected Behavior

Each unique bulkload WAL entry (identified by {{replicationClusterId}}, 
{{encodedRegionName}}, and {{bulkloadSeqNum}}) should have at most one 
processing attempt in flight at the sink at any time, even when the source 
retries after an RPC timeout.

h2. Actual Behavior

The same bulkload WAL entry is processed multiple times, creating duplicate 
store files and triggering a compaction storm.

h2. Additional Notes

* Increasing {{replication.source.shipedits.timeout}} reduces the probability 
of a retry but does not eliminate it, and it introduces WAL retention overhead.
* The deduplication fix and the bandwidth throttling fix are complementary: 
throttling prevents the network saturation that causes the timeout in the 
first place, while deduplication prevents the cascading failure when a timeout 
does occur.


  was:

### Problem

When bulkload replication is enabled (A → B), the sink cluster (B) copies HFiles
from the source cluster's (A) HDFS and then performs a bulk load. If this operation
takes longer than `replication.source.shipedits.timeout` (default 60s), the source
retries the same WAL entry. Since `HFileReplicator` creates a new random staging
directory on every invocation, the sink processes the same bulkload event multiple
times — loading the same HFiles as distinct store files repeatedly.

This leads to:
- **Storefile size explosion**: the sink's store file size can grow to 3x or more of
  the source's size.
- **Compaction storm**: the duplicated store files trigger massive compaction work.
- **Replication lag**: the source accumulates WAL entries while the sink is overwhelmed.

The root cause is that HBase bulkload replication has at-least-once semantics, but
`ReplicationSink` has no deduplication mechanism to prevent concurrent reprocessing
of the same bulkload event.

### Evidence (observed in production)

In a production bulkload replication setup, the following sequence was observed:

- A single bulkload event on the source triggered repeated BulkloadRPC calls on 
the
  sink — **more invocations than the source fired** — confirming multiple 
retries of
  the same WAL entry.
- Sink RS CPU spiked from baseline ~2% to over **40%**.
- Sink network inbound peaked at several hundred MB/s per node.
- Sink cluster storefile size grew to **over 3x** the source within ~90 minutes.
- Compaction queue on the sink peaked at **tens of thousands**, requiring hours of
  major compaction to recover.
- Source replication lag peaked at **~70 minutes**, with WAL queue growing to tens
  of files per RS.

### Root Cause Analysis

The execution path is:

```
ReplicationSourceShipper.shipEdits()
  → HBaseInterClusterReplicationEndpoint.replicate()   [RPC with shipEditsTimeout]
    → ReplicationSink.replicateEntries()
      → HFileReplicator.replicate()
          copyHFilesToStagingDir()      ← copies HFiles from source HDFS (slow)
          LoadIncrementalHFiles.load()  ← bulk loads into sink table
```

`copyHFilesToStagingDir()` runs inside the RPC handler thread. When copying large
HFiles takes longer than `shipEditsTimeout`, the source retries with the **same
WAL entry**. `HFileReplicator` creates a new random staging directory each time,
so there is no detection of an in-progress or already-completed invocation.

Additionally, there is no rate-limiting mechanism for the HFile copy bandwidth,
meaning a single large bulkload can saturate the sink cluster's network and cause
cascading degradation.

### Fix

This patch addresses both issues:

**1. Deduplication of concurrent bulkload replication**

`ReplicationSink` tracks in-progress bulkload events in a `ConcurrentHashSet`.
The key is:

```
replicationClusterId + "#" + encodedRegionName + "#" + bulkloadSeqNum
```

If a retry arrives while the same event is already being processed, it is skipped
with a WARN log. The key is removed in a `finally` block so that subsequent retries
(after the previous attempt completes or fails) are processed normally.

Note: This prevents duplicate processing under concurrent retry scenarios.
It does not change the at-least-once delivery guarantee of WAL replication.

**2. Configurable bandwidth throttling for HFile copy**

Introduces `hbase.replication.bulkload.copy.bandwidth.mb` (double, MB/s,
default 0 = unlimited). A Guava `RateLimiter` is shared across all copy threads
within a single `HFileReplicator` invocation, so the configured bandwidth is a
per-RS ceiling regardless of `hbase.replication.bulkload.copy.maxthreads`.

`ReplicationSink` implements `ConfigurationObserver` so the rate limit can be
updated dynamically via configuration reload without restarting RegionServers.

---

## Steps to Reproduce

1. Set up bulkload replication: cluster A replicates to cluster B.
2. Set `hbase.replication.bulkload.copy.maxthreads=1` on cluster B
   (or ensure HFile copy takes longer than `replication.source.shipedits.timeout`).
3. Perform a large bulkload on cluster A.
4. Observe that cluster B processes the same bulkload WAL entry multiple times,
   resulting in duplicated store files.

---

## Expected Behavior

Each unique bulkload WAL entry (identified by `replicationClusterId`,
`encodedRegionName`, `bulkloadSeqNum`) should be processed at most once
concurrently at the sink, even when the source retries due to RPC timeout.

---

## Actual Behavior

The same bulkload WAL entry is processed multiple times, creating duplicate store
files and triggering compaction storm.

---

## Additional Notes

- Increasing `replication.source.shipedits.timeout` reduces the probability of
  retry but does not eliminate it and introduces WAL retention overhead.
- The deduplication fix and the bandwidth throttling fix are complementary:
  throttling prevents network saturation that causes the timeout in the first place,
  while deduplication prevents the cascading failure when timeout does occur.



> HBase bulkload replication causes duplicate HFile loading and compaction 
> storm when source RPC times out
> --------------------------------------------------------------------------------------------------------
>
>                 Key: HBASE-30238
>                 URL: https://issues.apache.org/jira/browse/HBASE-30238
>             Project: HBase
>          Issue Type: Bug
>          Components: Replication
>    Affects Versions: 2.5.15, 2.6.6
>            Reporter: JeongMin Ju
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
