tomasbanet opened a new pull request, #7194:
URL: https://github.com/apache/hbase/pull/7194

   # Detailed Example
   
   ZooKeeper state:
   * /hbase/replication/regions/e8/bf/b3479752a0512841ed8c47d5c8d0-peer_1
     * pushedSeqId=27
   * (Main queue) /hbase/replication/rs/home-host-1,16020,1754475014225/peer_1/home-host-1%2C16020%2C1754475014225.1754475020411
     * lastWalPosition=409
   * (Reclaimed queue) /hbase/replication/rs/home-host-1,16020,1754475014225/peer_1-home-host-1,16020,1754474482197/home-host-1%2C16020%2C1754474482197.1754474488449
     * lastWalPosition=716
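
   The znode paths above seem to follow a regular layout: the encoded region name is split into two 2-character prefixes plus the remainder, the reclaimed queue id is the peer id joined to the dead server's name, and WAL file names are URL-encoded `<server name>.<timestamp>` strings. The Java sketch below rebuilds the paths from this example under that assumption. The layout is inferred from the paths shown here, not read from `ZKReplicationQueueStorage`, and `ZkPathSketch` and its methods are hypothetical names.

   ```java
   import java.net.URLDecoder;
   import java.nio.charset.StandardCharsets;

   // Hypothetical helpers that rebuild the znode paths shown above. The layout is
   // inferred from this example only; it is NOT taken from ZKReplicationQueueStorage.
   final class ZkPathSketch {

     // <base>/regions/<2 chars>/<2 chars>/<rest of encoded region name>-<peerId>
     static String pushedSeqIdZnode(String base, String encodedRegion, String peerId) {
       return base + "/regions/" + encodedRegion.substring(0, 2) + "/"
           + encodedRegion.substring(2, 4) + "/" + encodedRegion.substring(4) + "-" + peerId;
     }

     // A reclaimed queue id appears to be "<peerId>-<dead region server name>".
     static String reclaimedQueueId(String peerId, String deadServerName) {
       return peerId + "-" + deadServerName;
     }

     // WAL names look like URL-encoded "<server name>.<timestamp>"; decode "%2C" to ","
     // and strip the trailing ".<timestamp>".
     static String walServerName(String walName) {
       String decoded = URLDecoder.decode(walName, StandardCharsets.UTF_8);
       return decoded.substring(0, decoded.lastIndexOf('.'));
     }

     public static void main(String[] args) {
       // /hbase/replication/regions/e8/bf/b3479752a0512841ed8c47d5c8d0-peer_1
       System.out.println(pushedSeqIdZnode("/hbase/replication",
           "e8bfb3479752a0512841ed8c47d5c8d0", "peer_1"));
       // peer_1-home-host-1,16020,1754474482197
       System.out.println(reclaimedQueueId("peer_1", "home-host-1,16020,1754474482197"));
       // home-host-1,16020,1754474482197
       System.out.println(walServerName("home-host-1%2C16020%2C1754474482197.1754474488449"));
     }
   }
   ```

   The last call recovers `home-host-1,16020,1754474482197` from the WAL name, the same dead server that appears in the reclaimed queue id.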
   
   
   WALs:
   
   * (Active WAL) /hbase/WALs/home-host-1,16020,1754475014225/home-host-1%2C16020%2C1754475014225.1754475020411
   
   ```
   position: 409
   
   ,{"sequence":"34","region":"e8bfb3479752a0512841ed8c47d5c8d0","actions":[{"qualifier":"HBASE::REGION_EVENT::REGION_OPEN","vlen":216,"row":"\\x00","type":"Put","family":"METAFAMILY","timestamp":"1754475023833","total_size_sum":"328"}],"table":{"name":[116,101,115,116,95,116,97,98,108,101],"nameAsString":"test_table","namespace":[100,101,102,97,117,108,116],"namespaceAsString":"default","qualifier":[116,101,115,116,95,116,97,98,108,101],"qualifierAsString":"test_table","systemTable":false,"hashCode":2070785505}}edit heap size: 368 <--- trying to push this entry
   position: 750
   
   ,{"sequence":"35","region":"e8bfb3479752a0512841ed8c47d5c8d0","actions":[{"qualifier":"HBASE::COMPACTION","vlen":219,"row":"\\x00","type":"Put","family":"METAFAMILY","timestamp":"1754475024678","total_size_sum":"320"}],"table":{"name":[116,101,115,116,95,116,97,98,108,101],"nameAsString":"test_table","namespace":[100,101,102,97,117,108,116],"namespaceAsString":"default","qualifier":[116,101,115,116,95,116,97,98,108,101],"qualifierAsString":"test_table","systemTable":false,"hashCode":2070785505}}edit heap size: 360
   position: 1079
   
   ,{"sequence":"36","region":"e8bfb3479752a0512841ed8c47d5c8d0","actions":[{"qualifier":"col1","vlen":4,"row":"key6","type":"Put","family":"cf1","timestamp":"1754475048622","total_size_sum":"88"}],"table":{"name":[116,101,115,116,95,116,97,98,108,101],"nameAsString":"test_table","namespace":[100,101,102,97,117,108,116],"namespaceAsString":"default","qualifier":[116,101,115,116,95,116,97,98,108,101],"qualifierAsString":"test_table","systemTable":false,"hashCode":2070785505}}edit heap size: 128
   position: 1185
   ```
   
   * (Old WAL from reclaimed queue) /hbase/oldWALs/home-host-1%2C16020%2C1754474482197.1754474488449
   
   ```
   position: 409
   
   ,{"sequence":"27","region":"e8bfb3479752a0512841ed8c47d5c8d0","actions":[{"qualifier":"HBASE::REGION_EVENT::REGION_OPEN","vlen":182,"row":"\\x00","type":"Put","family":"METAFAMILY","timestamp":"1754474494541","total_size_sum":"296"}],"table":{"name":[116,101,115,116,95,116,97,98,108,101],"nameAsString":"test_table","namespace":[100,101,102,97,117,108,116],"namespaceAsString":"default","qualifier":[116,101,115,116,95,116,97,98,108,101],"qualifierAsString":"test_table","systemTable":false,"hashCode":2070785505}}edit heap size: 336
   position: 716
   
   ,{"sequence":"28","region":"e8bfb3479752a0512841ed8c47d5c8d0","actions":[{"qualifier":"col1","vlen":4,"row":"key5","type":"Put","family":"cf1","timestamp":"1754474868855","total_size_sum":"88"}],"table":{"name":[116,101,115,116,95,116,97,98,108,101],"nameAsString":"test_table","namespace":[100,101,102,97,117,108,116],"namespaceAsString":"default","qualifier":[116,101,115,116,95,116,97,98,108,101],"qualifierAsString":"test_table","systemTable":false,"hashCode":2070785505}}edit heap size: 128 <--- trying to push this entry
   position: 822
   
   ,{"sequence":"30","region":"e8bfb3479752a0512841ed8c47d5c8d0","actions":[{"qualifier":"HBASE::FLUSH","vlen":123,"row":"\\x00","type":"Put","family":"METAFAMILY","timestamp":"1754474978323","total_size_sum":"216"}],"table":{"name":[116,101,115,116,95,116,97,98,108,101],"nameAsString":"test_table","namespace":[100,101,102,97,117,108,116],"namespaceAsString":"default","qualifier":[116,101,115,116,95,116,97,98,108,101],"qualifierAsString":"test_table","systemTable":false,"hashCode":2070785505}}edit heap size: 256
   position: 1341
   
   ,{"sequence":"31","region":"e8bfb3479752a0512841ed8c47d5c8d0","actions":[{"qualifier":"HBASE::FLUSH","vlen":157,"row":"\\x00","type":"Put","family":"METAFAMILY","timestamp":"1754474979228","total_size_sum":"248"}],"table":{"name":[116,101,115,116,95,116,97,98,108,101],"nameAsString":"test_table","namespace":[100,101,102,97,117,108,116],"namespaceAsString":"default","qualifier":[116,101,115,116,95,116,97,98,108,101],"qualifierAsString":"test_table","systemTable":false,"hashCode":2070785505}}edit heap size: 288
   position: 1603
   
   ,{"sequence":"32","region":"e8bfb3479752a0512841ed8c47d5c8d0","actions":[{"qualifier":"HBASE::REGION_EVENT::REGION_CLOSE","vlen":148,"row":"\\x00","type":"Put","family":"METAFAMILY","timestamp":"1754474979245","total_size_sum":"264"}],"table":{"name":[116,101,115,116,95,116,97,98,108,101],"nameAsString":"test_table","namespace":[100,101,102,97,117,108,116],"namespaceAsString":"default","qualifier":[116,101,115,116,95,116,97,98,108,101],"qualifierAsString":"test_table","systemTable":false,"hashCode":2070785505}}edit heap size: 304
   position: 1877 
   ```
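
   Reading the `position:` lines as byte offsets, each entry spans from the position printed before it to the position printed after it, so a queue's `lastWalPosition` marks where its next entry starts. A small Java sketch of that lookup, assuming `lastWalPosition` is the end offset of the last shipped entry (`WalPositionSketch` is a hypothetical name, and the offsets are copied from the dumps above):

   ```java
   import java.util.List;

   // Simplified model: each WAL entry occupies [start, end) bytes in the file, as
   // read off the "position:" lines in the dumps above. Assumption: a queue's
   // lastWalPosition is the end offset of the last entry it has shipped.
   final class WalPositionSketch {

     static final class Entry {
       final long seqId;
       final long start;
       final long end;

       Entry(long seqId, long start, long end) {
         this.seqId = seqId;
         this.start = start;
         this.end = end;
       }
     }

     // Returns the seqId of the first entry that starts at or after lastWalPosition,
     // or -1 if the queue has already consumed the whole file.
     static long nextSeqId(List<Entry> wal, long lastWalPosition) {
       for (Entry e : wal) {
         if (e.start >= lastWalPosition) {
           return e.seqId;
         }
       }
       return -1;
     }

     public static void main(String[] args) {
       List<Entry> activeWal = List.of(
           new Entry(34, 409, 750), new Entry(35, 750, 1079), new Entry(36, 1079, 1185));
       List<Entry> oldWal = List.of(
           new Entry(27, 409, 716), new Entry(28, 716, 822), new Entry(30, 822, 1341),
           new Entry(31, 1341, 1603), new Entry(32, 1603, 1877));
       System.out.println(nextSeqId(activeWal, 409)); // 34 (main queue)
       System.out.println(nextSeqId(oldWal, 716));    // 28 (reclaimed queue)
     }
   }
   ```

   Under that reading, the main queue is about to push seqId=34 and the reclaimed queue seqId=28, matching the `<--- trying to push this entry` markers.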
   
   hbase:meta barriers:
   `[2, 5, 6, 9, 17, 26, 33]`
   
   The entry with seqId=34 cannot be pushed until the previous range finishes (entries with seqIds 27, 28, 30, 31 and 32, between barriers 26 and 33):
   ```
   2025-08-06T11:13:05,286 DEBUG [regionserver/home-host-1:16020.replicationSource,peer_1.replicationSource.wal-reader.home-host-1%2C16020%2C1754475014225,peer_1] regionserver.SerialReplicationChecker: Replication barrier for test_table/e8bfb3479752a0512841ed8c47d5c8d0/34=[#edits: 0 = <>]: ReplicationBarrierResult [barriers=[2, 5, 6, 9, 17, 26, 33], state=OPEN, parentRegionNames=]
   2025-08-06T11:13:39,338 DEBUG [regionserver/home-host-1:16020.replicationSource,peer_1.replicationSource.wal-reader.home-host-1%2C16020%2C1754475014225,peer_1] regionserver.SerialReplicationChecker: Previous range for test_table/e8bfb3479752a0512841ed8c47d5c8d0/34=[#edits: 0 = <>] has not been finished yet, give up
   ```
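
   The barrier list splits the region's sequence ids into ranges. seqId=34 falls into the range that starts at barrier 33, so the range [26, 33) must be fully pushed first. A simplified Java model of that check, assuming a range counts as finished once `pushedSeqId >= endBarrier - 1` (an assumption about `SerialReplicationChecker`, not its actual code; `BarrierCheckSketch` is hypothetical):

   ```java
   import java.util.Arrays;

   // Simplified model of the serial replication barrier check; NOT the actual
   // SerialReplicationChecker code. Assumption: the range ending at barrier B
   // counts as finished once pushedSeqId >= B - 1.
   final class BarrierCheckSketch {

     // Index of the barrier that starts the range containing seqId,
     // or -1 if seqId is below the first barrier.
     static int rangeStartIndex(long[] barriers, long seqId) {
       int idx = Arrays.binarySearch(barriers, seqId);
       // binarySearch returns -(insertionPoint) - 1 when the key is absent.
       return idx >= 0 ? idx : -idx - 2;
     }

     // True if no earlier range has to be waited for, or that range is fully pushed.
     static boolean previousRangeFinished(long[] barriers, long seqId, long pushedSeqId) {
       int start = rangeStartIndex(barriers, seqId);
       if (start <= 0) {
         return true; // first range (or before it): nothing to wait for in this model
       }
       long endOfPreviousRange = barriers[start];
       return pushedSeqId >= endOfPreviousRange - 1;
     }

     public static void main(String[] args) {
       long[] barriers = {2, 5, 6, 9, 17, 26, 33};
       // seqId 34 lies in the range starting at 33; [26, 33) needs pushedSeqId >= 32.
       System.out.println(previousRangeFinished(barriers, 34, 27)); // false
       System.out.println(previousRangeFinished(barriers, 34, 32)); // true
     }
   }
   ```

   With pushedSeqId=27 the check fails, consistent with the "has not been finished yet, give up" log line.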
   
   However, if the main WAL reader thread runs before the RS_CLAIM_REPLICATION_QUEUE WAL reader thread, and an IOException is caught while running canPush for the entry with seqId=34, SerialReplicationSourceWALReader.readWALEntries does the following:
   ```
   try {
     if (!checker.canPush(entry, firstCellInEntryBeforeFiltering)) { // <--- entry test_table/e8bfb3479752a0512841ed8c47d5c8d0/34=[#edits: 0 = <>]
       if (batch.getLastWalPosition() > positionBefore) {
         // we have something that can push, break
         break;
       } else {
         checker.waitUntilCanPush(entry, firstCellInEntryBeforeFiltering);
       }
     }
   } catch (IOException e) {
     LOG.warn("failed to check whether we can push the WAL entries", e); // <--- catch exception here
     if (batch.getLastWalPosition() > positionBefore) { // <--- 0 > 409  <=>  false
       // we have something that can push, break
       break;
     }
     sleepMultiplier = sleep(sleepMultiplier); // <--- sleep for some time, then leave the catch block
   }
   // arrive here means we can push the entry, record the last sequence id
   // <--- also reached after the swallowed IOException, although canPush never returned true
   batch.setLastSeqId(Bytes.toString(entry.getKey().getEncodedRegionName()),
       entry.getKey().getSequenceId()); // <--- lastSeqIds set to 34
   // actually remove the entry.
   removeEntryFromStream(entryStream, batch);
   if (addEntryToBatch(batch, entry)) { // <--- false (entry has 0 edits)
     break;
   }
   // <--- continue processing the next entry
   ```
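
   Stripped down to its control flow, the problem is that leaving the `catch` block without a `break` continues into the code that records the sequence id. A minimal Java reduction of the shape above (not the real reader; `FallThroughSketch` and `Checker` are hypothetical, and the sleep is omitted):

   ```java
   import java.io.IOException;

   // Minimal reduction of the try/catch shape above, not the real reader. It only
   // shows that a swallowed IOException falls through to the code that records
   // the entry's sequence id.
   final class FallThroughSketch {

     interface Checker {
       boolean canPush(long seqId) throws IOException;
     }

     // Returns the sequence id that would be recorded via setLastSeqId,
     // or -1 if the reader stops before recording anything.
     static long recordIfPushable(Checker checker, long seqId, long lastWalPosition,
         long positionBefore) {
       try {
         if (!checker.canPush(seqId)) {
           return -1; // the real code breaks or calls waitUntilCanPush here
         }
       } catch (IOException e) {
         if (lastWalPosition > positionBefore) {
           return -1; // "we have something that can push, break"
         }
         // the real code sleeps here and then leaves the catch block
       }
       // Reached when canPush returned true, but ALSO after a swallowed IOException.
       return seqId;
     }

     public static void main(String[] args) {
       Checker failing = seqId -> {
         throw new IOException("simulated failure");
       };
       // First iteration of the example: lastWalPosition=0, positionBefore=409.
       System.out.println(recordIfPushable(failing, 34, 0, 409)); // 34
     }
   }
   ```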
   
   
   The next iteration of SerialReplicationSourceWALReader.readWALEntries processes the entry with seqId=35:
   ```
   try {
     if (!checker.canPush(entry, firstCellInEntryBeforeFiltering)) { // <--- entry test_table/e8bfb3479752a0512841ed8c47d5c8d0/35=[#edits: 0 = <>]
       if (batch.getLastWalPosition() > positionBefore) {
         // we have something that can push, break
         break;
       } else {
         checker.waitUntilCanPush(entry, firstCellInEntryBeforeFiltering);
       }
     }
   } catch (IOException e) {
     LOG.warn("failed to check whether we can push the WAL entries", e); // <--- catch exception here
     if (batch.getLastWalPosition() > positionBefore) { // <--- 750 > 409  <=>  true
       // we have something that can push, break
       break; // <--- break here
     }
     sleepMultiplier = sleep(sleepMultiplier);
   }
   ```
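
   Putting both iterations together, a toy Java simulation reproduces the batch that the shipper receives. The `Batch` fields borrow names from the `WALEntryBatch` log output, but the class, the always-failing `canPush`, and the rule that entries without edits are not added to the batch are assumptions chosen to match this trace:

   ```java
   import java.io.IOException;
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   // Hypothetical simulation of the two iterations traced above; a toy model, not
   // HBase code. canPush always throws, as in the scenario described.
   final class ReaderLoopSketch {

     static final class Batch {
       final List<Long> walEntries = new ArrayList<>();
       final Map<String, Long> lastSeqIds = new HashMap<>();
       long lastWalPosition; // starts at 0, as in the "0 > 409" annotation
     }

     static boolean canPush(long seqId) throws IOException {
       throw new IOException("simulated failure");
     }

     // Entry i has sequence id seqIds[i], ends at endPositions[i] and carries
     // editCounts[i] cells after filtering.
     static Batch readBatch(String region, long[] seqIds, long[] endPositions,
         int[] editCounts, long positionBefore) {
       Batch batch = new Batch();
       for (int i = 0; i < seqIds.length; i++) {
         try {
           if (!canPush(seqIds[i])) {
             break;
           }
         } catch (IOException e) {
           if (batch.lastWalPosition > positionBefore) {
             break; // second iteration: 750 > 409
           }
           // first iteration: 0 > 409 is false, so fall through (sleep omitted)
         }
         batch.lastSeqIds.put(region, seqIds[i]);
         batch.lastWalPosition = endPositions[i]; // models removeEntryFromStream
         if (editCounts[i] > 0) {
           batch.walEntries.add(seqIds[i]); // entries with no edits are not shipped
         }
       }
       return batch;
     }

     public static void main(String[] args) {
       Batch batch = readBatch("e8bfb3479752a0512841ed8c47d5c8d0",
           new long[] {34, 35, 36}, new long[] {750, 1079, 1185}, new int[] {0, 0, 1}, 409);
       // Prints: [] 750 {e8bfb3479752a0512841ed8c47d5c8d0=34}
       System.out.println(batch.walEntries + " " + batch.lastWalPosition + " " + batch.lastSeqIds);
     }
   }
   ```

   The resulting batch has no entries, `lastWalPosition=750` and `lastSeqIds={e8bfb3479752a0512841ed8c47d5c8d0=34}`, the same values the shipper thread logs.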
   
   The shipper thread logs:
   ```
   2025-08-06T11:44:34,477 DEBUG [regionserver/home-host-1:16020.replicationSource,peer_1.replicationSource.shipperhome-host-1%2C16020%2C1754475014225,peer_1] regionserver.ReplicationSourceShipper: Shipper from source peer_1 got entry batch from reader: WALEntryBatch [walEntries=[], lastWalPath=hdfs://mycluster/hbase/WALs/home-host-1,16020,1754475014225/home-host-1%2C16020%2C1754475014225.1754475020411, lastWalPosition=750, nbRowKeys=0, nbHFiles=0, heapSize=0, lastSeqIds={e8bfb3479752a0512841ed8c47d5c8d0=34}, endOfFile=false,usedBufferSize=0]
   ```
   
   ReplicationSourceShipper.shipEdits then handles this batch. It contains no entries, but its lastSeqIds still records seqId=34:
   ```
   private void shipEdits(WALEntryBatch entryBatch) { // <--- entryBatch = WALEntryBatch [walEntries=[], lastWalPath=hdfs://mycluster/hbase/WALs/home-host-1,16020,1754475014225/home-host-1%2C16020%2C1754475014225.1754475020411, lastWalPosition=750, nbRowKeys=0, nbHFiles=0, heapSize=0, lastSeqIds={e8bfb3479752a0512841ed8c47d5c8d0=34}, endOfFile=false,usedBufferSize=0]
     List<Entry> entries = entryBatch.getWalEntries();
     int sleepMultiplier = 0;
     if (entries.isEmpty()) { // <--- true
       updateLogPosition(entryBatch);
       return;
     }
     // ... rest of shipEdits omitted
   ```
   
   `updateLogPosition()` calls `ReplicationSourceManager.logPositionAndCleanOldLogs()`, which in turn calls `ZKReplicationQueueStorage.setWALPosition()`.
   
   `ZKReplicationQueueStorage.setWALPosition()` updates ZooKeeper with:
   * /hbase/replication/regions/e8/bf/b3479752a0512841ed8c47d5c8d0-peer_1
     * pushedSeqId=34
   * /hbase/replication/rs/home-host-1,16020,1754475014225/peer_1/home-host-1%2C16020%2C1754475014225.1754475020411
     * lastWalPosition=750
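
   The effect of shipping this empty batch can be modeled with in-memory maps standing in for ZooKeeper. This Java sketch is a toy model of the call chain described above (all class, method and key names are hypothetical); it only shows that the persisted `pushedSeqId` moves from 27 to 34 without any edit being sent:

   ```java
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   // Toy model of the call chain above (shipEdits -> updateLogPosition ->
   // logPositionAndCleanOldLogs -> setWALPosition). In-memory maps stand in for
   // the ZooKeeper znodes; all names here are hypothetical.
   final class ShipEmptyBatchSketch {

     final Map<String, Long> walPositions = new HashMap<>(); // queue WAL -> lastWalPosition
     final Map<String, Long> pushedSeqIds = new HashMap<>(); // "<region>-<peer>" -> pushedSeqId

     void shipEdits(List<Long> entries, String queueWal, long lastWalPosition,
         Map<String, Long> lastSeqIds, String peerId) {
       if (entries.isEmpty()) {
         // Mirrors the early return above: nothing is sent to the sink, but the
         // batch's position and lastSeqIds are still persisted.
         updateLogPosition(queueWal, lastWalPosition, lastSeqIds, peerId);
         return;
       }
       // Sending real edits to the sink cluster is out of scope for this sketch.
       throw new UnsupportedOperationException("only the empty-batch path is modeled");
     }

     void updateLogPosition(String queueWal, long position, Map<String, Long> lastSeqIds,
         String peerId) {
       walPositions.put(queueWal, position);
       lastSeqIds.forEach((region, seqId) -> pushedSeqIds.put(region + "-" + peerId, seqId));
     }

     public static void main(String[] args) {
       ShipEmptyBatchSketch zk = new ShipEmptyBatchSketch();
       String region = "e8bfb3479752a0512841ed8c47d5c8d0";
       zk.pushedSeqIds.put(region + "-peer_1", 27L); // state before the empty batch
       zk.shipEdits(List.of(), "main-queue-wal", 750, Map.of(region, 34L), "peer_1");
       System.out.println(zk.pushedSeqIds); // {e8bfb3479752a0512841ed8c47d5c8d0-peer_1=34}
       System.out.println(zk.walPositions); // {main-queue-wal=750}
     }
   }
   ```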
   
   
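   Because pushedSeqId now jumps from 27 to 34 while entries 28, 30, 31 and 32 are still pending in the reclaimed queue, the main WAL reader thread can afterwards push entries with seqIds higher than those in the reclaimed queue: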
   ```
   2025-08-06T11:55:11,218 DEBUG [regionserver/home-host-1:16020.replicationSource,peer_1.replicationSource.wal-reader.home-host-1%2C16020%2C1754475014225,peer_1] regionserver.SerialReplicationChecker: Replication barrier for test_table/e8bfb3479752a0512841ed8c47d5c8d0/35=[#edits: 0 = <>]: ReplicationBarrierResult [barriers=[2, 5, 6, 9, 17, 26, 33], state=OPEN, parentRegionNames=]
   2025-08-06T11:56:51,300 DEBUG [regionserver/home-host-1:16020.replicationSource,peer_1.replicationSource.wal-reader.home-host-1%2C16020%2C1754475014225,peer_1] regionserver.SerialReplicationChecker: The previous range for test_table/e8bfb3479752a0512841ed8c47d5c8d0/35=[#edits: 0 = <>] has been finished, pass
   2025-08-06T11:57:25,313 TRACE [regionserver/home-host-1:16020.replicationSource,peer_1.replicationSource.wal-reader.home-host-1%2C16020%2C1754475014225,peer_1] regionserver.SerialReplicationChecker: The sequence id for test_table/e8bfb3479752a0512841ed8c47d5c8d0/36=[#edits: 1 = <key6/cf1:col1/1754475048622/Put/vlen=4/seqid=0; >] is continuous, pass
   ```
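
   Both checks reported in this log can be modeled in a few lines of Java. The sketch assumes that "previous range finished" means `pushedSeqId >= endBarrier - 1` and that "continuous" means the seqId is exactly one more than the last seqId passed for the same region; `PassChecksSketch` is hypothetical and not the real `SerialReplicationChecker`:

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Hypothetical model of the two checks the log lines above report. Assumptions:
   // "previous range finished" means pushedSeqId >= endBarrier - 1, and
   // "continuous" means seqId == lastPassedSeqId + 1 for the same region.
   final class PassChecksSketch {

     private final Map<String, Long> lastPassed = new HashMap<>();

     boolean canPush(String region, long seqId, long endBarrier, long pushedSeqId) {
       Long previous = lastPassed.get(region);
       if (previous != null && seqId == previous + 1) {
         lastPassed.put(region, seqId); // "The sequence id ... is continuous, pass"
         return true;
       }
       if (pushedSeqId >= endBarrier - 1) {
         lastPassed.put(region, seqId); // "The previous range ... has been finished, pass"
         return true;
       }
       return false;
     }

     public static void main(String[] args) {
       PassChecksSketch checker = new PassChecksSketch();
       String region = "e8bfb3479752a0512841ed8c47d5c8d0";
       // pushedSeqId was advanced to 34 by the empty batch, and 34 >= 33 - 1.
       System.out.println(checker.canPush(region, 35, 33, 34)); // true
       System.out.println(checker.canPush(region, 36, 33, 34)); // true (continuous)
       // With pushedSeqId still at 27, seqId 35 would have been held back.
       System.out.println(new PassChecksSketch().canPush(region, 35, 33, 27)); // false
     }
   }
   ```

   In this model, advancing `pushedSeqId` to 34 is enough for seqIds 35 and 36 to pass, even though seqIds 28 to 32 from the reclaimed queue have not been replicated yet.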
   
   As a result, the table in the sink cluster can receive entries out of order: key6 (seqId 36 in the source cluster) is already present while key5 (seqId 28) is still missing:
   ```
   hbase:005:0> scan "test_table"
   scan "test_table"
   ROW  COLUMN+CELL
    key1 column=cf1:col1, timestamp=2025-08-05T11:45:04.884, value=val1
    key2 column=cf1:col1, timestamp=2025-08-06T04:47:52.269, value=val1
    key3 column=cf1:col1, timestamp=2025-08-06T04:51:51.826, value=val1
    key4 column=cf1:col1, timestamp=2025-08-06T05:08:19.270, value=val1
    key6 column=cf1:col1, timestamp=2025-08-06T11:10:48.622, value=val1
   5 row(s)
   Took 0.1082 seconds
   ```
   
   After the reclaimed queue finishes, key5 (seqId 28 in the source cluster) is replicated as well:
   ```
   hbase:006:0> scan "test_table"
   scan "test_table"
   ROW  COLUMN+CELL
    key1 column=cf1:col1, timestamp=2025-08-05T11:45:04.884, value=val1
    key2 column=cf1:col1, timestamp=2025-08-06T04:47:52.269, value=val1
    key3 column=cf1:col1, timestamp=2025-08-06T04:51:51.826, value=val1
    key4 column=cf1:col1, timestamp=2025-08-06T05:08:19.270, value=val1
    key5 column=cf1:col1, timestamp=2025-08-06T11:07:48.855, value=val1
    key6 column=cf1:col1, timestamp=2025-08-06T11:10:48.622, value=val1
   6 row(s)
   Took 0.0735 seconds
   ```
   
   # Unit test, integration test
   
   TODO

