This is an automated email from the ASF dual-hosted git repository.

chenglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new e742fc341ca HBASE-29230 Possible resource leak when RegionReplicationSink sends WALEdits (#6867)
e742fc341ca is described below

commit e742fc341ca9279b13c33f7df85d591c8af0bb91
Author: chenglei <[email protected]>
AuthorDate: Sat Apr 19 21:56:13 2025 +0800

    HBASE-29230 Possible resource leak when RegionReplicationSink sends WALEdits (#6867)
---
 .../regionreplication/RegionReplicationSink.java   | 12 ++--
 .../TestRegionReplicationSink.java                 | 68 ++++++++++++++++++++++
 2 files changed, 76 insertions(+), 4 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java
index d06a1d3c427..b380193fa6b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java
@@ -254,6 +254,13 @@ public class RegionReplicationSink {
   }
 
   private void send() {
+    // We should check if there are normal replicas first
+    int toSendReplicaCount = regionReplication - 1 - failedReplicas.size();
+    if (toSendReplicaCount <= 0) {
+      LOG.warn("All replicas {} are failed, exit send....", failedReplicas);
+      return;
+    }
+
     List<SinkEntry> toSend = new ArrayList<>();
     long totalSize = 0L;
     boolean hasMetaEdit = false;
@@ -269,10 +276,7 @@ public class RegionReplicationSink {
         break;
       }
     }
-    int toSendReplicaCount = regionReplication - 1 - failedReplicas.size();
-    if (toSendReplicaCount <= 0) {
-      return;
-    }
+
     long rpcTimeoutNsToUse;
     long operationTimeoutNsToUse;
     if (!hasMetaEdit) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java
index cbc59178333..cf86872444f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java
@@ -316,6 +316,74 @@ public class TestRegionReplicationSink {
     assertEquals(0, sink.pendingSize());
   }
 
+  /**
+   * Test for HBASE-29230: when all replicas have failed, queued entries must not be dropped,
+   * and every retained ServerCall must eventually be released.
+   */
+  @Test
+  public void testAllReplicaFailed() {
+    MutableInt next = new MutableInt(0);
+    List<CompletableFuture<Void>> futures =
+      Stream.generate(() -> new CompletableFuture<Void>()).limit(5).collect(Collectors.toList());
+    when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong()))
+      .then(i -> futures.get(next.getAndIncrement()));
+
+    ServerCall<?> rpcCall1 = mock(ServerCall.class);
+    WALKeyImpl key1 = mock(WALKeyImpl.class);
+    when(key1.estimatedSerializedSizeOf()).thenReturn(100L);
+    when(key1.getSequenceId()).thenReturn(1L);
+    WALEdit edit1 = mock(WALEdit.class);
+    when(edit1.estimatedSerializedSizeOf()).thenReturn(1000L);
+    when(manager.increase(anyLong())).thenReturn(true);
+    sink.add(key1, edit1, rpcCall1);
+    verify(rpcCall1, times(1)).retainByWAL();
+
+    ServerCall<?> rpcCall2 = mock(ServerCall.class);
+    WALKeyImpl key2 = mock(WALKeyImpl.class);
+    when(key2.estimatedSerializedSizeOf()).thenReturn(100L);
+    when(key2.getSequenceId()).thenReturn(2L);
+    WALEdit edit2 = mock(WALEdit.class);
+    when(edit2.estimatedSerializedSizeOf()).thenReturn(1000L);
+    when(manager.increase(anyLong())).thenReturn(true);
+    sink.add(key2, edit2, rpcCall2);
+    verify(rpcCall2, times(1)).retainByWAL();
+
+    // fail both replicas for edit1; with no healthy replica left, edit2 must stay queued
+    futures.get(0).completeExceptionally(new IOException("inject error"));
+    futures.get(1).completeExceptionally(new IOException("inject error"));
+
+    // only edit1 should have been replicated so far; edit2 was never sent
+    verify(conn, times(2)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
+
+    // add a START_FLUSH marker, which resets the failed replica set
+    ServerCall<?> rpcCall3 = mock(ServerCall.class);
+    WALKeyImpl key3 = mock(WALKeyImpl.class);
+    when(key3.estimatedSerializedSizeOf()).thenReturn(200L);
+    when(key3.getSequenceId()).thenReturn(4L);
+    Map<byte[], List<Path>> committedFiles = td.getColumnFamilyNames().stream()
+      .collect(Collectors.toMap(Function.identity(), k -> Collections.emptyList(), (u, v) -> {
+        throw new IllegalStateException();
+      }, () -> new TreeMap<>(Bytes.BYTES_COMPARATOR)));
+    FlushDescriptor fd =
+      ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH, primary, 3L, committedFiles);
+    WALEdit edit3 = WALEdit.createFlushWALEdit(primary, fd);
+    sink.add(key3, edit3, rpcCall3);
+    verify(rpcCall3, times(1)).retainByWAL();
+
+    // the flush marker should have cleared failedReplicas, so the pending entries are sent to
+    // both replicas again (two more replicate calls)
+    verify(conn, times(4)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
+    futures.get(2).complete(null);
+    futures.get(3).complete(null);
+
+    // every retained ServerCall should be released and pendingSize should drop back to 0
+    verify(rpcCall1, times(1)).releaseByWAL();
+    verify(rpcCall2, times(1)).releaseByWAL();
+    verify(rpcCall3, times(1)).releaseByWAL();
+    assertEquals(0, sink.pendingSize());
+
+  }
+
   @Test
   public void testSizeCapacity() {
     MutableInt next = new MutableInt(0);

Reply via email to