tomasbanet opened a new pull request, #7191:
URL: https://github.com/apache/hbase/pull/7191
# Steps to reproduce
1. Start the source hbase regionserver with debug options and attach a debugger (e.g. Eclipse) to it, so that breakpoints can suspend threads at specific points.
Sample debug options passed to java when running the regionserver (the debugger connects on port 5005):
`export HBASE_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005"`
2. Add a serial replication peer.
In hbase shell connected to _source_ hbase:
```
add_peer "peer_1", CLUSTER_KEY =>
"home-host-1:2181,127.0.0.1:2281,home-host-2:2181:2181:/hbase2", TABLE_CFS => {
"test_table" => ["cf1"] }, SERIAL => true
```
3. Create a test table with 1 region and global replication scope, and wait
until the REGION_OPEN entry is successfully replicated.
In hbase shell connected to _sink_ hbase:
```
create "test_table", { NAME => "cf1" }
```
In hbase shell connected to _source_ hbase:
```
create "test_table", { NAME => "cf1", REPLICATION_SCOPE => 1 }
```
HBase state for test_table region:
* hbase:meta barriers = [2]
* Region dir recovered.edits: 1.seqid
* Region dir storefiles: none
* Active WAL entries:
  * HBASE::REGION_EVENT::REGION_OPEN seqid=3
* pushedSeqId (/hbase/replication/regions/… znode contents) = 3
4. Put a key-value in source hbase and wait until it is successfully replicated.
In hbase shell connected to _source_ hbase:
```
put "test_table", "key1", "cf1:col1", "val1"
```
HBase state for test_table region:
* hbase:meta barriers = [2]
* Region dir recovered.edits files: 1.seqid
* Region dir storefiles metadata: none
* Active WAL entries:
  * HBASE::REGION_EVENT::REGION_OPEN seqid=3
  * col1 key1 seqid=4
* pushedSeqId (/hbase/replication/regions/… znode contents) = 4
5. In the _source_ hbase regionserver, use the debugger to set a breakpoint in SerialReplicationChecker:canPush(Entry entry, Cell firstCellInEdit). This prevents entries from being pushed in the replication WAL reader threads*.
*We rely on a race between replication lagging behind and a region being re-opened to reproduce the problem.
6. SIGKILL* the _source_ hbase regionserver, start it, and wait until regions are re-opened. When a replication WAL reader thread hits the breakpoint, keep the thread suspended (do not allow any entries to be pushed).
*The regionserver must be stopped with SIGKILL, or made to shut down non-gracefully or crash in some other way, so that its regions are not closed normally, i.e. the memstore is not flushed and no HBASE::REGION_EVENT::REGION_CLOSE marker is written to the WAL.
HBase state for test_table region:
* hbase:meta barriers = [2, 5]
* Region dir recovered.edits files: 4.seqid
* Region dir storefiles metadata: [MAX_MEMSTORE_TS_KEY = 4, MAX_SEQ_ID_KEY = 4]
* Active WAL entries*:
  * HBASE::REGION_EVENT::REGION_OPEN seqid=6
  * `**`
* pushedSeqId (/hbase/replication/regions/… znode contents) = 4
*These entries must not be pushed yet, so that pushedSeqId is not updated. This is why we suspended the replication WAL reader threads in step 5. If the entries were allowed to be pushed, they would fall in the last range and be pushed successfully, avoiding the problem described here.
**If the region dir already had storefiles before step 6, there would also be an HBASE::COMPACTION seqid=7 entry in the WAL, because the new storefile produced by flushing the replayed WAL entries is compacted with the older storefiles.
7. Roll the source hbase regionserver WAL so that its active WAL becomes
empty (contains no entries)*
In hbase shell connected to _source_ hbase:
```
wal_roll "<regionserver servername>"
```
This ensures that when the regionserver crashes again, no split WALs are produced (since the dead regionserver's active WAL contains no entries).
*Only required if an HBASE::COMPACTION event was written to the WAL when the test_table region was re-opened, e.g. if the test_table region already had some storefiles before step 6. During WAL splitting, an HBASE::COMPACTION entry is included in a region's split WAL (unlike HBASE::REGION_EVENT::\* entries, which are filtered out), which causes openSeqNum to be increased during split WAL replay.
8. SIGKILL source hbase regionserver, start it, and wait until regions are
re-opened.
HBase state for test_table region:
* hbase:meta barriers = [2, 5, 6]
* Region dir recovered.edits files: 5.seqid
* Region dir storefiles metadata: [MAX_MEMSTORE_TS_KEY = 4, MAX_SEQ_ID_KEY = 4]
* Active WAL entries:
  * HBASE::REGION_EVENT::REGION_OPEN seqid=7
* pushedSeqId (/hbase/replication/regions/… znode contents) = 4
9. Resume the replication WAL reader threads in the debugger.
The replication queue will be:
* /hbase/replication/rs/<current-regionserver-servername>/peer_1
  * Active WAL
    * HBASE::REGION_EVENT::REGION_OPEN seqid=7 ← stuck pushing this entry
* /hbase/replication/rs/<current-regionserver-servername>/peer_1-<old-regionserver-server-name>
  * Old non-empty WAL
    * HBASE::REGION_EVENT::REGION_OPEN seqid=6 ← stuck pushing this entry
  * Old empty WAL (result from rolling WAL in step 7)
The active WAL cannot push its entry (seqid=7) because it is waiting for the previous entry (seqid=6) to be replicated:
```
2025-08-05T12:32:27,308 DEBUG [regionserver/home-host-1:16020.replicationSource.wal-reader.home-host-1%2C16020%2C1754392923670,peer_1] regionserver.SerialReplicationChecker: Replication barrier for test_table/e8bfb3479752a0512841ed8c47d5c8d0/7=[#edits: 0 = <>]: ReplicationBarrierResult [barriers=[2, 5, 6], state=OPEN, parentRegionNames=]
2025-08-05T12:32:27,310 DEBUG [regionserver/home-host-1:16020.replicationSource.wal-reader.home-host-1%2C16020%2C1754392923670,peer_1] regionserver.SerialReplicationChecker: Previous range for test_table/e8bfb3479752a0512841ed8c47d5c8d0/7=[#edits: 0 = <>] has not been finished yet, give up
2025-08-05T12:32:27,311 DEBUG [regionserver/home-host-1:16020.replicationSource.wal-reader.home-host-1%2C16020%2C1754392923670,peer_1] regionserver.SerialReplicationChecker: Can not push test_table/e8bfb3479752a0512841ed8c47d5c8d0/7=[#edits: 0 = <>], wait
```
The old non-empty WAL cannot push its entry (seqid=6) because the checker takes barrier 6, instead of barrier 5, as the end of the previous range for that entry:
```
2025-08-05T12:32:30,186 DEBUG [RS_CLAIM_REPLICATION_QUEUE-regionserver/home-host-1:16020-0.replicationSource,peer_1-home-host-1,16020,1754391356227.replicationSource.wal-reader.home-host-1%2C16020%2C1754391356227,peer_1-home-host-1,16020,1754391356227] regionserver.SerialReplicationChecker: Replication barrier for test_table/e8bfb3479752a0512841ed8c47d5c8d0/6=[#edits: 0 = <>]: ReplicationBarrierResult [barriers=[2, 5, 6], state=OPEN, parentRegionNames=]
2025-08-05T12:32:30,187 DEBUG [RS_CLAIM_REPLICATION_QUEUE-regionserver/home-host-1:16020-0.replicationSource,peer_1-home-host-1,16020,1754391356227.replicationSource.wal-reader.home-host-1%2C16020%2C1754391356227,peer_1-home-host-1,16020,1754391356227] regionserver.SerialReplicationChecker: Previous range for test_table/e8bfb3479752a0512841ed8c47d5c8d0/6=[#edits: 0 = <>] has not been finished yet, give up
2025-08-05T12:32:30,187 DEBUG [RS_CLAIM_REPLICATION_QUEUE-regionserver/home-host-1:16020-0.replicationSource,peer_1-home-host-1,16020,1754391356227.replicationSource.wal-reader.home-host-1%2C16020%2C1754391356227,peer_1-home-host-1,16020,1754391356227] regionserver.SerialReplicationChecker: Can not push test_table/e8bfb3479752a0512841ed8c47d5c8d0/6=[#edits: 0 = <>], wait
```
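Both log excerpts end up at the same check. As a minimal standalone sketch (not HBase code), the gate can be modelled as below. It assumes the index calculation quoted in the next section, leaves out the parent-region and OPENING checks, and uses hypothetical names (`SerialGateModel`, `rangeIndex`, `canPush`) chosen only for illustration.

```java
import java.util.Arrays;

/** Simplified, hypothetical model of the serial replication gate; not HBase code. */
final class SerialGateModel {

  /** Range index as computed today: a hit on a barrier is bumped into the next range. */
  static int rangeIndex(long[] barriers, long seqId) {
    int index = Arrays.binarySearch(barriers, seqId);
    // A miss returns -(insertionPoint) - 1, so -index - 1 recovers the insertion point.
    return index < 0 ? -index - 1 : index + 1;
  }

  /** Mirrors isRangeFinished: the range ending at endBarrier is done once endBarrier - 1 is pushed. */
  static boolean isRangeFinished(long endBarrier, long pushedSeqId) {
    return pushedSeqId >= endBarrier - 1;
  }

  /** True if the entry may be pushed. Assumption: parent and OPENING checks are ignored. */
  static boolean canPush(long[] barriers, long seqId, long pushedSeqId) {
    int index = rangeIndex(barriers, seqId);
    if (index <= 1) {
      return true; // before the first barrier, or in the first range
    }
    return isRangeFinished(barriers[index - 1], pushedSeqId);
  }

  public static void main(String[] args) {
    long[] barriers = {2, 5, 6}; // hbase:meta barriers after step 8
    long pushedSeqId = 4;        // pushedSeqId znode contents
    for (long seqId : new long[] {6, 7}) {
      int index = rangeIndex(barriers, seqId);
      System.out.printf("seqid=%d index=%d checkedBarrier=%d canPush=%b%n",
        seqId, index, barriers[index - 1], canPush(barriers, seqId, pushedSeqId));
    }
  }
}
```

In this model both seqid=6 and seqid=7 are checked against barrier 6, which requires pushedSeqId >= 5. Since pushedSeqId is 4 and only advances when one of these two entries is pushed, neither queue can make progress.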
# The fix
Currently, the entry with seqid=6 is processed like this:
```
private boolean canPush(Entry entry, byte[] row) throws IOException {
  String encodedNameAsString = Bytes.toString(entry.getKey().getEncodedRegionName());
  long seqId = entry.getKey().getSequenceId(); // <--- 6
  ReplicationBarrierResult barrierResult = MetaTableAccessor.getReplicationBarrierResult(conn,
    entry.getKey().getTableName(), row, entry.getKey().getEncodedRegionName());
  LOG.debug("Replication barrier for {}: {}", entry, barrierResult);
  long[] barriers = barrierResult.getBarriers(); // <--- [2, 5, 6]
  int index = Arrays.binarySearch(barriers, seqId); // <--- 2
  if (index == -1) { // <--- false
    LOG.debug("{} is before the first barrier, pass", entry);
    // This means we are in the range before the first record openSeqNum, this usually because the
    // wal is written before we enable serial replication for this table, just return true since
    // we can not guarantee the order.
    pushed.getUnchecked(encodedNameAsString).setValue(seqId);
    return true;
  }
  // The sequence id range is left closed and right open, so either we decrease the missed insert
  // point to make the index start from 0, or increase the hit insert point to make the index
  // start from 1. Here we choose the latter one.
  if (index < 0) { // <--- false
    index = -index - 1;
  } else {
    index++; // <--- index = 3
  }
  if (index == 1) { // <--- false
    // we are in the first range, check whether we have parents
    for (byte[] regionName : barrierResult.getParentRegionNames()) {
      if (!isParentFinished(regionName)) {
        LOG.debug("Parent {} has not been finished yet for entry {}, give up",
          Bytes.toStringBinary(regionName), entry);
        return false;
      }
    }
    if (isLastRangeAndOpening(barrierResult, index)) {
      LOG.debug("{} is in the last range and the region is opening, give up", entry);
      return false;
    }
    LOG.debug("{} is in the first range, pass", entry);
    recordCanPush(encodedNameAsString, seqId, barriers, 1);
    return true;
  }
  // check whether the previous range is finished
  if (!isRangeFinished(barriers[index - 1], encodedNameAsString)) { // <--- true
    LOG.debug("Previous range for {} has not been finished yet, give up", entry);
    return false; // <--- return false
  }
  if (isLastRangeAndOpening(barrierResult, index)) {
    LOG.debug("{} is in the last range and the region is opening, give up", entry);
    return false;
  }
  LOG.debug("The previous range for {} has been finished, pass", entry);
  recordCanPush(encodedNameAsString, seqId, barriers, index);
  return true;
}

private boolean isRangeFinished(long endBarrier, String encodedRegionName)
  throws IOException { // <-- endBarrier = 6
  long pushedSeqId;
  try {
    pushedSeqId = storage.getLastSequenceId(encodedRegionName, peerId); // <--- pushedSeqId = 4
  } catch (ReplicationException e) {
    throw new IOException(
      "Failed to get pushed sequence id for " + encodedRegionName + ", peer " + peerId, e);
  }
  // endBarrier is the open sequence number. When opening a region, the open sequence number will
  // be set to the old max sequence id plus one, so here we need to minus one.
  return pushedSeqId >= endBarrier - 1; // <--- 4 >= 6-1 <=> 4 >= 5 <=> false
}
```
In our fix we change the index calculation so that index is not incremented when seqId == barriers[index]. For the entry with seqid=6, index then stays at 2, so isRangeFinished is called with barriers[index - 1] = 5 instead of 6. That check returns true (4 >= 5 - 1), and the entry is passed through.
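The effect of the index change can be sketched in isolation. The snippet below is a simplified model, not the actual patch: `proposedIndex` is a literal reading of the description above. The class and method names are hypothetical, the parent-region and OPENING checks are omitted, and pushedSeqId is assumed to advance as soon as an entry passes the gate.

```java
import java.util.Arrays;

/** Hypothetical side-by-side model of the index calculation; not the actual patch. */
final class BarrierIndexSketch {

  /** Current behaviour: a hit on a barrier is incremented into the next range. */
  static int currentIndex(long[] barriers, long seqId) {
    int index = Arrays.binarySearch(barriers, seqId);
    return index < 0 ? -index - 1 : index + 1;
  }

  /** Described change: leave the index as-is when seqId equals a barrier. */
  static int proposedIndex(long[] barriers, long seqId) {
    int index = Arrays.binarySearch(barriers, seqId);
    return index < 0 ? -index - 1 : index;
  }

  static boolean canPush(long[] barriers, long seqId, long pushedSeqId, boolean proposed) {
    int index = proposed ? proposedIndex(barriers, seqId) : currentIndex(barriers, seqId);
    if (index <= 1) {
      return true; // before the first barrier, or in the first range
    }
    // Same condition as isRangeFinished(barriers[index - 1], ...).
    return pushedSeqId >= barriers[index - 1] - 1;
  }

  /** Retries the pending entries until no further progress; returns the final pushedSeqId. */
  static long drain(long[] barriers, long[] pending, long pushedSeqId, boolean proposed) {
    boolean[] done = new boolean[pending.length];
    boolean progress = true;
    while (progress) {
      progress = false;
      for (int i = 0; i < pending.length; i++) {
        if (!done[i] && canPush(barriers, pending[i], pushedSeqId, proposed)) {
          // Assumption: the pushed sequence id is updated immediately after a push.
          pushedSeqId = Math.max(pushedSeqId, pending[i]);
          done[i] = true;
          progress = true;
        }
      }
    }
    return pushedSeqId;
  }

  public static void main(String[] args) {
    long[] barriers = {2, 5, 6};
    long[] pending = {6, 7}; // heads of the old and the active WAL queues
    System.out.println("current:  pushedSeqId ends at " + drain(barriers, pending, 4, false));
    System.out.println("proposed: pushedSeqId ends at " + drain(barriers, pending, 4, true));
  }
}
```

With the current calculation the model stays at pushedSeqId = 4. With the described change, seqid=6 is checked against barrier 5 and passes, after which seqid=7 is checked against barrier 6 and passes as well, ending at pushedSeqId = 7.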
# Unit test, integration test
TODO