This is an automated email from the ASF dual-hosted git repository.
chenglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new e742fc341ca HBASE-29230 Possible resource leak when
RegionReplicationSink sends WALEdits (#6867)
e742fc341ca is described below
commit e742fc341ca9279b13c33f7df85d591c8af0bb91
Author: chenglei <[email protected]>
AuthorDate: Sat Apr 19 21:56:13 2025 +0800
HBASE-29230 Possible resource leak when RegionReplicationSink sends
WALEdits (#6867)
---
.../regionreplication/RegionReplicationSink.java | 12 ++--
.../TestRegionReplicationSink.java | 68 ++++++++++++++++++++++
2 files changed, 76 insertions(+), 4 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java
index d06a1d3c427..b380193fa6b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java
@@ -254,6 +254,13 @@ public class RegionReplicationSink {
}
private void send() {
+ // We should check if there are normal replicas first
+ int toSendReplicaCount = regionReplication - 1 - failedReplicas.size();
+ if (toSendReplicaCount <= 0) {
+ LOG.warn("All replicas {} are failed, exit send....", failedReplicas);
+ return;
+ }
+
List<SinkEntry> toSend = new ArrayList<>();
long totalSize = 0L;
boolean hasMetaEdit = false;
@@ -269,10 +276,7 @@ public class RegionReplicationSink {
break;
}
}
- int toSendReplicaCount = regionReplication - 1 - failedReplicas.size();
- if (toSendReplicaCount <= 0) {
- return;
- }
+
long rpcTimeoutNsToUse;
long operationTimeoutNsToUse;
if (!hasMetaEdit) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java
index cbc59178333..cf86872444f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java
@@ -316,6 +316,74 @@ public class TestRegionReplicationSink {
assertEquals(0, sink.pendingSize());
}
+ /**
+ * This test is for HBASE-29230, when all replicas are failed, resource should be released
+ * completely.
+ */
+ @Test
+ public void testAllReplicaFailed() {
+ MutableInt next = new MutableInt(0);
+ List<CompletableFuture<Void>> futures =
+ Stream.generate(() -> new
CompletableFuture<Void>()).limit(5).collect(Collectors.toList());
+ when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong()))
+ .then(i -> futures.get(next.getAndIncrement()));
+
+ ServerCall<?> rpcCall1 = mock(ServerCall.class);
+ WALKeyImpl key1 = mock(WALKeyImpl.class);
+ when(key1.estimatedSerializedSizeOf()).thenReturn(100L);
+ when(key1.getSequenceId()).thenReturn(1L);
+ WALEdit edit1 = mock(WALEdit.class);
+ when(edit1.estimatedSerializedSizeOf()).thenReturn(1000L);
+ when(manager.increase(anyLong())).thenReturn(true);
+ sink.add(key1, edit1, rpcCall1);
+ verify(rpcCall1, times(1)).retainByWAL();
+
+ ServerCall<?> rpcCall2 = mock(ServerCall.class);
+ WALKeyImpl key2 = mock(WALKeyImpl.class);
+ when(key2.estimatedSerializedSizeOf()).thenReturn(100L);
+ when(key2.getSequenceId()).thenReturn(2L);
+ WALEdit edit2 = mock(WALEdit.class);
+ when(edit2.estimatedSerializedSizeOf()).thenReturn(1000L);
+ when(manager.increase(anyLong())).thenReturn(true);
+ sink.add(key2, edit2, rpcCall2);
+ verify(rpcCall2, times(1)).retainByWAL();
+
+ // fail all replicas for edit1, so edit2 could not send.
+ futures.get(0).completeExceptionally(new IOException("inject error"));
+ futures.get(1).completeExceptionally(new IOException("inject error"));
+
+ // we should only call replicate for edit1
+ verify(conn, times(2)).replicate(any(), anyList(), anyInt(), anyLong(),
anyLong());
+
+ // flush
+ ServerCall<?> rpcCall3 = mock(ServerCall.class);
+ WALKeyImpl key3 = mock(WALKeyImpl.class);
+ when(key3.estimatedSerializedSizeOf()).thenReturn(200L);
+ when(key3.getSequenceId()).thenReturn(4L);
+ Map<byte[], List<Path>> committedFiles = td.getColumnFamilyNames().stream()
+ .collect(Collectors.toMap(Function.identity(), k ->
Collections.emptyList(), (u, v) -> {
+ throw new IllegalStateException();
+ }, () -> new TreeMap<>(Bytes.BYTES_COMPARATOR)));
+ FlushDescriptor fd =
+ ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH, primary, 3L,
committedFiles);
+ WALEdit edit3 = WALEdit.createFlushWALEdit(primary, fd);
+ sink.add(key3, edit3, rpcCall3);
+ verify(rpcCall3, times(1)).retainByWAL();
+
+ // the flush marker should have cleared the failedReplicas, so we will send the edit to 2
+ // replicas again
+ verify(conn, times(4)).replicate(any(), anyList(), anyInt(), anyLong(),
anyLong());
+ futures.get(2).complete(null);
+ futures.get(3).complete(null);
+
+ // all ServerCall should be released and pendingSize should be 0
+ verify(rpcCall1, times(1)).releaseByWAL();
+ verify(rpcCall2, times(1)).releaseByWAL();
+ verify(rpcCall3, times(1)).releaseByWAL();
+ assertEquals(0, sink.pendingSize());
+
+ }
+
@Test
public void testSizeCapacity() {
MutableInt next = new MutableInt(0);