tomasbanet opened a new pull request, #7191:
URL: https://github.com/apache/hbase/pull/7191

   # Steps to reproduce
   
   1. Start the source hbase regionserver with debug options and connect to it via a debugger (e.g. Eclipse) so that breakpoints can be set to suspend threads at specific points.
   
   Sample debug options passed to the regionserver JVM (the debugger attaches on port 5005):
   `export HBASE_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005"`
   
   
   2. Add a serial replication peer.
   
   In hbase shell connected to _source_ hbase:
   
   ```
   add_peer "peer_1", CLUSTER_KEY => "home-host-1:2181,127.0.0.1:2281,home-host-2:2181:2181:/hbase2", TABLE_CFS => { "test_table" => ["cf1"] }, SERIAL => true
   ```
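
   To confirm the peer was registered as a serial peer, `list_peers` in the same shell shows the peer configuration (including whether it is serial):

   ```
   list_peers
   ```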
   
   3. Create a test table with 1 region and global replication scope, and wait 
until the REGION_OPEN entry is successfully replicated.
   
   In hbase shell connected to _sink_ hbase:
   
   ```
   create "test_table", { NAME => "cf1" }
   ```
   
   In hbase shell connected to _source_ hbase:
   
   ```
   create "test_table", { NAME => "cf1", REPLICATION_SCOPE => 1 }
   ```
   
   HBase state for test_table region:
   * hbase:meta barriers = [2]
   * Region dir recovered.edits files: 1.seqid
   * Region dir storefiles metadata: none
   * Active WAL entries:
     * HBASE::REGION_EVENT::REGION_OPEN seqid=3
   * pushedSeqId (/hbase/replication/regions/… znode contents) = 3
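
   For reference, the state above can be inspected directly: the replication barriers are stored in hbase:meta under the `rep_barrier` column family, and the pushed sequence ids are kept under the `/hbase/replication/regions` znodes (the exact znode path, elided as `…` above, depends on the queue storage layout):

   ```
   # In hbase shell connected to source hbase:
   scan "hbase:meta", { COLUMNS => "rep_barrier" }

   # In the HBase ZooKeeper shell (bin/hbase zkcli):
   ls /hbase/replication/regions
   ```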
    
   4. Put a key-value in source hbase and wait until it is successfully replicated (a quick sink-side check is shown after the state list below).
   
   In hbase shell connected to _source_ hbase:
   
   ```
   put "test_table", "key1", "cf1:col1", "val1"
   ```
   
   HBase state for test_table region:
   * hbase:meta barriers = [2]
   * Region dir recovered.edits files: 1.seqid
   * Region dir storefiles metadata: none
   * Active WAL entries:
     * HBASE::REGION_EVENT::REGION_OPEN seqid=3
     * col1 key1 seqid=4
   * pushedSeqId (/hbase/replication/regions/… znode contents) = 4
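
   One simple way to confirm the edit has reached the sink before moving on:

   ```
   # In hbase shell connected to sink hbase:
   get "test_table", "key1"
   ```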
   
   5. In the _source_ hbase regionserver, use the debugger to set a breakpoint in SerialReplicationChecker#canPush(Entry entry, Cell firstCellInEdit), preventing entries from being pushed by the replication WAL reader threads*.

   *We rely on a race between replication lagging behind and a region being re-opened to reproduce the problem.
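
   If other tables or regions replicate through the same peer, a conditional breakpoint keeps them unaffected. The exact syntax depends on the debugger; the expression below is only an illustration, using the `entry` parameter in scope at the breakpoint:

   ```
   entry.getKey().getTableName().getNameAsString().equals("test_table")
   ```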
   
   6. SIGKILL* the _source_ hbase regionserver, start it again, and wait until regions are re-opened. When a replication WAL reader thread hits the breakpoint, keep the thread suspended (do not allow any entries to be pushed).

   *You must SIGKILL, or otherwise trigger a non-graceful shutdown/crash of the regionserver, to prevent the regions on that regionserver from being closed normally, i.e. to prevent the memstore from being flushed and a HBASE::REGION_EVENT::REGION_CLOSE marker from being written to the WAL.
   
   HBase state for test_table region:
   * hbase:meta barriers = [2, 5]
   * Region dir recovered.edits files: 4.seqid
   * Region dir storefiles metadata: [MAX_MEMSTORE_TS_KEY = 4, MAX_SEQ_ID_KEY = 4]
   * Active WAL entries*:
      * HBASE::REGION_EVENT::REGION_OPEN seqid=6
      * `**`
   * pushedSeqId (/hbase/replication/regions/… znode contents) = 4
   
   *These entries must not be pushed yet, which keeps pushedSeqId from being updated; this is why we suspended the replication WAL reader threads in step 5. If the entries were allowed to be pushed, they would fall in the last range and be pushed successfully, avoiding the problem described here.
   
   **If the region dir already had storefiles before step 6, there would also be a HBASE::COMPACTION seqid=7 entry in the WAL, because the new storefile resulting from the replayed WAL entries would be flushed and compacted with the older storefiles.
   
   7. Roll the source hbase regionserver WAL so that its active WAL becomes 
empty (contains no entries)*
   
   In hbase shell connected to _source_ hbase:
   
   ```
   wal_roll "<regionserver servername>"
   ```
   
   This ensures that when the regionserver crashes again, no split WALs are produced (since there are no entries in the dead regionserver's active WAL).
   
   *Only required if a HBASE::COMPACTION event was written to the WAL when the test_table region was re-opened, e.g. if the test_table region already had some storefiles before step 6. This is because during WAL splitting the HBASE::COMPACTION entry is included in a region's split WAL (unlike HBASE::REGION_EVENT::\* entries, which are filtered out), causing openSeqNum to be increased during split WAL replay.
   
   8. SIGKILL the source hbase regionserver, start it again, and wait until regions are re-opened.
   
   HBase state for test_table region:
   * hbase:meta barriers = [2, 5, 6]
   * Region dir recovered.edits files: 5.seqid
   * Region dir storefiles metadata: [MAX_MEMSTORE_TS_KEY = 4, MAX_SEQ_ID_KEY = 4]
   * Active WAL entries:
     * HBASE::REGION_EVENT::REGION_OPEN seqid=7
   * pushedSeqId (/hbase/replication/regions/… znode contents) = 4
   
   9. Resume the replication WAL reader threads in the debugger.
   
   The replication queue will be:
   * /hbase/replication/rs/<current-regionserver-servername>/peer_1
     * Active WAL
        * HBASE::REGION_EVENT::REGION_OPEN seqid=7 ← stuck pushing this entry
   * /hbase/replication/rs/<current-regionserver-servername>/peer_1-<old-regionserver-server-name>
     * Old non-empty WAL
        * HBASE::REGION_EVENT::REGION_OPEN seqid=6 ← stuck pushing this entry
     * Old empty WAL (result from rolling WAL in step 7)
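
   This queue layout can be verified from the HBase ZooKeeper shell (bin/hbase zkcli); the servername placeholders below are the same as in the list above, and the znode paths assume the default ZK-based queue storage:

   ```
   ls /hbase/replication/rs/<current-regionserver-servername>
   ls /hbase/replication/rs/<current-regionserver-servername>/peer_1
   ls /hbase/replication/rs/<current-regionserver-servername>/peer_1-<old-regionserver-server-name>
   ```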
   
   The active WAL source cannot push its entry (seqid=7) because it is waiting for the previous entry (seqid=6) to be replicated:
   ```
   2025-08-05T12:32:27,308 DEBUG [regionserver/home-host-1:16020.replicationSource.wal-reader.home-host-1%2C16020%2C1754392923670,peer_1] regionserver.SerialReplicationChecker: Replication barrier for test_table/e8bfb3479752a0512841ed8c47d5c8d0/7=[#edits: 0 = <>]: ReplicationBarrierResult [barriers=[2, 5, 6], state=OPEN, parentRegionNames=]
   2025-08-05T12:32:27,310 DEBUG [regionserver/home-host-1:16020.replicationSource.wal-reader.home-host-1%2C16020%2C1754392923670,peer_1] regionserver.SerialReplicationChecker: Previous range for test_table/e8bfb3479752a0512841ed8c47d5c8d0/7=[#edits: 0 = <>] has not been finished yet, give up
   2025-08-05T12:32:27,311 DEBUG [regionserver/home-host-1:16020.replicationSource.wal-reader.home-host-1%2C16020%2C1754392923670,peer_1] regionserver.SerialReplicationChecker: Can not push test_table/e8bfb3479752a0512841ed8c47d5c8d0/7=[#edits: 0 = <>], wait
   ```
   
   The old non-empty WAL source cannot push its entry (seqid=6) because the previous range for the entry is computed as ending at barrier 6 instead of barrier 5:
   ```
   2025-08-05T12:32:30,186 DEBUG [RS_CLAIM_REPLICATION_QUEUE-regionserver/home-host-1:16020-0.replicationSource,peer_1-home-host-1,16020,1754391356227.replicationSource.wal-reader.home-host-1%2C16020%2C1754391356227,peer_1-home-host-1,16020,1754391356227] regionserver.SerialReplicationChecker: Replication barrier for test_table/e8bfb3479752a0512841ed8c47d5c8d0/6=[#edits: 0 = <>]: ReplicationBarrierResult [barriers=[2, 5, 6], state=OPEN, parentRegionNames=]
   2025-08-05T12:32:30,187 DEBUG [RS_CLAIM_REPLICATION_QUEUE-regionserver/home-host-1:16020-0.replicationSource,peer_1-home-host-1,16020,1754391356227.replicationSource.wal-reader.home-host-1%2C16020%2C1754391356227,peer_1-home-host-1,16020,1754391356227] regionserver.SerialReplicationChecker: Previous range for test_table/e8bfb3479752a0512841ed8c47d5c8d0/6=[#edits: 0 = <>] has not been finished yet, give up
   2025-08-05T12:32:30,187 DEBUG [RS_CLAIM_REPLICATION_QUEUE-regionserver/home-host-1:16020-0.replicationSource,peer_1-home-host-1,16020,1754391356227.replicationSource.wal-reader.home-host-1%2C16020%2C1754391356227,peer_1-home-host-1,16020,1754391356227] regionserver.SerialReplicationChecker: Can not push test_table/e8bfb3479752a0512841ed8c47d5c8d0/6=[#edits: 0 = <>], wait
   ```
   
   # The fix
   
   Currently the entry with seqid=6 is processed like this (the `// <---` annotations show the values for this scenario):
   
   ```
   private boolean canPush(Entry entry, byte[] row) throws IOException {
     String encodedNameAsString = Bytes.toString(entry.getKey().getEncodedRegionName());
     long seqId = entry.getKey().getSequenceId(); // <--- 6
     ReplicationBarrierResult barrierResult = MetaTableAccessor.getReplicationBarrierResult(conn,
       entry.getKey().getTableName(), row, entry.getKey().getEncodedRegionName());
     LOG.debug("Replication barrier for {}: {}", entry, barrierResult);
     long[] barriers = barrierResult.getBarriers(); // <--- [2, 5, 6]
     int index = Arrays.binarySearch(barriers, seqId); // <--- 2
     if (index == -1) { // <--- false
       LOG.debug("{} is before the first barrier, pass", entry);
       // This means we are in the range before the first record openSeqNum, this usually because the
       // wal is written before we enable serial replication for this table, just return true since
       // we can not guarantee the order.
       pushed.getUnchecked(encodedNameAsString).setValue(seqId);
       return true;
     }
     // The sequence id range is left closed and right open, so either we decrease the missed insert
     // point to make the index start from 0, or increase the hit insert point to make the index
     // start from 1. Here we choose the latter one.
     if (index < 0) { // <--- false
       index = -index - 1;
     } else {
       index++; // <--- index = 3
     }
     if (index == 1) { // <--- false
       // we are in the first range, check whether we have parents
       for (byte[] regionName : barrierResult.getParentRegionNames()) {
         if (!isParentFinished(regionName)) {
           LOG.debug("Parent {} has not been finished yet for entry {}, give up",
             Bytes.toStringBinary(regionName), entry);
           return false;
         }
       }
       if (isLastRangeAndOpening(barrierResult, index)) {
         LOG.debug("{} is in the last range and the region is opening, give up", entry);
         return false;
       }
       LOG.debug("{} is in the first range, pass", entry);
       recordCanPush(encodedNameAsString, seqId, barriers, 1);
       return true;
     }
     // check whether the previous range is finished
     if (!isRangeFinished(barriers[index - 1], encodedNameAsString)) { // <--- true
       LOG.debug("Previous range for {} has not been finished yet, give up", entry);
       return false; // <--- return false
     }
     if (isLastRangeAndOpening(barrierResult, index)) {
       LOG.debug("{} is in the last range and the region is opening, give up", entry);
       return false;
     }
     LOG.debug("The previous range for {} has been finished, pass", entry);
     recordCanPush(encodedNameAsString, seqId, barriers, index);
     return true;
   }

   private boolean isRangeFinished(long endBarrier, String encodedRegionName) throws IOException { // <--- endBarrier = 6
     long pushedSeqId;
     try {
       pushedSeqId = storage.getLastSequenceId(encodedRegionName, peerId); // <--- pushedSeqId = 4
     } catch (ReplicationException e) {
       throw new IOException(
         "Failed to get pushed sequence id for " + encodedRegionName + ", peer " + peerId, e);
     }
     // endBarrier is the open sequence number. When opening a region, the open sequence number will
     // be set to the old max sequence id plus one, so here we need to minus one.
     return pushedSeqId >= endBarrier - 1; // <--- 4 >= 6 - 1  <=>  4 >= 5  <=>  false
   }
   ```
   
   In our fix we change the index calculation so that the index is not incremented when seqId == barriers[index]. As a result, isRangeFinished is called with barriers[index - 1] (which returns true in this scenario) and the entry is passed through. A standalone sketch of the change follows.
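
   As an illustration only, here is a runnable sketch of the changed index handling. It is not the actual patch: the class, method names, and in particular the guard for an entry whose seqId equals the very first barrier are assumptions made for this sketch.

   ```
   import java.util.Arrays;

   // Standalone sketch (NOT the actual patch) of the range-index change,
   // replayed against the values from the repro: barriers=[2, 5, 6],
   // pushedSeqId=4, stuck entry seqId=6.
   public class RangeIndexSketch {

     // Old behaviour: on an exact barrier hit, move on to the following range.
     static int oldIndex(long[] barriers, long seqId) {
       int index = Arrays.binarySearch(barriers, seqId);
       return index < 0 ? -index - 1 : index + 1;
     }

     // Fixed behaviour: on an exact hit (seqId == barriers[index], i.e. the
     // entry is a marker written at an openSeqNum), keep the hit index so the
     // caller checks isRangeFinished(barriers[index - 1], ...). The index == 0
     // guard (seqId equal to the very first barrier) is an assumption here.
     static int newIndex(long[] barriers, long seqId) {
       int index = Arrays.binarySearch(barriers, seqId);
       if (index < 0) {
         return -index - 1; // miss: insertion point = count of barriers < seqId
       }
       return index == 0 ? 1 : index; // hit: do not increment
     }

     public static void main(String[] args) {
       long[] barriers = { 2, 5, 6 };
       long pushedSeqId = 4;
       long seqId = 6; // the stuck HBASE::REGION_EVENT::REGION_OPEN marker
       int oldIdx = oldIndex(barriers, seqId);
       int newIdx = newIndex(barriers, seqId);
       // Mirror isRangeFinished: the previous range ending at endBarrier is
       // finished once pushedSeqId >= endBarrier - 1.
       System.out.println("old: end=" + barriers[oldIdx - 1] + " finished="
         + (pushedSeqId >= barriers[oldIdx - 1] - 1)); // end=6, finished=false
       System.out.println("new: end=" + barriers[newIdx - 1] + " finished="
         + (pushedSeqId >= barriers[newIdx - 1] - 1)); // end=5, finished=true
     }
   }
   ```

   Running the sketch prints finished=false for the old calculation and finished=true for the new one, matching the walkthrough above.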
   
   # Unit test, integration test
   
   TODO

