[ 
https://issues.apache.org/jira/browse/HBASE-28584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17869030#comment-17869030
 ] 

Andrew Kyle Purtell edited comment on HBASE-28584 at 7/26/24 9:46 PM:
----------------------------------------------------------------------

I have thought about this a bit.

When the replication RPC comes in on the sink, it is processed by the RPC 
server. The RPC server allocates a buffer that holds the bytes of the RPC 
message submitted by the client. For replication those contents include 
serialized cells sent from the replication source. A CellScanner is opened to 
iterate over those serialized cells and passed around as a method parameter in 
various places.
The CellScanner is used to iterate over cells for replication sink processing. 
The cells returned from the cell scanner are not copied into new memory; the 
cell objects simply hold pointers back to the original buffer allocated to 
receive the RPC. The sink then creates client request objects like Puts or 
Deletes using those cells. If there is some error applying one of those 
submitted local changes, the exception eventually bubbles up and causes the 
server to consider the processing of the replication RPC call as failed. It 
will then clean up the resources it allocated for the replicateEdits call, 
including the buffer that holds the cells received from the replication source. 
But we can still have other concurrent client activities in progress that rely 
on that buffer, because the cell objects handed off to the client code hold 
only pointers into the buffer originally allocated by the server. The cleanup 
at the RPC server then pulls the buffer out from underneath those client 
activities, leading in edge cases to a crash.
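
To make the hazard concrete, here is a rough sketch (purely illustrative, not 
the actual ReplicationSink code; the method name buildPuts is made up) of how a 
cell decoded from the request ends up as a bare pointer into the call's buffer:

{code:java}
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;

public class ReplicationSinkHazardSketch {
  // Illustrative only: the Puts built here hold the decoded cells directly,
  // so they reference the RPC call's receive buffer rather than copies.
  // If that buffer is released while these Puts are still being applied,
  // reading the cell bytes later touches freed (direct) memory.
  static List<Put> buildPuts(CellScanner scanner) throws IOException {
    List<Put> puts = new ArrayList<>();
    while (scanner.advance()) {
      Cell cell = scanner.current();  // e.g. a ByteBufferKeyValue view over the call buffer
      Put put = new Put(CellUtil.cloneRow(cell));
      put.add(cell);                  // stores the cell itself, i.e. only a pointer
      puts.add(put);
    }
    return puts;
  }
}
{code}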

Cloning the cells before handing them off to client code for local RPCs is one 
way to solve the problem. It allocates new memory to hold the cell contents and 
copies those contents out of the server's receive buffer. The resulting cloned 
cell is independently managed memory that is safe for the sink to use while it 
applies the changes to the local cluster through the client APIs, with no need 
to worry about what the server may or may not do. Languages like Rust have 
elegant facilities for managing the ownership of memory buffers for which we 
have taken pointers, precisely to avoid mistakes like this where the pointer 
lifetime exceeds the buffer lifetime. But here in HBase, in Java, using the 
Netty buffers, we have to do it by hand.
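
As a minimal sketch of what cloning buys us (this is not the attached patch; it 
ignores tags and assumes Put-type cells for simplicity), copying the cell 
contents into fresh heap arrays before building the client request removes any 
dependence on the server's receive buffer:

{code:java}
import java.io.IOException;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;

public class DeepCloneSketch {
  // Copies row/family/qualifier/value out of whatever backs the cell
  // (typically the RPC receive buffer) into new heap byte[]s, so the
  // resulting cell's lifetime is independent of the server's buffer.
  static Put cloneIntoPut(Cell cell) throws IOException {
    byte[] row = CellUtil.cloneRow(cell);
    KeyValue copy = new KeyValue(row,
        CellUtil.cloneFamily(cell),
        CellUtil.cloneQualifier(cell),
        cell.getTimestamp(),
        CellUtil.cloneValue(cell));
    return new Put(row).add(copy);
  }
}
{code}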

An alternative approach is to track all of the in-flight client RPCs applying 
the local changes and wait to clean up the server context until they are all 
finished. As you can imagine, though, that may be more complex and error prone 
than simply cloning the cells.
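
For comparison, the deferred-cleanup alternative would amount to something like 
reference counting the call's buffer, along these lines (a hypothetical sketch 
only; none of these names exist in HBase):

{code:java}
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch only: keeps the RPC call's receive buffer alive until
// every in-flight local client operation that borrowed cells from it is done.
public class RefCountedCallBuffer {
  private final Runnable releaseBuffer;                     // e.g. frees the underlying netty ByteBuf
  private final AtomicInteger refs = new AtomicInteger(1);  // 1 = the RPC call itself

  public RefCountedCallBuffer(Runnable releaseBuffer) {
    this.releaseBuffer = releaseBuffer;
  }

  public void retain() {    // call before handing cells off to client code
    refs.incrementAndGet();
  }

  public void release() {   // call when a local apply completes or fails
    if (refs.decrementAndGet() == 0) {
      releaseBuffer.run();  // last reference gone: now it is safe to free the buffer
    }
  }
}
{code}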

So I applied 
[^0001-Deep-clone-cells-set-to-be-replicated-onto-the-local.patch] and tested 
it for a while. I profiled the sink regionserver in my testbed before the crash 
fix was applied ( [^rs_profile_before.html] ) and after it was applied 
( [^rs_profile_after.html] ). The only difference between the two builds is 
exactly the one patch to fix the crash case. Incidentally, this is on a testbed 
configured to use TLS RPC and running on the Azul Java 8 JVM, but those details 
are not important. Use the search function with the term "replicateEntries" to 
find the portion of the flamegraph related to the sink's processing of inbound 
replication RPCs, and then search for "deepClone" to find the impact of the 
added clone operation. In the before profile there are no results; deepClone 
matches 0% of traces. In the after profile, the search highlights a purple 
trace above replicateEntries totaling 0.76% of overall CPU time, which is 
negligible. replicateEntries overall consumes 5.72% of the CPU, so the cloning 
accounts for roughly 13% of the replicateEntries processing time 
(0.76 / 5.72 ≈ 13%). I think this is acceptable given the safety improvements 
and the crash fix.



> RS SIGSEGV under heavy replication load
> ---------------------------------------
>
>                 Key: HBASE-28584
>                 URL: https://issues.apache.org/jira/browse/HBASE-28584
>             Project: HBase
>          Issue Type: Bug
>          Components: regionserver
>    Affects Versions: 2.5.6
>         Environment: RHEL 7.9
> JDK 11.0.23
> Hadoop 3.2.4
> HBase 2.5.6
>            Reporter: Whitney Jackson
>            Assignee: Andrew Kyle Purtell
>            Priority: Major
>         Attachments: 
> 0001-Deep-clone-cells-set-to-be-replicated-onto-the-local.patch, 
> 0001-Support-configuration-based-selection-of-netty-chann.patch, 
> rs_profile_after.html, rs_profile_before.html
>
>
> I'm observing RS crashes under heavy replication load:
>  
> {code:java}
> #
> # A fatal error has been detected by the Java Runtime Environment:
> #
> #  SIGSEGV (0xb) at pc=0x00007f7546873b69, pid=29890, tid=36828
> #
> # JRE version: Java(TM) SE Runtime Environment 18.9 (11.0.23+7) (build 
> 11.0.23+7-LTS-222)
> # Java VM: Java HotSpot(TM) 64-Bit Server VM 18.9 (11.0.23+7-LTS-222, mixed 
> mode, tiered, compressed oops, g1 gc, linux-amd64)
> # Problematic frame:
> # J 24625 c2 
> org.apache.hadoop.hbase.util.ByteBufferUtils.copyBufferToStream(Ljava/io/OutputStream;Ljava/nio/ByteBuffer;II)V
>  (75 bytes) @ 0x00007f7546873b69 [0x00007f7546873960+0x0000000000000209]
> {code}
>  
> The heavier load comes when a replication peer has been disabled for several 
> hours for patching etc. When the peer is re-enabled the replication load is 
> high until the peer is all caught up. The crashes happen on the cluster 
> receiving the replication edits.
>  
> I believe this problem started after upgrading from 2.4.x to 2.5.x.
>  
> One possibly relevant non-standard config I run with:
> {code:java}
> <property>
>   <name>hbase.region.store.parallel.put.limit</name>
>   <!-- Default: 10  -->
>   <value>100</value>
>   <description>Added after seeing "failed to accept edits" replication errors 
> in the destination region servers indicating this limit was being exceeded 
> while trying to process replication edits.</description>
> </property>
> {code}
>  
> I understand from other Jiras that the problem is likely around direct memory 
> usage by Netty. I haven't yet tried switching the Netty allocator to 
> {{unpooled}} or {{heap}}. I also haven't yet tried any of the 
> {{io.netty.allocator.*}} options.
>  
> {{MaxDirectMemorySize}} is set to 26g.
>  
> Here's the full stack for the relevant thread:
>  
> {code:java}
> Stack: [0x00007f72e2e5f000,0x00007f72e2f60000],  sp=0x00007f72e2f5e450,  free 
> space=1021k
> Native frames: (J=compiled Java code, j=interpreted, Vv=VM code, C=native 
> code)
> J 24625 c2 
> org.apache.hadoop.hbase.util.ByteBufferUtils.copyBufferToStream(Ljava/io/OutputStream;Ljava/nio/ByteBuffer;II)V
>  (75 bytes) @ 0x00007f7546873b69 [0x00007f7546873960+0x0000000000000209]
> J 26253 c2 
> org.apache.hadoop.hbase.ByteBufferKeyValue.write(Ljava/io/OutputStream;Z)I 
> (21 bytes) @ 0x00007f7545af2d84 [0x00007f7545af2d20+0x0000000000000064]
> J 22971 c2 
> org.apache.hadoop.hbase.codec.KeyValueCodecWithTags$KeyValueEncoder.write(Lorg/apache/hadoop/hbase/Cell;)V
>  (27 bytes) @ 0x00007f754663f700 [0x00007f754663f4c0+0x0000000000000240]
> J 25251 c2 
> org.apache.hadoop.hbase.ipc.NettyRpcDuplexHandler.write(Lorg/apache/hbase/thirdparty/io/netty/channel/ChannelHandlerContext;Ljava/lang/Object;Lorg/apache/hbase/thirdparty/io/netty/channel/ChannelPromise;)V
>  (90 bytes) @ 0x00007f7546a53038 [0x00007f7546a50e60+0x00000000000021d8]
> J 21182 c2 
> org.apache.hbase.thirdparty.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(Ljava/lang/Object;Lorg/apache/hbase/thirdparty/io/netty/channel/ChannelPromise;)V
>  (73 bytes) @ 0x00007f7545f4d90c [0x00007f7545f4d3a0+0x000000000000056c]
> J 21181 c2 
> org.apache.hbase.thirdparty.io.netty.channel.AbstractChannelHandlerContext.write(Ljava/lang/Object;ZLorg/apache/hbase/thirdparty/io/netty/channel/ChannelPromise;)V
>  (149 bytes) @ 0x00007f7545fd680c [0x00007f7545fd65e0+0x000000000000022c]
> J 25389 c2 org.apache.hadoop.hbase.ipc.NettyRpcConnection$$Lambda$247.run()V 
> (16 bytes) @ 0x00007f7546ade660 [0x00007f7546ade140+0x0000000000000520]
> J 24098 c2 
> org.apache.hbase.thirdparty.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(J)Z
>  (109 bytes) @ 0x00007f754678fbb8 [0x00007f754678f8e0+0x00000000000002d8]
> J 27297% c2 
> org.apache.hbase.thirdparty.io.netty.channel.epoll.EpollEventLoop.run()V (603 
> bytes) @ 0x00007f75466c4d48 [0x00007f75466c4c80+0x00000000000000c8]
> j  
> org.apache.hbase.thirdparty.io.netty.util.concurrent.SingleThreadEventExecutor$4.run()V+44
> j  
> org.apache.hbase.thirdparty.io.netty.util.internal.ThreadExecutorMap$2.run()V+11
> j  
> org.apache.hbase.thirdparty.io.netty.util.concurrent.FastThreadLocalRunnable.run()V+4
> J 12278 c1 java.lang.Thread.run()V [email protected] (17 bytes) @ 
> 0x00007f753e11f084 [0x00007f753e11ef40+0x0000000000000144]
> v  ~StubRoutines::call_stub
> V  [libjvm.so+0x85574a]  JavaCalls::call_helper(JavaValue*, methodHandle 
> const&, JavaCallArguments*, Thread*)+0x27a
> V  [libjvm.so+0x853d2e]  JavaCalls::call_virtual(JavaValue*, Handle, Klass*, 
> Symbol*, Symbol*, Thread*)+0x19e
> V  [libjvm.so+0x8ffddf]  thread_entry(JavaThread*, Thread*)+0x9f
> V  [libjvm.so+0xdb68d1]  JavaThread::thread_main_inner()+0x131
> V  [libjvm.so+0xdb2c4c]  Thread::call_run()+0x13c
> V  [libjvm.so+0xc1f2e6]  thread_native_entry(Thread*)+0xe6
> {code}
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
