[
https://issues.apache.org/jira/browse/HBASE-28584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17869030#comment-17869030
]
Andrew Kyle Purtell edited comment on HBASE-28584 at 7/26/24 9:46 PM:
----------------------------------------------------------------------
I have thought about this a bit.
When the replication RPC comes in on the sink, it is processed by the RPC
server. The RPC server allocates a buffer that holds the bytes of the RPC
message submitted by the client. For replication those contents include
serialized cells sent from the replication source. A CellScanner is opened to
iterate over those serialized cells and passed around as a method parameter in
various places.
The CellScanner is used to iterate over cells for replication sink processing.
The cells returned from the cell scanner are not copied into new memory; the
cell objects simply hold pointers back into the original buffer allocated to
receive the RPC. The sink then creates client request objects like Puts or
Deletes using those cells. If there is an error applying one of those
submitted local changes, the exception eventually bubbles up and causes the
server to consider the processing of the replication RPC call as failed. It
will then clean up the resources it allocated for the replicateEntries call,
including the buffer that holds the cells received from the replication source.
But other concurrent client activities may still be in progress that rely on
that buffer, because the cells handed off to the client code contain only
pointers into the buffer originally allocated by the server. The cleanup at the
RPC server pulls the buffer out from under those client activities, leading in
edge cases to a crash.
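To make the hazard concrete, here is a minimal standalone Java sketch. This is not HBase code: ReceiveBuffer and CellView are hypothetical stand-ins for the RPC receive buffer and a ByteBufferKeyValue-style cell view, and the explicit released-flag check stands in for the SIGSEGV you get with native Netty buffers.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;

// A "cell" that is only a view into a shared receive buffer. If the owner
// releases the buffer while the view is still in use, the view reads
// recycled memory.
class ReceiveBuffer {
    final ByteBuffer data;
    private final AtomicInteger refCnt = new AtomicInteger(1);
    ReceiveBuffer(byte[] bytes) { this.data = ByteBuffer.wrap(bytes); }
    boolean released() { return refCnt.get() == 0; }
    void release() { refCnt.decrementAndGet(); }
}

class CellView {
    private final ReceiveBuffer buf;
    private final int offset, length;
    CellView(ReceiveBuffer buf, int offset, int length) {
        this.buf = buf; this.offset = offset; this.length = length;
    }
    byte[] read() {
        // With native/Netty-backed buffers this dereference is where the
        // SIGSEGV happens; on the heap we can only check a flag explicitly.
        if (buf.released()) throw new IllegalStateException("use after release");
        byte[] out = new byte[length];
        for (int i = 0; i < length; i++) out[i] = buf.data.get(offset + i);
        return out;
    }
}

public class UseAfterReleaseDemo {
    public static void main(String[] args) {
        ReceiveBuffer rpcBuffer = new ReceiveBuffer("rowkey:value".getBytes());
        CellView cell = new CellView(rpcBuffer, 0, 6); // points into the buffer
        System.out.println(new String(cell.read()));   // fine: buffer still alive
        rpcBuffer.release(); // the RPC server cleans up after a failed call
        try {
            cell.read();     // concurrent client work still holds the view
        } catch (IllegalStateException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```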
Cloning the cells before handing them off to client code for local RPCs is one
way to solve the problem. It allocates new memory to hold the cell contents and
copies them out of the server's receive buffer. The resulting cloned cell is
independently managed memory that is safe for the sink to use as it applies the
changes to the local cluster through the client APIs, with no need to worry
about what the RPC server may or may not do with its buffer. Languages like
Rust have elegant facilities for managing the ownership of memory buffers into
which we have taken pointers, preventing mistakes like this where a pointer's
lifetime exceeds the buffer's lifetime. But here in HBase, in Java, with the
Netty buffers, we have to do it by hand.
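The shape of the cloning fix, sketched with plain JDK types (the actual patch operates on HBase Cells; this is an assumed, simplified illustration):

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class DeepCloneSketch {
    // Copy a cell's bytes out of the shared receive buffer into a fresh
    // array. The copy owns its memory, so it remains valid even after the
    // RPC server releases or recycles the receive buffer.
    static byte[] deepClone(ByteBuffer receiveBuffer, int offset, int length) {
        byte[] copy = new byte[length];
        for (int i = 0; i < length; i++) {
            // Absolute gets: do not disturb the buffer's position, which
            // other readers of the same buffer may rely on.
            copy[i] = receiveBuffer.get(offset + i);
        }
        return copy;
    }

    public static void main(String[] args) {
        ByteBuffer rpcBuffer = ByteBuffer.wrap("rowkey:value".getBytes());
        byte[] cloned = deepClone(rpcBuffer, 7, 5);
        System.out.println(new String(cloned)); // "value"
        // Simulate the server recycling the buffer: the clone is unaffected.
        Arrays.fill(rpcBuffer.array(), (byte) 0);
        System.out.println(new String(cloned)); // still "value"
    }
}
```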
An alternative approach is to track all of the in-flight client RPCs applying
the local changes and defer cleanup of the server context until they have all
finished, although you can imagine that this would be more complex and error
prone than simply cloning the cells.
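That alternative could look something like reference counting the call context. Again a hypothetical sketch, not the HBase implementation: retain once per in-flight local RPC, release as each completes, and free the buffer only when the count reaches zero.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the deferred-cleanup alternative: the RPC call
// context holds the receive buffer and is reference counted. Each local
// client RPC retains it before starting and releases it when finished;
// the buffer is freed only when the last reference is dropped.
public class RefCountedCallContext {
    private final AtomicInteger refCnt = new AtomicInteger(1); // server's own ref
    private volatile boolean bufferFreed = false;

    void retain() {
        refCnt.incrementAndGet();
    }

    void release() {
        if (refCnt.decrementAndGet() == 0) {
            bufferFreed = true; // free the receive buffer here
        }
    }

    boolean isBufferFreed() {
        return bufferFreed;
    }

    public static void main(String[] args) {
        RefCountedCallContext ctx = new RefCountedCallContext();
        ctx.retain();  // local RPC #1 starts
        ctx.retain();  // local RPC #2 starts
        ctx.release(); // the server's cleanup path runs (e.g. on error)
        System.out.println(ctx.isBufferFreed()); // false: RPCs still in flight
        ctx.release(); // local RPC #1 finishes
        ctx.release(); // local RPC #2 finishes
        System.out.println(ctx.isBufferFreed()); // true: last reference dropped
    }
}
```

The tradeoff is visible even in the sketch: every path that touches the buffer must pair retain with release exactly once, which is the error-prone bookkeeping the cloning approach avoids.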
So I applied
[^0001-Deep-clone-cells-set-to-be-replicated-onto-the-local.patch] and tested
it for a while. I profiled the sink regionserver in my testbed with the crash
fix not applied ( [^rs_profile_before.html] ) and with it applied
( [^rs_profile_after.html] ). The only difference between the two builds is
exactly the one patch that fixes the crash case. Incidentally, this testbed is
configured to use TLS RPC and runs on the Azul Java 8 JVM, but those details
are not important. Use the search function with the term "replicateEntries" to
find the portion of the flamegraph related to the sink's processing of the
inbound replication RPCs, then search for "deepClone" to find the cost of the
added clone operation. In the before profile there are no matches; deepClone
accounts for 0% of traces. In the after profile, searching for "deepClone"
finds a purple-highlighted trace above replicateEntries totaling 0.76% of
overall CPU time. This is negligible. replicateEntries overall consumes 5.72%
of the CPU, so the cloning accounts for about 13% (0.76 / 5.72) of the
replicateEntries processing time. I think this is acceptable given the safety
improvement and crash fix.
> RS SIGSEGV under heavy replication load
> ---------------------------------------
>
> Key: HBASE-28584
> URL: https://issues.apache.org/jira/browse/HBASE-28584
> Project: HBase
> Issue Type: Bug
> Components: regionserver
> Affects Versions: 2.5.6
> Environment: RHEL 7.9
> JDK 11.0.23
> Hadoop 3.2.4
> HBase 2.5.6
> Reporter: Whitney Jackson
> Assignee: Andrew Kyle Purtell
> Priority: Major
> Attachments:
> 0001-Deep-clone-cells-set-to-be-replicated-onto-the-local.patch,
> 0001-Support-configuration-based-selection-of-netty-chann.patch,
> rs_profile_after.html, rs_profile_before.html
>
>
> I'm observing RS crashes under heavy replication load:
>
> {code:java}
> #
> # A fatal error has been detected by the Java Runtime Environment:
> #
> # SIGSEGV (0xb) at pc=0x00007f7546873b69, pid=29890, tid=36828
> #
> # JRE version: Java(TM) SE Runtime Environment 18.9 (11.0.23+7) (build
> 11.0.23+7-LTS-222)
> # Java VM: Java HotSpot(TM) 64-Bit Server VM 18.9 (11.0.23+7-LTS-222, mixed
> mode, tiered, compressed oops, g1 gc, linux-amd64)
> # Problematic frame:
> # J 24625 c2
> org.apache.hadoop.hbase.util.ByteBufferUtils.copyBufferToStream(Ljava/io/OutputStream;Ljava/nio/ByteBuffer;II)V
> (75 bytes) @ 0x00007f7546873b69 [0x00007f7546873960+0x0000000000000209]
> {code}
>
> The heavier load comes when a replication peer has been disabled for several
> hours for patching etc. When the peer is re-enabled the replication load is
> high until the peer is all caught up. The crashes happen on the cluster
> receiving the replication edits.
>
> I believe this problem started after upgrading from 2.4.x to 2.5.x.
>
> One possibly relevant non-standard config I run with:
> {code:java}
> <property>
> <name>hbase.region.store.parallel.put.limit</name>
> <!-- Default: 10 -->
> <value>100</value>
> <description>Added after seeing "failed to accept edits" replication errors
> in the destination region servers indicating this limit was being exceeded
> while trying to process replication edits.</description>
> </property>
> {code}
>
> I understand from other Jiras that the problem is likely around direct memory
> usage by Netty. I haven't yet tried switching the Netty allocator to
> {{unpooled}} or {{heap}}. I also haven't yet tried any of the
> {{io.netty.allocator.*}} options.
>
> {{MaxDirectMemorySize}} is set to 26g.
>
> Here's the full stack for the relevant thread:
>
> {code:java}
> Stack: [0x00007f72e2e5f000,0x00007f72e2f60000], sp=0x00007f72e2f5e450, free
> space=1021k
> Native frames: (J=compiled Java code, j=interpreted, Vv=VM code, C=native
> code)
> J 24625 c2
> org.apache.hadoop.hbase.util.ByteBufferUtils.copyBufferToStream(Ljava/io/OutputStream;Ljava/nio/ByteBuffer;II)V
> (75 bytes) @ 0x00007f7546873b69 [0x00007f7546873960+0x0000000000000209]
> J 26253 c2
> org.apache.hadoop.hbase.ByteBufferKeyValue.write(Ljava/io/OutputStream;Z)I
> (21 bytes) @ 0x00007f7545af2d84 [0x00007f7545af2d20+0x0000000000000064]
> J 22971 c2
> org.apache.hadoop.hbase.codec.KeyValueCodecWithTags$KeyValueEncoder.write(Lorg/apache/hadoop/hbase/Cell;)V
> (27 bytes) @ 0x00007f754663f700 [0x00007f754663f4c0+0x0000000000000240]
> J 25251 c2
> org.apache.hadoop.hbase.ipc.NettyRpcDuplexHandler.write(Lorg/apache/hbase/thirdparty/io/netty/channel/ChannelHandlerContext;Ljava/lang/Object;Lorg/apache/hbase/thirdparty/io/netty/channel/ChannelPromise;)V
> (90 bytes) @ 0x00007f7546a53038 [0x00007f7546a50e60+0x00000000000021d8]
> J 21182 c2
> org.apache.hbase.thirdparty.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(Ljava/lang/Object;Lorg/apache/hbase/thirdparty/io/netty/channel/ChannelPromise;)V
> (73 bytes) @ 0x00007f7545f4d90c [0x00007f7545f4d3a0+0x000000000000056c]
> J 21181 c2
> org.apache.hbase.thirdparty.io.netty.channel.AbstractChannelHandlerContext.write(Ljava/lang/Object;ZLorg/apache/hbase/thirdparty/io/netty/channel/ChannelPromise;)V
> (149 bytes) @ 0x00007f7545fd680c [0x00007f7545fd65e0+0x000000000000022c]
> J 25389 c2 org.apache.hadoop.hbase.ipc.NettyRpcConnection$$Lambda$247.run()V
> (16 bytes) @ 0x00007f7546ade660 [0x00007f7546ade140+0x0000000000000520]
> J 24098 c2
> org.apache.hbase.thirdparty.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(J)Z
> (109 bytes) @ 0x00007f754678fbb8 [0x00007f754678f8e0+0x00000000000002d8]
> J 27297% c2
> org.apache.hbase.thirdparty.io.netty.channel.epoll.EpollEventLoop.run()V (603
> bytes) @ 0x00007f75466c4d48 [0x00007f75466c4c80+0x00000000000000c8]
> j
> org.apache.hbase.thirdparty.io.netty.util.concurrent.SingleThreadEventExecutor$4.run()V+44
> j
> org.apache.hbase.thirdparty.io.netty.util.internal.ThreadExecutorMap$2.run()V+11
> j
> org.apache.hbase.thirdparty.io.netty.util.concurrent.FastThreadLocalRunnable.run()V+4
> J 12278 c1 java.lang.Thread.run()V [email protected] (17 bytes) @
> 0x00007f753e11f084 [0x00007f753e11ef40+0x0000000000000144]
> v ~StubRoutines::call_stub
> V [libjvm.so+0x85574a] JavaCalls::call_helper(JavaValue*, methodHandle
> const&, JavaCallArguments*, Thread*)+0x27a
> V [libjvm.so+0x853d2e] JavaCalls::call_virtual(JavaValue*, Handle, Klass*,
> Symbol*, Symbol*, Thread*)+0x19e
> V [libjvm.so+0x8ffddf] thread_entry(JavaThread*, Thread*)+0x9f
> V [libjvm.so+0xdb68d1] JavaThread::thread_main_inner()+0x131
> V [libjvm.so+0xdb2c4c] Thread::call_run()+0x13c
> V [libjvm.so+0xc1f2e6] thread_native_entry(Thread*)+0xe6
> {code}
>
>
>