tomasbanet opened a new pull request, #7194:
URL: https://github.com/apache/hbase/pull/7194
# Detailed Example
ZooKeeper state:
* `/hbase/replication/regions/e8/bf/b3479752a0512841ed8c47d5c8d0-peer_1`
  * pushedSeqId=27
* (Main queue) `/hbase/replication/rs/home-host-1,16020,1754475014225/peer_1/home-host-1%2C16020%2C1754475014225.1754475020411`
  * lastWalPosition=409
* (Reclaimed queue) `/hbase/replication/rs/home-host-1,16020,1754475014225/peer_1-home-host-1,16020,1754474482197/home-host-1%2C16020%2C1754474482197.1754474488449`
  * lastWalPosition=716
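The znode names listed above encode which server each queue and WAL belongs to. The sketch below splits them apart, assuming a reclaimed queue id of the form `<peerId>-<deadServerName>`, WAL file names of the form `<URL-encoded server name>.<timestamp>`, and a peer id that contains no `-`. `QueueNames` and its methods are hypothetical helpers, not HBase APIs.

```java
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;

// Hypothetical helpers (not HBase APIs) for reading the znode and WAL names listed above.
final class QueueNames {

  private QueueNames() {
  }

  /**
   * Splits a queue id into {peerId, deadServerName}. A queue that was never claimed has no
   * dead-server part, so the second element is null.
   */
  static String[] splitQueueId(String queueId) {
    // Assumption: the peer id contains no '-', so the first '-' separates the two parts.
    // The server name itself may contain '-' (e.g. "home-host-1").
    int dash = queueId.indexOf('-');
    if (dash < 0) {
      return new String[] {queueId, null};
    }
    return new String[] {queueId.substring(0, dash), queueId.substring(dash + 1)};
  }

  /** Returns the server that wrote a WAL named "<URL-encoded server name>.<timestamp>". */
  static String walServerName(String walFileName) {
    String decoded = URLDecoder.decode(walFileName, StandardCharsets.UTF_8); // %2C -> ','
    return decoded.substring(0, decoded.lastIndexOf('.'));
  }

  public static void main(String[] args) {
    String[] reclaimed = splitQueueId("peer_1-home-host-1,16020,1754474482197");
    String oldWal = "home-host-1%2C16020%2C1754474482197.1754474488449";
    System.out.println(reclaimed[0]); // peer_1
    System.out.println(reclaimed[1]); // home-host-1,16020,1754474482197
    // The reclaimed queue holds WALs written by the dead server named in its queue id.
    System.out.println(reclaimed[1].equals(walServerName(oldWal))); // true
  }
}
```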
WALs:
* (Active WAL) `/hbase/WALs/home-host-1,16020,1754475014225/home-host-1%2C16020%2C1754475014225.1754475020411`
```
position: 409
,{"sequence":"34","region":"e8bfb3479752a0512841ed8c47d5c8d0","actions":[{"qualifier":"HBASE::REGION_EVENT::REGION_OPEN","vlen":216,"row":"\\x00","type":"Put","family":"METAFAMILY","timestamp":"1754475023833","total_size_sum":"328"}],"table":{"name":[116,101,115,116,95,116,97,98,108,101],"nameAsString":"test_table","namespace":[100,101,102,97,117,108,116],"namespaceAsString":"default","qualifier":[116,101,115,116,95,116,97,98,108,101],"qualifierAsString":"test_table","systemTable":false,"hashCode":2070785505}}edit
heap size: 368 <--- trying to push this entry
position: 750
,{"sequence":"35","region":"e8bfb3479752a0512841ed8c47d5c8d0","actions":[{"qualifier":"HBASE::COMPACTION","vlen":219,"row":"\\x00","type":"Put","family":"METAFAMILY","timestamp":"1754475024678","total_size_sum":"320"}],"table":{"name":[116,101,115,116,95,116,97,98,108,101],"nameAsString":"test_table","namespace":[100,101,102,97,117,108,116],"namespaceAsString":"default","qualifier":[116,101,115,116,95,116,97,98,108,101],"qualifierAsString":"test_table","systemTable":false,"hashCode":2070785505}}edit
heap size: 360
position: 1079
,{"sequence":"36","region":"e8bfb3479752a0512841ed8c47d5c8d0","actions":[{"qualifier":"col1","vlen":4,"row":"key6","type":"Put","family":"cf1","timestamp":"1754475048622","total_size_sum":"88"}],"table":{"name":[116,101,115,116,95,116,97,98,108,101],"nameAsString":"test_table","namespace":[100,101,102,97,117,108,116],"namespaceAsString":"default","qualifier":[116,101,115,116,95,116,97,98,108,101],"qualifierAsString":"test_table","systemTable":false,"hashCode":2070785505}}edit
heap size: 128
position: 1185
```
* (Old WAL from reclaimed queue) `/hbase/oldWALs/home-host-1%2C16020%2C1754474482197.1754474488449`
```
position: 409
,{"sequence":"27","region":"e8bfb3479752a0512841ed8c47d5c8d0","actions":[{"qualifier":"HBASE::REGION_EVENT::REGION_OPEN","vlen":182,"row":"\\x00","type":"Put","family":"METAFAMILY","timestamp":"1754474494541","total_size_sum":"296"}],"table":{"name":[116,101,115,116,95,116,97,98,108,101],"nameAsString":"test_table","namespace":[100,101,102,97,117,108,116],"namespaceAsString":"default","qualifier":[116,101,115,116,95,116,97,98,108,101],"qualifierAsString":"test_table","systemTable":false,"hashCode":2070785505}}edit
heap size: 336
position: 716
,{"sequence":"28","region":"e8bfb3479752a0512841ed8c47d5c8d0","actions":[{"qualifier":"col1","vlen":4,"row":"key5","type":"Put","family":"cf1","timestamp":"1754474868855","total_size_sum":"88"}],"table":{"name":[116,101,115,116,95,116,97,98,108,101],"nameAsString":"test_table","namespace":[100,101,102,97,117,108,116],"namespaceAsString":"default","qualifier":[116,101,115,116,95,116,97,98,108,101],"qualifierAsString":"test_table","systemTable":false,"hashCode":2070785505}}edit
heap size: 128 <--- trying to push this entry
position: 822
,{"sequence":"30","region":"e8bfb3479752a0512841ed8c47d5c8d0","actions":[{"qualifier":"HBASE::FLUSH","vlen":123,"row":"\\x00","type":"Put","family":"METAFAMILY","timestamp":"1754474978323","total_size_sum":"216"}],"table":{"name":[116,101,115,116,95,116,97,98,108,101],"nameAsString":"test_table","namespace":[100,101,102,97,117,108,116],"namespaceAsString":"default","qualifier":[116,101,115,116,95,116,97,98,108,101],"qualifierAsString":"test_table","systemTable":false,"hashCode":2070785505}}edit
heap size: 256
position: 1341
,{"sequence":"31","region":"e8bfb3479752a0512841ed8c47d5c8d0","actions":[{"qualifier":"HBASE::FLUSH","vlen":157,"row":"\\x00","type":"Put","family":"METAFAMILY","timestamp":"1754474979228","total_size_sum":"248"}],"table":{"name":[116,101,115,116,95,116,97,98,108,101],"nameAsString":"test_table","namespace":[100,101,102,97,117,108,116],"namespaceAsString":"default","qualifier":[116,101,115,116,95,116,97,98,108,101],"qualifierAsString":"test_table","systemTable":false,"hashCode":2070785505}}edit
heap size: 288
position: 1603
,{"sequence":"32","region":"e8bfb3479752a0512841ed8c47d5c8d0","actions":[{"qualifier":"HBASE::REGION_EVENT::REGION_CLOSE","vlen":148,"row":"\\x00","type":"Put","family":"METAFAMILY","timestamp":"1754474979245","total_size_sum":"264"}],"table":{"name":[116,101,115,116,95,116,97,98,108,101],"nameAsString":"test_table","namespace":[100,101,102,97,117,108,116],"namespaceAsString":"default","qualifier":[116,101,115,116,95,116,97,98,108,101],"qualifierAsString":"test_table","systemTable":false,"hashCode":2070785505}}edit
heap size: 304
position: 1877
```
hbase:meta barriers:
`[2, 5, 6, 9, 17, 26, 33]`
The entry with seqId=34 cannot be pushed until the previous range has finished, i.e. until the entries with seqIds 27, 28, 30, 31 and 32 (between barriers 26 and 33) have been pushed:
```
2025-08-06T11:13:05,286 DEBUG
[regionserver/home-host-1:16020.replicationSource,peer_1.replicationSource.wal-reader.home-host-1%2C16020%2C1754475014225,peer_1]
regionserver.SerialReplicationChecker: Replication barrier for
test_table/e8bfb3479752a0512841ed8c47d5c8d0/34=[#edits: 0 = <>]:
ReplicationBarrierResult [barriers=[2, 5, 6, 9, 17, 26, 33], state=OPEN,
parentRegionNames=]
2025-08-06T11:13:39,338 DEBUG
[regionserver/home-host-1:16020.replicationSource,peer_1.replicationSource.wal-reader.home-host-1%2C16020%2C1754475014225,peer_1]
regionserver.SerialReplicationChecker: Previous range for
test_table/e8bfb3479752a0512841ed8c47d5c8d0/34=[#edits: 0 = <>] has not been
finished yet, give up
```
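The barrier decision in these log lines can be sketched as follows. This is a simplified model, assuming that sequence-id ranges are left-closed and right-open between consecutive barriers, and that the range preceding an entry's range counts as finished once the pushed sequence id has reached that range's end barrier minus one (a region open is assigned the old max sequence id plus one). It ignores parent regions and the region-opening state; `BarrierCheck` is a hypothetical class, not `SerialReplicationChecker` itself.

```java
import java.util.Arrays;

// Simplified, hypothetical model of the "previous range finished" test.
final class BarrierCheck {

  private BarrierCheck() {
  }

  /**
   * Returns i such that barriers[i - 1] <= seqId < barriers[i] (i == barriers.length for the
   * last, open-ended range), or 0 if seqId lies before the first barrier.
   */
  static int rangeIndex(long[] barriers, long seqId) {
    int index = Arrays.binarySearch(barriers, seqId);
    // A hit means seqId is itself a barrier, i.e. the first id of a range.
    return index >= 0 ? index + 1 : -index - 1;
  }

  /** True if the range preceding seqId's range has been fully pushed. */
  static boolean previousRangeFinished(long[] barriers, long seqId, long pushedSeqId) {
    int index = rangeIndex(barriers, seqId);
    if (index <= 1) {
      // Before the first barrier or in the first range: nothing earlier to wait for
      // (parent-region checks are omitted in this sketch).
      return true;
    }
    long endBarrier = barriers[index - 1]; // barrier that closes the previous range
    return pushedSeqId >= endBarrier - 1;
  }

  public static void main(String[] args) {
    long[] barriers = {2, 5, 6, 9, 17, 26, 33};
    // seqId=34 lies in [33, +inf); the previous range [26, 33) needs pushedSeqId >= 32.
    System.out.println(previousRangeFinished(barriers, 34, 27)); // false
    System.out.println(previousRangeFinished(barriers, 34, 32)); // true
  }
}
```

With the ZooKeeper state above (pushedSeqId=27), the check fails for seqId=34, which matches the "has not been finished yet, give up" message.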
However, if the main WAL reader thread runs before the RS_CLAIM_REPLICATION_QUEUE WAL reader thread, and an IOException is caught while calling canPush for the entry with seqId=34, SerialReplicationSourceWALReader.readWALEntries does the following:
```
try {
  if (!checker.canPush(entry, firstCellInEntryBeforeFiltering)) { // <--- entry test_table/e8bfb3479752a0512841ed8c47d5c8d0/34=[#edits: 0 = <>]
    if (batch.getLastWalPosition() > positionBefore) {
      // we have something that can push, break
      break;
    } else {
      checker.waitUntilCanPush(entry, firstCellInEntryBeforeFiltering);
    }
  }
} catch (IOException e) {
  LOG.warn("failed to check whether we can push the WAL entries", e); // <--- catch exception here
  if (batch.getLastWalPosition() > positionBefore) { // <--- 0 > 409 <=> false
    // we have something that can push, break
    break;
  }
  sleepMultiplier = sleep(sleepMultiplier); // <--- sleep for some time
}
// arrive here means we can push the entry, record the last sequence id
batch.setLastSeqId(Bytes.toString(entry.getKey().getEncodedRegionName()),
  entry.getKey().getSequenceId()); // <--- lastSeqIds set to 34
// actually remove the entry.
removeEntryFromStream(entryStream, batch);
if (addEntryToBatch(batch, entry)) { // <--- false (entry has 0 edits)
  break;
}
// <--- continue to process another entry
```
The next iteration of SerialReplicationSourceWALReader.readWALEntries processes the entry with seqId=35:
```
try {
  if (!checker.canPush(entry, firstCellInEntryBeforeFiltering)) { // <--- entry test_table/e8bfb3479752a0512841ed8c47d5c8d0/35=[#edits: 0 = <>]
    if (batch.getLastWalPosition() > positionBefore) {
      // we have something that can push, break
      break;
    } else {
      checker.waitUntilCanPush(entry, firstCellInEntryBeforeFiltering);
    }
  }
} catch (IOException e) {
  LOG.warn("failed to check whether we can push the WAL entries", e); // <--- catch exception here
  if (batch.getLastWalPosition() > positionBefore) { // <--- 750 > 409 <=> true
    // we have something that can push, break
    break; // <--- break here
  }
  sleepMultiplier = sleep(sleepMultiplier);
}
```
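The two iterations above can be reproduced with a small, self-contained model of the loop. Everything in it is hypothetical: `Entry` and `Batch` stand in for the WAL entry and `WALEntryBatch`, the checker in `main` always throws (simulating the caught IOException), sleeping is omitted, and positions and sequence ids are taken from the WAL dump. It shows that while the batch is still empty, the catch block falls through to recording the last sequence id, so seqId=34 is recorded even though the barrier check never succeeded; the entry with seqId=35 then takes the `break`.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the control flow in the readWALEntries excerpts above.
final class ReaderLoopModel {

  static final class Entry {
    final long seqId;
    final long endPosition; // stream position after this entry has been read
    final int edits;        // edits left after filtering
    Entry(long seqId, long endPosition, int edits) {
      this.seqId = seqId;
      this.endPosition = endPosition;
      this.edits = edits;
    }
  }

  static final class Batch {
    long lastWalPosition = 0;
    final Map<String, Long> lastSeqIds = new HashMap<>();
    final List<Entry> walEntries = new ArrayList<>();
  }

  interface Checker {
    boolean canPush(Entry entry) throws IOException;
  }

  static Batch read(List<Entry> stream, long positionBefore, String region, Checker checker) {
    Batch batch = new Batch();
    for (Entry entry : stream) {
      try {
        if (!checker.canPush(entry)) {
          if (batch.lastWalPosition > positionBefore) {
            break;
          }
          // Real code blocks in waitUntilCanPush(); the model assumes it has returned.
        }
      } catch (IOException e) {
        if (batch.lastWalPosition > positionBefore) {
          break; // seqId=35: 750 > 409
        }
        // seqId=34: 0 > 409 is false; sleep omitted, then fall through.
      }
      // Reached even when canPush never returned true.
      batch.lastSeqIds.put(region, entry.seqId);
      batch.lastWalPosition = entry.endPosition; // removeEntryFromStream
      if (entry.edits > 0) {
        batch.walEntries.add(entry); // addEntryToBatch (size limits omitted)
      }
    }
    return batch;
  }

  public static void main(String[] args) {
    List<Entry> wal = List.of(new Entry(34, 750, 0), new Entry(35, 1079, 0), new Entry(36, 1185, 1));
    Batch b = read(wal, 409, "e8bfb3479752a0512841ed8c47d5c8d0", entry -> {
      throw new IOException("simulated failure");
    });
    // prints: 0 750 {e8bfb3479752a0512841ed8c47d5c8d0=34}
    System.out.println(b.walEntries.size() + " " + b.lastWalPosition + " " + b.lastSeqIds);
  }
}
```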
The shipper thread logs:
```
2025-08-06T11:44:34,477 DEBUG
[regionserver/home-host-1:16020.replicationSource,peer_1.replicationSource.shipperhome-host-1%2C16020%2C1754475014225,peer_1]
regionserver.ReplicationSourceShipper: Shipper from source peer_1 got entry
batch from reader: WALEntryBatch [walEntries=[],
lastWalPath=hdfs://mycluster/hbase/WALs/home-host-1,16020,1754475014225/home-host-1%2C16020%2C1754475014225.1754475020411,
lastWalPosition=750, nbRowKeys=0, nbHFiles=0, heapSize=0,
lastSeqIds={e8bfb3479752a0512841ed8c47d5c8d0=34},
endOfFile=false,usedBufferSize=0]
```
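ReplicationSourceShipper.shipEdits receives this batch. It contains no WAL entries, so nothing is actually shipped; the shipper only updates the log position, carrying lastSeqId=34 with it: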
```
private void shipEdits(WALEntryBatch entryBatch) {
  // <--- entryBatch = WALEntryBatch [walEntries=[],
  //   lastWalPath=hdfs://mycluster/hbase/WALs/home-host-1,16020,1754475014225/home-host-1%2C16020%2C1754475014225.1754475020411,
  //   lastWalPosition=750, nbRowKeys=0, nbHFiles=0, heapSize=0,
  //   lastSeqIds={e8bfb3479752a0512841ed8c47d5c8d0=34},
  //   endOfFile=false,usedBufferSize=0]
  List<Entry> entries = entryBatch.getWalEntries();
  int sleepMultiplier = 0;
  if (entries.isEmpty()) { // <--- true
    updateLogPosition(entryBatch);
    return;
  }
  // ...
```
`updateLogPosition()` calls `ReplicationSourceManager.logPositionAndCleanOldLogs()`, which in turn calls `ZKReplicationQueueStorage.setWALPosition()`. `ZKReplicationQueueStorage.setWALPosition()` then updates ZooKeeper with:
* `/hbase/replication/regions/e8/bf/b3479752a0512841ed8c47d5c8d0-peer_1`
  * pushedSeqId=34
* (Main queue) `/hbase/replication/rs/home-host-1,16020,1754475014225/peer_1/home-host-1%2C16020%2C1754475014225.1754475020411`
  * lastWalPosition=750
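The effect of this empty batch on the queue storage can be modelled with an in-memory map standing in for ZooKeeper. The region znode layout (two 2-character prefix levels of the encoded region name, then the remainder suffixed with `-<peerId>`) is read off the paths shown above; `QueueStorageModel` and its `setWALPosition` are hypothetical and only mimic what the text describes `ZKReplicationQueueStorage.setWALPosition()` doing, without version checks or multi-ops.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for the ZooKeeper replication queue storage.
final class QueueStorageModel {
  final Map<String, Long> znodes = new TreeMap<>();

  /** Region znode, e.g. /hbase/replication/regions/e8/bf/b3479752a0512841ed8c47d5c8d0-peer_1. */
  static String regionNode(String encodedRegionName, String peerId) {
    return "/hbase/replication/regions/" + encodedRegionName.substring(0, 2) + "/"
      + encodedRegionName.substring(2, 4) + "/" + encodedRegionName.substring(4) + "-" + peerId;
  }

  /** Records the WAL position and the last pushed sequence id of every region in the batch. */
  void setWALPosition(String queueNode, String walName, long position, String peerId,
      Map<String, Long> lastSeqIds) {
    znodes.put(queueNode + "/" + walName, position);
    lastSeqIds.forEach((region, seqId) -> znodes.put(regionNode(region, peerId), seqId));
  }

  public static void main(String[] args) {
    QueueStorageModel zk = new QueueStorageModel();
    String region = "e8bfb3479752a0512841ed8c47d5c8d0";
    zk.znodes.put(regionNode(region, "peer_1"), 27L); // state before the batch is handled
    // Empty batch: walEntries=[], lastWalPosition=750, lastSeqIds={region=34}
    zk.setWALPosition("/hbase/replication/rs/home-host-1,16020,1754475014225/peer_1",
      "home-host-1%2C16020%2C1754475014225.1754475020411", 750, "peer_1", Map.of(region, 34L));
    // pushedSeqId jumps from 27 to 34 although 28..32 have not been shipped.
    zk.znodes.forEach((path, value) -> System.out.println(path + " = " + value));
  }
}
```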
Afterwards, because pushedSeqId=34 now satisfies the barrier check, the main WAL reader thread can push entries whose seqIds are higher than those still pending in the reclaimed queue:
```
2025-08-06T11:55:11,218 DEBUG
[regionserver/home-host-1:16020.replicationSource,peer_1.replicationSource.wal-reader.home-host-1%2C16020%2C1754475014225,peer_1]
regionserver.SerialReplicationChecker: Replication barrier for
test_table/e8bfb3479752a0512841ed8c47d5c8d0/35=[#edits: 0 = <>]:
ReplicationBarrierResult [barriers=[2, 5, 6, 9, 17, 26, 33], state=OPEN,
parentRegionNames=]
2025-08-06T11:56:51,300 DEBUG
[regionserver/home-host-1:16020.replicationSource,peer_1.replicationSource.wal-reader.home-host-1%2C16020%2C1754475014225,peer_1]
regionserver.SerialReplicationChecker: The previous range for
test_table/e8bfb3479752a0512841ed8c47d5c8d0/35=[#edits: 0 = <>] has been
finished, pass
2025-08-06T11:57:25,313 TRACE
[regionserver/home-host-1:16020.replicationSource,peer_1.replicationSource.wal-reader.home-host-1%2C16020%2C1754475014225,peer_1]
regionserver.SerialReplicationChecker: The sequence id for
test_table/e8bfb3479752a0512841ed8c47d5c8d0/36=[#edits: 1 =
<key6/cf1:col1/1754475048622/Put/vlen=4/seqid=0; >] is continuous, pass
```
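Why seqId=35 and seqId=36 now pass can be traced with a simplified model of the two checks these log lines name: a per-reader fast path that, as the TRACE message suggests, accepts an id directly following the last one it let through, and otherwise a barrier test against the stored pushedSeqId. `CheckerModel` is hypothetical; parent-region and region-opening checks are omitted, and the real `SerialReplicationChecker` may differ in detail.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the two checks named in the log lines above.
final class CheckerModel {
  private final long[] barriers;
  private final Map<String, Long> storedPushedSeqId; // stand-in for the region znode
  private final Map<String, Long> lastPassed = new HashMap<>(); // per-reader cache

  CheckerModel(long[] barriers, Map<String, Long> storedPushedSeqId) {
    this.barriers = barriers;
    this.storedPushedSeqId = storedPushedSeqId;
  }

  boolean canPush(String region, long seqId) {
    // Fast path ("is continuous, pass"): the id directly follows the last one let through.
    Long previous = lastPassed.get(region);
    if (previous != null && seqId == previous + 1) {
      lastPassed.put(region, seqId);
      return true;
    }
    // Barrier path: ranges are [barriers[i - 1], barriers[i]); the previous range counts as
    // finished once the stored pushed seq id has reached its end barrier minus one.
    int index = Arrays.binarySearch(barriers, seqId);
    index = index >= 0 ? index + 1 : -index - 1;
    if (index > 1) {
      long pushed = storedPushedSeqId.getOrDefault(region, -1L);
      if (pushed < barriers[index - 1] - 1) {
        return false; // "Previous range ... has not been finished yet, give up"
      }
    }
    lastPassed.put(region, seqId);
    return true;
  }

  public static void main(String[] args) {
    long[] barriers = {2, 5, 6, 9, 17, 26, 33};
    String region = "e8bfb3479752a0512841ed8c47d5c8d0";
    Map<String, Long> zk = new HashMap<>();
    zk.put(region, 34L); // advanced by the empty batch while 28..32 are still pending
    CheckerModel checker = new CheckerModel(barriers, zk);
    System.out.println(checker.canPush(region, 35)); // true: previous range looks finished
    System.out.println(checker.canPush(region, 36)); // true: continuous after 35
  }
}
```

With the original pushedSeqId=27 the same model rejects seqId=35, so it is the premature update to 34 that lets key6 through.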
This means the table in the sink cluster can receive entries out of order: before the reclaimed queue has finished, key6 (seqId=36 in the source cluster) is already present:
```
hbase:005:0> scan "test_table"
scan "test_table"
ROW COLUMN+CELL
key1 column=cf1:col1, timestamp=2025-08-05T11:45:04.884, value=val1
key2 column=cf1:col1, timestamp=2025-08-06T04:47:52.269, value=val1
key3 column=cf1:col1, timestamp=2025-08-06T04:51:51.826, value=val1
key4 column=cf1:col1, timestamp=2025-08-06T05:08:19.270, value=val1
key6 column=cf1:col1, timestamp=2025-08-06T11:10:48.622, value=val1
5 row(s)
Took 0.1082 seconds
```
After the reclaimed queue finishes, key5 (seqId=28 in the source cluster) arrives:
```
hbase:006:0> scan "test_table"
scan "test_table"
ROW COLUMN+CELL
key1 column=cf1:col1, timestamp=2025-08-05T11:45:04.884, value=val1
key2 column=cf1:col1, timestamp=2025-08-06T04:47:52.269, value=val1
key3 column=cf1:col1, timestamp=2025-08-06T04:51:51.826, value=val1
key4 column=cf1:col1, timestamp=2025-08-06T05:08:19.270, value=val1
key5 column=cf1:col1, timestamp=2025-08-06T11:07:48.855, value=val1
key6 column=cf1:col1, timestamp=2025-08-06T11:10:48.622, value=val1
6 row(s)
Took 0.0735 seconds
```
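Serial replication is meant to apply each region's edits to the sink in increasing sequence-id order. A small check over the order in which the sink received the entries makes the violation concrete; the apply order used here (key6 with seqId=36 before key5 with seqId=28) is taken from the two scans above, and `OrderCheck.firstInversion` is a hypothetical helper.

```java
import java.util.Arrays;

// Hypothetical helper: detects an entry applied after one with a higher sequence id.
final class OrderCheck {

  private OrderCheck() {
  }

  /**
   * Returns {highestAppliedSoFar, lowerAppliedLater} for the first out-of-order entry,
   * or null if the sequence ids were applied in non-decreasing order.
   */
  static long[] firstInversion(long[] applyOrder) {
    long maxSoFar = Long.MIN_VALUE;
    for (long seqId : applyOrder) {
      if (seqId < maxSoFar) {
        return new long[] {maxSoFar, seqId};
      }
      maxSoFar = seqId; // seqId >= maxSoFar here
    }
    return null;
  }

  public static void main(String[] args) {
    // Region e8bfb3479752a0512841ed8c47d5c8d0: key6 (36) reached the sink before key5 (28).
    long[] applied = {36, 28};
    System.out.println(Arrays.toString(firstInversion(applied))); // [36, 28]
  }
}
```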
# Unit test, integration test
TODO