[ https://issues.apache.org/jira/browse/HBASE-30238?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
JeongMin Ju updated HBASE-30238:
--------------------------------
Description:
h2. Problem
When bulkload replication is enabled (A to B), the sink cluster (B) copies
HFiles from the source cluster's (A) HDFS and then bulk loads them. If this
takes longer than {{replication.source.shipedits.timeout}} (default 60s), the
source retries the same WAL entry. Because {{HFileReplicator}} creates a new
random staging directory on every invocation, the sink can process the same
bulkload event multiple times, loading the same HFiles repeatedly as distinct
store files.
This leads to:
* *Storefile size explosion*: the sink's store file size can grow to 3x the
source's size or more.
* *Compaction storm*: the duplicated store files trigger massive compaction
work.
* *Replication lag*: the source accumulates WAL entries while the sink is
overwhelmed.
The root cause is that HBase bulkload replication has at-least-once delivery
semantics, but {{ReplicationSink}} historically had no event tracking mechanism
to stop the same bulkload WAL event from being reprocessed, either while an
earlier attempt was still in progress or after it had completed.
h2. Evidence (observed in production)
In a production bulkload replication setup, the following sequence was observed:
* A single bulkload event on the source triggered more BulkloadRPC calls on the
sink than the source issued, confirming that the same WAL entry was retried
multiple times.
* Sink RS CPU spiked from a baseline of ~2% to over 40%.
* Sink network inbound peaked at several hundred MB/s per node.
* Sink cluster storefile size grew to over 3x the source within ~90 minutes.
* Compaction queue on the sink peaked at tens of thousands, requiring hours of
major compaction to recover.
* Source replication lag peaked at ~70 minutes, with WAL queue growing to tens
of files per RS.
h2. Root Cause Analysis
The execution path is:
{code}
ReplicationSourceShipper.shipEdits()
  -> HBaseInterClusterReplicationEndpoint.replicate()   [RPC with shipEditsTimeout]
    -> ReplicationSink.replicateEntries()
      -> HFileReplicator.replicate()
           copyHFilesToStagingDir()       <- copies HFiles from source HDFS (slow)
           LoadIncrementalHFiles.load()   <- bulk loads into sink table
{code}
{{copyHFilesToStagingDir()}} runs inside the RPC handler thread. When copying
large HFiles takes longer than {{shipEditsTimeout}}, the source retries the
same WAL entry. Because {{HFileReplicator}} creates a new random staging
directory each time, there is no HFile-level detection of an in-progress or
already-completed invocation.
Retries may also be sent to a different target RegionServer. Therefore, a
per-{{ReplicationSink}} in-memory guard is not sufficient by itself: the event
state must be visible across all RegionServers in the sink cluster.
Additionally, there was no rate-limiting mechanism for HFile copy bandwidth, so
a single large bulkload could saturate the sink cluster's network and cause
cascading degradation.
h2. Fix
This patch addresses the failure in three layers:
*1. Configurable bandwidth throttling for HFile copy*
Introduces {{hbase.replication.bulkload.copy.bandwidth.mb}} (double, MB/s,
default 0 = unlimited). A Guava {{RateLimiter}} is shared across all copy
threads within a single {{HFileReplicator}} invocation, so the configured
bandwidth is a per-RS ceiling regardless of
{{hbase.replication.bulkload.copy.maxthreads}}.
{{ReplicationSink}} implements {{ConfigurationObserver}} so the rate limit can
be updated dynamically via configuration reload without restarting
RegionServers.
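A minimal sketch of how one shared byte-rate limiter caps copy bandwidth across threads, assuming 1 MB = 1024 * 1024 bytes. It is a JDK-only stand-in for the Guava {{RateLimiter}}, not the patch's code: permits are bytes, and each copy thread reserves permits for a chunk before writing it. The class {{CopyBandwidthLimiter}}, its methods, and the 64 KiB chunk size are hypothetical. {{setBandwidthMb}} only illustrates what an {{onConfigurationChange}} hook could call on reload.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.TimeUnit;

/** Hypothetical JDK-only stand-in for a shared Guava RateLimiter whose permits are bytes. */
final class CopyBandwidthLimiter {
  // Assumption: "MB" in the config key means 1024 * 1024 bytes.
  static final double BYTES_PER_MB = 1024.0 * 1024.0;

  private double bytesPerSecond;                   // <= 0 means unlimited
  private long nextFreeNanos = System.nanoTime();  // earliest start time for the next chunk

  CopyBandwidthLimiter(double bandwidthMb) {
    setBandwidthMb(bandwidthMb);
  }

  /** What a config-reload hook could call with the new MB/s value; 0 disables throttling. */
  synchronized void setBandwidthMb(double bandwidthMb) {
    bytesPerSecond = bandwidthMb > 0 ? bandwidthMb * BYTES_PER_MB : 0;
  }

  synchronized boolean isUnlimited() {
    return bytesPerSecond <= 0;
  }

  /**
   * Reserves a time slot for {@code bytes} under the lock, then sleeps outside it until
   * the slot starts, so all copy threads share one timeline. Returns nanoseconds waited.
   */
  long acquire(int bytes) throws InterruptedException {
    long waitNanos;
    synchronized (this) {
      if (bytesPerSecond <= 0) {
        return 0;
      }
      long now = System.nanoTime();
      long start = nextFreeNanos - now > 0 ? nextFreeNanos : now;
      waitNanos = start - now;
      nextFreeNanos = start + (long) (bytes / bytesPerSecond * 1_000_000_000L);
    }
    if (waitNanos > 0) {
      TimeUnit.NANOSECONDS.sleep(waitNanos);
    }
    return waitNanos;
  }

  /** Copies a stream in 64 KiB chunks, charging the shared limiter before each write. */
  static long copy(InputStream in, OutputStream out, CopyBandwidthLimiter limiter)
      throws IOException, InterruptedException {
    byte[] buf = new byte[64 * 1024];
    long total = 0;
    int n;
    while ((n = in.read(buf)) != -1) {
      limiter.acquire(n);
      out.write(buf, 0, n);
      total += n;
    }
    return total;
  }
}
```

Every copy thread reserves time on the same limiter, so adding threads does not raise the aggregate rate. A value of 0 skips throttling entirely.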
*2. Distributed bulkload event tracking across sink RegionServers*
{{ReplicationSink}} now coordinates replicated bulkload WAL events through
ZooKeeper. The RegionServer service path passes its {{ZKWatcher}} into
{{ReplicationSink}}, which creates a {{ReplicationBulkLoadEventTracker}}.
Each bulkload event is identified by:
{code}
replicationClusterId
table
encodedRegionName
bulkloadSeqNum
{code}
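A minimal sketch of turning the four identifying fields into a stable znode name. The length-prefixed, hex SHA-256 encoding is an assumption, because the actual id format is not described here. What the sketch shows is that every retry of the same WAL event produces the same id on any sink RegionServer, and that the id contains no '/' characters. {{BulkLoadEventKey}} is a hypothetical name.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical value type for the four fields that identify a replicated bulkload event. */
final class BulkLoadEventKey {
  final String replicationClusterId;
  final String table;
  final String encodedRegionName;
  final long bulkloadSeqNum;

  BulkLoadEventKey(String replicationClusterId, String table, String encodedRegionName,
      long bulkloadSeqNum) {
    this.replicationClusterId = replicationClusterId;
    this.table = table;
    this.encodedRegionName = encodedRegionName;
    this.bulkloadSeqNum = bulkloadSeqNum;
  }

  /** Stable, znode-safe id: lowercase hex SHA-256 over length-prefixed fields (assumed format). */
  String eventId() {
    StringBuilder canonical = new StringBuilder();
    for (String field : new String[] { replicationClusterId, table, encodedRegionName,
        Long.toString(bulkloadSeqNum) }) {
      // Length prefixes keep ("ab", "c") and ("a", "bc") from producing the same input.
      canonical.append(field.length()).append(':').append(field).append(';');
    }
    try {
      byte[] digest = MessageDigest.getInstance("SHA-256")
          .digest(canonical.toString().getBytes(StandardCharsets.UTF_8));
      StringBuilder hex = new StringBuilder(digest.length * 2);
      for (byte b : digest) {
        hex.append(String.format("%02x", b));
      }
      return hex.toString();
    } catch (NoSuchAlgorithmException e) {
      // Every Java platform is required to provide SHA-256.
      throw new IllegalStateException(e);
    }
  }
}
```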
The event is stored under a bucket derived from the WAL write time, using two
ZooKeeper marker trees:
{code}
.../replication/bulkload-events/in-progress/<bucket>/<eventId>
.../replication/bulkload-events/done/<bucket>/<eventId>
{code}
The {{in-progress}} marker is ephemeral and represents the RegionServer
currently processing the event. The {{done}} marker is persistent and records
successful completion.
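A sketch of building the two marker paths. It assumes one bucket per hour of WAL write time, since the real bucket granularity is not stated. The elided "..." prefix is treated as a caller-supplied parameter, and the helper name {{BulkLoadEventPaths}} is hypothetical.

```java
/** Hypothetical helper for the in-progress and done marker trees. */
final class BulkLoadEventPaths {
  // Assumption: one bucket per hour of WAL write time; the real granularity is not stated.
  static final long BUCKET_MILLIS = 60L * 60 * 1000;

  private final String eventsRoot;

  /** @param prefix the cluster-specific part elided as "..." in the layout; caller-supplied. */
  BulkLoadEventPaths(String prefix) {
    this.eventsRoot = prefix + "/replication/bulkload-events";
  }

  static long bucketOf(long walWriteTimeMillis) {
    return Math.floorDiv(walWriteTimeMillis, BUCKET_MILLIS);
  }

  /** Ephemeral marker: owned by the RegionServer currently processing the event. */
  String inProgress(long walWriteTimeMillis, String eventId) {
    return eventsRoot + "/in-progress/" + bucketOf(walWriteTimeMillis) + "/" + eventId;
  }

  /** Persistent marker: records successful completion. */
  String done(long walWriteTimeMillis, String eventId) {
    return eventsRoot + "/done/" + bucketOf(walWriteTimeMillis) + "/" + eventId;
  }
}
```

The bucket comes from the replayed WAL entry rather than from the local clock. As a result, a retry handled by a different RegionServer, even much later, still resolves to the same marker paths. Old markers also end up grouped, so a cleaner can drop whole buckets.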
When a sink receives a bulkload WAL event:
# If a {{done}} marker already exists, the event is skipped.
# Otherwise, the sink attempts to create the {{in-progress}} marker.
# If another RegionServer is already processing the same event, the sink waits
for either completion or timeout.
# After {{HFileReplicator.replicate()}} succeeds, the sink writes the {{done}}
marker and releases the {{in-progress}} marker.
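The four steps can be sketched against a hypothetical {{MarkerStore}} interface that stands in for ZooKeeper. In this sketch, {{tryCreateInProgress}} plays the role of creating the ephemeral znode and fails if the node already exists. This is an illustration, not the patch's code. It polls instead of using ZooKeeper watches. It also simply reports {{TIMED_OUT}} when the wait expires, because what the patch does after the timeout is not described here.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/** Hypothetical marker store; in the patch this role is played by ZooKeeper. */
interface MarkerStore {
  boolean doneExists(String eventId);
  /** Atomic create (like an ephemeral znode); false if another owner already holds it. */
  boolean tryCreateInProgress(String eventId, String owner);
  boolean inProgressExists(String eventId);
  void createDone(String eventId);
  void deleteInProgress(String eventId, String owner);
}

/** In-memory stand-in used only to exercise the protocol. */
final class InMemoryMarkerStore implements MarkerStore {
  private final Map<String, String> inProgress = new ConcurrentHashMap<>();
  private final Map<String, Boolean> done = new ConcurrentHashMap<>();

  @Override public boolean doneExists(String id) { return done.containsKey(id); }
  @Override public boolean tryCreateInProgress(String id, String owner) {
    return inProgress.putIfAbsent(id, owner) == null;
  }
  @Override public boolean inProgressExists(String id) { return inProgress.containsKey(id); }
  @Override public void createDone(String id) { done.put(id, Boolean.TRUE); }
  @Override public void deleteInProgress(String id, String owner) { inProgress.remove(id, owner); }
}

enum Outcome { LOADED, SKIPPED_DONE, TIMED_OUT }

/** One sink RegionServer's side of the protocol (hypothetical class). */
final class BulkLoadEventGuard {
  private final MarkerStore store;
  private final String owner;
  private final long waitTimeoutMillis;
  private final long pollMillis;

  BulkLoadEventGuard(MarkerStore store, String owner, long waitTimeoutMillis, long pollMillis) {
    this.store = store;
    this.owner = owner;
    this.waitTimeoutMillis = waitTimeoutMillis;
    this.pollMillis = pollMillis;
  }

  Outcome process(String eventId, Runnable replicate) throws InterruptedException {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(waitTimeoutMillis);
    while (true) {
      if (store.doneExists(eventId)) {
        return Outcome.SKIPPED_DONE;              // step 1: already completed somewhere
      }
      if (store.tryCreateInProgress(eventId, owner)) {
        break;                                    // step 2: this RS now owns the event
      }
      if (System.nanoTime() - deadline >= 0) {
        return Outcome.TIMED_OUT;                 // step 3: gave up waiting on the owner
      }
      Thread.sleep(pollMillis);                   // step 3: wait, then re-check
    }
    try {
      // The previous owner writes done before releasing in-progress, so re-check here.
      if (store.doneExists(eventId)) {
        return Outcome.SKIPPED_DONE;
      }
      replicate.run();                            // HFileReplicator.replicate() in the patch
      store.createDone(eventId);                  // step 4: persistent completion marker
      return Outcome.LOADED;
    } finally {
      store.deleteInProgress(eventId, owner);     // also runs on failure: stays retryable
    }
  }
}
```

The second {{doneExists}} check, made after acquiring the {{in-progress}} marker, matters. The owner writes {{done}} before releasing {{in-progress}}, so a waiter that wins the marker right after the previous owner finished still sees the completion and skips. Releasing the marker in {{finally}} keeps failed attempts retryable.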
This prevents duplicate HFile loading when a source-side RPC retry is routed to
a different target RegionServer after the first target RegionServer already
completed the same bulkload event.
A local in-memory guard remains as a fallback for {{ReplicationSink}} instances
constructed without ZooKeeper, but the RegionServer replication service path
uses ZooKeeper-backed event tracking.
*3. Master-side cleanup for completed event markers*
Adds {{ReplicationBulkLoadEventCleaner}}, a master scheduled chore. It removes
expired {{done}} markers only when the matching {{in-progress}} marker no
longer exists, then removes empty bucket znodes.
The cleanup is controlled by:
{code}
hbase.master.cleaner.replication.bulkload.event.period.ms
hbase.replication.bulkload.event.done.ttl.ms
{code}
The default completed-marker TTL is one day.
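A minimal sketch of the cleanup rule over an in-memory model of the two trees, mapping each bucket to its event ids and each id to the marker's creation time. Measuring expiry from the {{done}} marker's creation time is an assumption, and {{DoneMarkerCleaner}} is a hypothetical name. A real chore would list and delete znodes rather than mutate maps.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

/** Hypothetical model of the master chore's rule; a real chore would list and delete znodes. */
final class DoneMarkerCleaner {
  /** One day, the default completed-marker TTL. */
  static final long DEFAULT_DONE_TTL_MS = 24L * 60 * 60 * 1000;

  /**
   * @param done       bucket -> (eventId -> done-marker creation time in ms)
   * @param inProgress bucket -> eventIds that still have an in-progress marker
   * @return how many done markers were removed
   */
  static int clean(Map<String, Map<String, Long>> done, Map<String, Set<String>> inProgress,
      long nowMs, long ttlMs) {
    int removed = 0;
    Iterator<Map.Entry<String, Map<String, Long>>> buckets = done.entrySet().iterator();
    while (buckets.hasNext()) {
      Map.Entry<String, Map<String, Long>> bucket = buckets.next();
      Set<String> running = inProgress.getOrDefault(bucket.getKey(), Collections.emptySet());
      Iterator<Map.Entry<String, Long>> events = bucket.getValue().entrySet().iterator();
      while (events.hasNext()) {
        Map.Entry<String, Long> event = events.next();
        boolean expired = nowMs - event.getValue() >= ttlMs;
        // Keep the marker while an in-progress marker with the same id still exists.
        if (expired && !running.contains(event.getKey())) {
          events.remove();
          removed++;
        }
      }
      if (bucket.getValue().isEmpty()) {
        buckets.remove();                          // drop the now-empty done bucket
      }
    }
    inProgress.values().removeIf(Set::isEmpty);    // drop empty in-progress buckets too
    return removed;
  }
}
```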
h2. Test Coverage
The patch adds coverage for both the throttling and deduplication paths:
* {{TestHFileReplicatorBandwidth}} verifies bandwidth throttling and dynamic
rate-limit updates.
* {{TestReplicationSinkBulkLoadDedup}} verifies sink-level duplicate handling
and completed-marker skipping.
* {{TestReplicationBulkLoadEventTracker}} verifies ZooKeeper in-progress/done
marker behavior and stable event bucket/id generation.
* {{TestReplicationBulkLoadEventCleaner}} verifies master-side cleanup of
expired {{done}} markers while preserving markers with matching {{in-progress}}
state.
* {{TestBulkLoadReplication#testDuplicateBulkLoadWalEventSkippedAcrossTargetRegionServers}}
starts MiniCluster RegionServers and replays the exact same bulkload WAL batch
first through one target RS sink and then through another target RS sink,
verifying that the second replay is skipped by the completed marker.
h2. Steps to Reproduce
# Set up bulkload replication: cluster A replicates to cluster B.
# Set {{hbase.replication.bulkload.copy.maxthreads=1}} on cluster B (or ensure
HFile copy takes longer than {{replication.source.shipedits.timeout}}).
# Perform a large bulkload on cluster A.
# Observe that cluster B may process the same bulkload WAL entry multiple
times, resulting in duplicated store files.
h2. Expected Behavior
Once an attempt to load a unique replicated bulkload WAL event succeeds, the
sink cluster should not load that event's HFiles again, even when the source
retries due to RPC timeout and the retry is routed to another target
RegionServer. Failed attempts remain retryable; successful attempts are
recorded with completed ({{done}}) markers until the cleanup TTL expires.
h2. Actual Behavior
The same bulkload WAL entry can be processed multiple times, creating duplicate
store files and triggering a compaction storm.
h2. Additional Notes
* Increasing {{replication.source.shipedits.timeout}} reduces the probability
of retry but does not eliminate it and introduces WAL retention overhead.
* The bandwidth throttling and ZooKeeper event tracking fixes are
complementary: throttling reduces the chance of network saturation and source
RPC timeout, while event tracking prevents duplicate bulkload execution when
timeout/retry still occurs.
* Pull request: https://github.com/apache/hbase/pull/8380
was:
h2. Problem
When bulkload replication is enabled (A to B), the sink cluster (B) copies
HFiles from the source cluster's (A) HDFS and then performs a bulk load. If
this operation takes longer than {{replication.source.shipedits.timeout}}
(default 60s), the source retries the same WAL entry. Since {{HFileReplicator}}
creates a new random staging directory on every invocation, the sink processes
the same bulkload event multiple times -- loading the same HFiles as distinct
store files repeatedly.
This leads to:
* *Storefile size explosion*: the sink's store file size can grow to 3x or more
of the source's size.
* *Compaction storm*: the duplicated store files trigger massive compaction
work.
* *Replication lag*: the source accumulates WAL entries while the sink is
overwhelmed.
The root cause is that HBase bulkload replication has at-least-once semantics,
but {{ReplicationSink}} has no deduplication mechanism to prevent concurrent
reprocessing of the same bulkload event.
h2. Evidence (observed in production)
In a production bulkload replication setup, the following sequence was observed:
* A single bulkload event on the source triggered more BulkloadRPC calls on the
sink than the source issued, confirming that the same WAL entry was retried
multiple times.
* Sink RS CPU spiked from a baseline of ~2% to over 40%.
* Sink network inbound peaked at several hundred MB/s per node.
* Sink cluster storefile size grew to over 3x the source within ~90 minutes.
* Compaction queue on the sink peaked at tens of thousands, requiring hours of
major compaction to recover.
* Source replication lag peaked at ~70 minutes, with WAL queue growing to tens
of files per RS.
h2. Root Cause Analysis
The execution path is:
{code}
ReplicationSourceShipper.shipEdits()
  -> HBaseInterClusterReplicationEndpoint.replicate()   [RPC with shipEditsTimeout]
    -> ReplicationSink.replicateEntries()
      -> HFileReplicator.replicate()
           copyHFilesToStagingDir()       <- copies HFiles from source HDFS (slow)
           LoadIncrementalHFiles.load()   <- bulk loads into sink table
{code}
{{copyHFilesToStagingDir()}} runs inside the RPC handler thread. When copying
large HFiles takes longer than {{shipEditsTimeout}}, the source retries with
the same WAL entry. {{HFileReplicator}} creates a new random staging directory
each time, so there is no detection of an in-progress or already-completed
invocation.
Additionally, there is no rate-limiting mechanism for the HFile copy bandwidth,
meaning a single large bulkload can saturate the sink cluster's network and
cause cascading degradation.
h2. Fix
This patch addresses both issues:
*1. Deduplication of concurrent bulkload replication*
{{ReplicationSink}} tracks in-progress bulkload events in a
{{ConcurrentHashSet}}. The key is:
{code}
replicationClusterId + "#" + encodedRegionName + "#" + bulkloadSeqNum
{code}
If a retry arrives while the same event is already being processed, it is
skipped with a WARN log. The key is removed in a {{finally}} block so that
subsequent retries (after the previous attempt completes or fails) are
processed normally.
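A sketch of this per-sink guard. It uses the JDK's {{ConcurrentHashMap.newKeySet()}} in place of {{ConcurrentHashSet}} and {{java.util.logging}} in place of HBase's logger. The class and method names are hypothetical.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Logger;

/** Sketch of the per-ReplicationSink in-flight guard (hypothetical names). */
final class InFlightBulkLoadGuard {
  private static final Logger LOG = Logger.getLogger(InFlightBulkLoadGuard.class.getName());

  // JDK concurrent set used in place of ConcurrentHashSet.
  private final Set<String> inFlight = ConcurrentHashMap.newKeySet();

  static String key(String replicationClusterId, String encodedRegionName, long bulkloadSeqNum) {
    return replicationClusterId + "#" + encodedRegionName + "#" + bulkloadSeqNum;
  }

  /** Runs {@code replicate} unless the same event is already in flight on this sink. */
  boolean runIfNotInFlight(String key, Runnable replicate) {
    if (!inFlight.add(key)) {
      LOG.warning("Skipping bulkload event already in progress: " + key);
      return false;
    }
    try {
      replicate.run();
      return true;
    } finally {
      inFlight.remove(key); // completed or failed: later retries are processed normally
    }
  }
}
```

The set lives inside one {{ReplicationSink}}, so it cannot see a retry routed to a different RegionServer. Its key is also removed once an attempt finishes, so a later retry of an already completed event is loaded again.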
Note: This prevents duplicate processing under concurrent retry scenarios. It
does not change the at-least-once delivery guarantee of WAL replication.
*2. Configurable bandwidth throttling for HFile copy*
Introduces {{hbase.replication.bulkload.copy.bandwidth.mb}} (double, MB/s,
default 0 = unlimited). A Guava {{RateLimiter}} is shared across all copy
threads within a single {{HFileReplicator}} invocation, so the configured
bandwidth is a per-RS ceiling regardless of
{{hbase.replication.bulkload.copy.maxthreads}}.
{{ReplicationSink}} implements {{ConfigurationObserver}} so the rate limit can
be updated dynamically via configuration reload without restarting
RegionServers.
h2. Steps to Reproduce
# Set up bulkload replication: cluster A replicates to cluster B.
# Set {{hbase.replication.bulkload.copy.maxthreads=1}} on cluster B (or ensure
HFile copy takes longer than {{replication.source.shipedits.timeout}}).
# Perform a large bulkload on cluster A.
# Observe that cluster B processes the same bulkload WAL entry multiple times,
resulting in duplicated store files.
h2. Expected Behavior
Each unique bulkload WAL entry (identified by {{replicationClusterId}},
{{encodedRegionName}}, {{bulkloadSeqNum}}) should be processed at most once
concurrently at the sink, even when the source retries due to RPC timeout.
h2. Actual Behavior
The same bulkload WAL entry is processed multiple times, creating duplicate
store files and triggering a compaction storm.
h2. Additional Notes
* Increasing {{replication.source.shipedits.timeout}} reduces the probability
of retry but does not eliminate it and introduces WAL retention overhead.
* The deduplication fix and the bandwidth throttling fix are complementary:
throttling prevents network saturation that causes the timeout in the first
place, while deduplication prevents the cascading failure when timeout does
occur.
> HBase bulkload replication causes duplicate HFile loading and compaction
> storm when source RPC times out
> --------------------------------------------------------------------------------------------------------
>
> Key: HBASE-30238
> URL: https://issues.apache.org/jira/browse/HBASE-30238
> Project: HBase
> Issue Type: Bug
> Components: Replication
> Affects Versions: 2.5.15, 2.6.6
> Reporter: JeongMin Ju
> Assignee: JeongMin Ju
> Priority: Major
> Labels: pull-request-available
>