[ 
https://issues.apache.org/jira/browse/HDDS-5188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17806420#comment-17806420
 ] 

Duong commented on HDDS-5188:
-----------------------------

Taking over and reopening this issue. This is an important change to improve 
replication performance and DN stability.

We've encountered high direct memory usage (and OutOfDirectMemoryError) during 
closed-container replication and temporarily mitigated it in HDDS-9081. 
However, moving to netty streaming is still important.

Advantages of moving to netty streaming and away from GRPC:
 * Streaming significantly reduces the memory footprint by avoiding the 
creation and sending of 4MB chunks. 
 * Streaming is zero-copy, which is not only faster but also avoids creating 
garbage for the GC. The non-zero-copy GRPC transport produces byte-array 
chunks that must be collected by the GC, so every replication creates a surge 
of GC pressure that reduces DN capacity. 
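To illustrate the zero-copy point: netty's file streaming (FileRegion) ultimately rests on the kernel-level transfer that Java exposes as FileChannel.transferTo. A minimal sketch in plain Java (not Ozone code; the class name, paths, and sizes are illustrative):

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Sketch: the zero-copy primitive that netty's FileRegion builds on.
// transferTo() hands the bytes to the kernel (sendfile on Linux), so no
// intermediate byte[] chunks are allocated on the heap for the GC to collect.
public final class ZeroCopyDemo {
  public static long transfer(Path src, Path dst) throws IOException {
    try (FileChannel in = FileChannel.open(src, StandardOpenOption.READ);
         FileChannel out = FileChannel.open(dst,
             StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
      long position = 0;
      long size = in.size();
      // transferTo may move fewer bytes than requested; loop until done.
      while (position < size) {
        position += in.transferTo(position, size - position, out);
      }
      return position;
    }
  }

  public static void main(String[] args) throws IOException {
    Path src = Files.createTempFile("container", ".tar");
    Files.write(src, new byte[1 << 20]); // 1 MB of zeros stands in for container data
    Path dst = Files.createTempFile("copy", ".tar");
    System.out.println(transfer(src, dst)); // prints 1048576
  }
}
```

With transferTo, the bytes move from the page cache to the target channel without surfacing as 4MB byte[] chunks on the heap, which is exactly the garbage the current GRPC path creates.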

> Replace GRPC based closed-container replication with Netty based streaming
> --------------------------------------------------------------------------
>
>                 Key: HDDS-5188
>                 URL: https://issues.apache.org/jira/browse/HDDS-5188
>             Project: Apache Ozone
>          Issue Type: Bug
>            Reporter: Marton Elek
>            Assignee: Duong
>            Priority: Major
>              Labels: pull-request-available
>
> Today the closed containers are copied between datanodes as one big tar(.gz) 
> file. Each datanode runs a GrpcReplicationService (with a grpc server) and 
> when the SCM asks the destination-datanode to replicate data, it connects to 
> the source datanode and retrieves the data.
> This protocol is based on GRPC and is very simple: after the first request 
> (download(containerid)) the full container is streamed as a tar file in 
> smaller chunks.
> However, this protocol has no back-pressure / flow-control handling. After 
> the first request the FULL 5 GB container is sent back. 
> This approach can fill up the netty buffers very easily:
> {code}
>       Exception in thread "grpc-default-executor-0" 
> org.apache.ratis.thirdparty.io.netty.util.internal.OutOfDirectMemoryError: 
> failed to allocate 2097152 byte(s) of direct memory (used: 3651141911, max: 
> 3652190208)
>       at 
> org.apache.ratis.thirdparty.io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:802)
>       at 
> org.apache.ratis.thirdparty.io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:731)
>       at 
> org.apache.ratis.thirdparty.io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:632)
>       at 
> org.apache.ratis.thirdparty.io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:607)
>       at 
> org.apache.ratis.thirdparty.io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:202)
>       at 
> org.apache.ratis.thirdparty.io.netty.buffer.PoolArena.tcacheAllocateNormal(PoolArena.java:186)
>       at 
> org.apache.ratis.thirdparty.io.netty.buffer.PoolArena.allocate(PoolArena.java:136)
>       at 
> org.apache.ratis.thirdparty.io.netty.buffer.PoolArena.allocate(PoolArena.java:126)
>       at 
> org.apache.ratis.thirdparty.io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:395)
>       at 
> org.apache.ratis.thirdparty.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
>       at 
> org.apache.ratis.thirdparty.io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:123)
>       at 
> org.apache.ratis.thirdparty.io.grpc.netty.NettyWritableBufferAllocator.allocate(NettyWritableBufferAllocator.java:51)
>       at 
> org.apache.ratis.thirdparty.io.grpc.internal.MessageFramer.writeKnownLengthUncompressed(MessageFramer.java:227)
>       at 
> org.apache.ratis.thirdparty.io.grpc.internal.MessageFramer.writeUncompressed(MessageFramer.java:168)
>       at 
> org.apache.ratis.thirdparty.io.grpc.internal.MessageFramer.writePayload(MessageFramer.java:141)
>       at 
> org.apache.ratis.thirdparty.io.grpc.internal.AbstractStream.writeMessage(AbstractStream.java:65)
>       at 
> org.apache.ratis.thirdparty.io.grpc.internal.ServerCallImpl.sendMessageInternal(ServerCallImpl.java:167)
>       at 
> org.apache.ratis.thirdparty.io.grpc.internal.ServerCallImpl.sendMessage(ServerCallImpl.java:149)
>       at 
> org.apache.ratis.thirdparty.io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:365)
>       at 
> org.apache.hadoop.ozone.container.replication.GrpcOutputStream.flushBuffer(GrpcOutputStream.java:124)
>       at 
> org.apache.hadoop.ozone.container.replication.GrpcOutputStream.write(GrpcOutputStream.java:90)
>       at 
> org.apache.hadoop.ozone.freon.ContentGenerator.write(ContentGenerator.java:76)
>       at 
> org.apache.hadoop.ozone.freon.ClosedContainerStreamGenerator.copyData(ClosedContainerStreamGenerator.java:19)
>       at 
> org.apache.hadoop.ozone.container.replication.GrpcReplicationService.download(GrpcReplicationService.java:56)
>       at 
> org.apache.hadoop.hdds.protocol.datanode.proto.IntraDatanodeProtocolServiceGrpc$MethodHandlers.invoke(IntraDatanodeProtocolServiceGrpc.java:219)
>       at 
> org.apache.ratis.thirdparty.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
>       at 
> org.apache.ratis.thirdparty.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
>       at 
> org.apache.ratis.thirdparty.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
>       at 
> org.apache.ratis.thirdparty.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
>       at 
> org.apache.ratis.thirdparty.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
>       at 
> org.apache.ratis.thirdparty.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:814)
>       at 
> org.apache.ratis.thirdparty.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>       at 
> org.apache.ratis.thirdparty.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
> 2021-05-04 16:37:47,996 INFO  freon.ProgressBar 
> (ProgressBar.java:logProgressBar(163)) - Progress: 0.00 % (0 out of 1)
> 2021-05-04 16:37:48,998 INFO  freon.ProgressBar 
> (ProgressBar.java:logProgressBar(163)) - Progress: 0.00 % (0 out of 1)
> 2021-05-04 16:37:49,998 INFO  freon.ProgressBar 
> (ProgressBar.java:logProgressBar(163)) - Progress: 0.00 % (0 out of 1)
> {code}
> This can be reproduced locally with a simple freon test. (See the code here: 
> https://github.com/elek/ozone/tree/grpc-push)
> The new freon test starts a GrpcServer and client. On the server side the 
> source is replaced with a simple `ContainerReplicationSource` which generates 
> a random 5 GB data stream (instead of reading container data from disk).
> On the client side the replicator just downloads the container to the tmp 
> location; it is not moved to the final location.
> This test works well for one container, but the test clearly shows that the 
> full container data is streamed at the very beginning:
> (Duplicated lines are removed)
> {code}
> 2021-05-04 16:21:04,281 INFO  replication.DownloadAndDiscardReplicator 
> (DownloadAndDiscardReplicator.java:replicate(62)) - Starting replication of 
> container 0 from [7369fd21-7ee9-4780-a54b-5831e951ca9c{ip: 127.0.0.1, host: 
> localhost, ports: [REPLICATION=41379], networkLocation: /default-rack, 
> certSerialId: null, persistedOpState: null, persistedOpStateExpiryEpochSec: 
> 0}]
> 2021-05-04 16:21:04,434 INFO  replication.GrpcReplicationService 
> (GrpcReplicationService.java:download(52)) - Streaming container data (0) to 
> other datanode
> 2021-05-04 16:21:05,269 INFO  freon.ProgressBar 
> (ProgressBar.java:logProgressBar(163)) - Progress: 0.00 % (0 out of 1)
> 2021-05-04 16:21:06,270 INFO  freon.ProgressBar 
> ...
> 2021-05-04 16:21:10,275 INFO  freon.ProgressBar 
> (ProgressBar.java:logProgressBar(163)) - Progress: 0.00 % (0 out of 1)
> 2021-05-04 16:21:11,275 INFO  freon.ProgressBar 
> (ProgressBar.java:logProgressBar(163)) - Progress: 0.00 % (0 out of 1)
> 2021-05-04 16:21:11,791 INFO  replication.GrpcOutputStream 
> (GrpcOutputStream.java:close(104)) - Sent 5368709120 bytes for container 0
> ...
> 2021-05-04 16:21:33,434 INFO  freon.ProgressBar 
> (ProgressBar.java:logProgressBar(163)) - Progress: 0.00 % (0 out of 1)
> 2021-05-04 16:21:33,737 INFO  replication.GrpcReplicationClient 
> (GrpcReplicationClient.java:onCompleted(190)) - Container 0 is downloaded to 
> /tmp/container-copy/container-0.tar.gz
> 2021-05-04 16:21:34,434 INFO  freon.ProgressBar 
> (ProgressBar.java:logProgressBar(163)) - Progress: 0.00 % (0 out of 1)
> 2021-05-04 16:21:34,690 INFO  replication.DownloadAndDiscardReplicator 
> (DownloadAndDiscardReplicator.java:replicate(71)) - Container is downloaded 
> but deleted, as you wished /tmp/container-copy/container-0.tar.gz
> {code}
> As you can see, the full 5 GB of data is sent out at 16:21:11 (after 6 
> seconds), but the data copy finishes only at 16:21:33 (22 seconds later).
> Between those two times, the majority of the container data is held in the 
> GRPC/netty buffers.
> As an experiment we can make the grpc client slow (GrpcReplicationClient):
> {code}
>     @Override
>     public void onNext(CopyContainerResponseProto chunk) {
>       try {
>         try {
>           Thread.sleep(1_000);
>         } catch (InterruptedException e) {
>           e.printStackTrace();
>         }
>         chunk.getData().writeTo(stream);
>       } catch (IOException e) {
>         response.completeExceptionally(e);
>       }
>     }
> {code}
> With this method the beginning of the container is downloaded very slowly, 
> and that alone is enough to trigger the exception above.
> {code}
> Exception in thread "grpc-default-executor-0" 
> org.apache.ratis.thirdparty.io.netty.util.internal.OutOfDirectMemoryError: 
> failed to allocate 2097152 byte(s) of direct memory (used: 3651141911, max: 
> 3652190208)
> {code}
> Temporarily, this can be worked around by increasing the netty direct-memory 
> limit (-Dorg.apache.ratis.thirdparty.io.netty.maxDirectMemory=16000000000), 
> but that is not a good long-term solution.
> So we need to refactor the protocol to do a request/response chunk by chunk.
> But we also have another problem: GRPC is not optimal for fast streaming.
> The previous log showed that we replicated the 5 GB container in under 30 
> seconds (without reading the original container and without doing tar file 
> compression).
> That is 5120 MB / 30 s ≈ 170 MB/s. (I wrote to a tmpfs on the destination 
> side, but even my NVMe is significantly faster.)
> Based on 
> [this|https://blog.reverberate.org/2021/04/21/musttail-efficient-interpreters.html]
>  article, the best (!) results (with a C client!) were 1 Gb/s with GRPC (with 
> the black magic explained there, it is doubled).
> Ansh Khanna earlier did some low-level benchmarking (for Ratis streaming) 
> which showed a 5x difference between pure netty and GRPC:
> Flatbuffers over GRPC:
>  %CPU in buffer copying/allocations: >10%
>  Time (in seconds): 16.44
> Protobuffers over GRPC:
>  %CPU in buffer copying/allocations: ~10%
>  Time (in seconds): 11.66
> Netty-based streaming:
>  %CPU in buffer copying/allocations: 0%
>  Time (in seconds): 2.7
> Pure netty also supports zero-copy async streaming.
> Summary:
>  1. The current implementation should be refactored to avoid pushing all the 
> data up front
>  2. Netty seems to be the better long-term solution
> --> As a result, it seems easier to create a POC with netty support and check 
> how it works.
> Earlier I made an attempt, which can be found here: 
> https://github.com/elek/ozone/tree/close-container-replication-refactor
> It's a generic interface which may also be used in 
> https://issues.apache.org/jira/browse/HDDS-5142. At the least, it can be used 
> to compare netty vs GRPC performance in this situation.
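The chunk-by-chunk request/response refactor described above amounts to bounding the number of in-flight chunks so the sender blocks instead of filling netty's direct buffers. A minimal model in plain Java (not the Ozone or netty API; class name, window, and chunk sizes are illustrative):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Sketch: a pull-based transfer where at most WINDOW chunks are ever buffered,
// regardless of container size. Contrast with the current push model, where the
// sender writes the full container into gRPC/netty buffers up front.
public final class ChunkPullDemo {
  static final int CHUNK = 4 * 1024 * 1024;  // 4 MB chunk size from the issue
  static final int WINDOW = 2;               // bound on in-flight chunks

  public static long replicate(long containerSize) throws InterruptedException {
    BlockingQueue<byte[]> inFlight = new ArrayBlockingQueue<>(WINDOW);
    long chunks = (containerSize + CHUNK - 1) / CHUNK;  // rounded up

    Thread sender = new Thread(() -> {
      try {
        for (long i = 0; i < chunks; i++) {
          // put() blocks while the window is full: this is the back-pressure.
          inFlight.put(new byte[CHUNK]);
        }
      } catch (InterruptedException ignored) {
      }
    });
    sender.start();

    long received = 0;
    for (long i = 0; i < chunks; i++) {
      inFlight.take();  // consuming a chunk lets the sender produce one more
      received += CHUNK;
    }
    sender.join();
    return received;    // peak buffered memory: WINDOW * CHUNK, not 5 GB
  }
}
```

A real implementation would express the window through netty channel writability (or gRPC's manual flow control), but the memory bound is the same idea: WINDOW * CHUNK instead of the whole container.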



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
