[
https://issues.apache.org/jira/browse/HDDS-5188?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Duong updated HDDS-5188:
------------------------
Status: Reopened (was: Closed)
> Replace GRPC based closed-container replication with Netty based streaming
> --------------------------------------------------------------------------
>
> Key: HDDS-5188
> URL: https://issues.apache.org/jira/browse/HDDS-5188
> Project: Apache Ozone
> Issue Type: Bug
> Reporter: Marton Elek
> Assignee: Duong
> Priority: Major
> Labels: pull-request-available
>
> Today, closed containers are copied between datanodes as one big tar(.gz)
> file. Each datanode runs a GrpcReplicationService (with a gRPC server), and
> when SCM asks the destination datanode to replicate data, it connects to
> the source datanode and retrieves the data.
> This protocol is gRPC-based and very simple: after the first request
> (download(containerid)), the full container is streamed as a tar file in
> smaller chunks.
> However, this protocol has no back-pressure / flow-control handling. After
> the first request, the FULL 5 GB container is sent back.
> This approach can fill up the Netty buffers very easily:
> {code}
> Exception in thread "grpc-default-executor-0" org.apache.ratis.thirdparty.io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 2097152 byte(s) of direct memory (used: 3651141911, max: 3652190208)
>     at org.apache.ratis.thirdparty.io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:802)
>     at org.apache.ratis.thirdparty.io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:731)
>     at org.apache.ratis.thirdparty.io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:632)
>     at org.apache.ratis.thirdparty.io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:607)
>     at org.apache.ratis.thirdparty.io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:202)
>     at org.apache.ratis.thirdparty.io.netty.buffer.PoolArena.tcacheAllocateNormal(PoolArena.java:186)
>     at org.apache.ratis.thirdparty.io.netty.buffer.PoolArena.allocate(PoolArena.java:136)
>     at org.apache.ratis.thirdparty.io.netty.buffer.PoolArena.allocate(PoolArena.java:126)
>     at org.apache.ratis.thirdparty.io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:395)
>     at org.apache.ratis.thirdparty.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
>     at org.apache.ratis.thirdparty.io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:123)
>     at org.apache.ratis.thirdparty.io.grpc.netty.NettyWritableBufferAllocator.allocate(NettyWritableBufferAllocator.java:51)
>     at org.apache.ratis.thirdparty.io.grpc.internal.MessageFramer.writeKnownLengthUncompressed(MessageFramer.java:227)
>     at org.apache.ratis.thirdparty.io.grpc.internal.MessageFramer.writeUncompressed(MessageFramer.java:168)
>     at org.apache.ratis.thirdparty.io.grpc.internal.MessageFramer.writePayload(MessageFramer.java:141)
>     at org.apache.ratis.thirdparty.io.grpc.internal.AbstractStream.writeMessage(AbstractStream.java:65)
>     at org.apache.ratis.thirdparty.io.grpc.internal.ServerCallImpl.sendMessageInternal(ServerCallImpl.java:167)
>     at org.apache.ratis.thirdparty.io.grpc.internal.ServerCallImpl.sendMessage(ServerCallImpl.java:149)
>     at org.apache.ratis.thirdparty.io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:365)
>     at org.apache.hadoop.ozone.container.replication.GrpcOutputStream.flushBuffer(GrpcOutputStream.java:124)
>     at org.apache.hadoop.ozone.container.replication.GrpcOutputStream.write(GrpcOutputStream.java:90)
>     at org.apache.hadoop.ozone.freon.ContentGenerator.write(ContentGenerator.java:76)
>     at org.apache.hadoop.ozone.freon.ClosedContainerStreamGenerator.copyData(ClosedContainerStreamGenerator.java:19)
>     at org.apache.hadoop.ozone.container.replication.GrpcReplicationService.download(GrpcReplicationService.java:56)
>     at org.apache.hadoop.hdds.protocol.datanode.proto.IntraDatanodeProtocolServiceGrpc$MethodHandlers.invoke(IntraDatanodeProtocolServiceGrpc.java:219)
>     at org.apache.ratis.thirdparty.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
>     at org.apache.ratis.thirdparty.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
>     at org.apache.ratis.thirdparty.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
>     at org.apache.ratis.thirdparty.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
>     at org.apache.ratis.thirdparty.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
>     at org.apache.ratis.thirdparty.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:814)
>     at org.apache.ratis.thirdparty.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>     at org.apache.ratis.thirdparty.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> 2021-05-04 16:37:47,996 INFO freon.ProgressBar (ProgressBar.java:logProgressBar(163)) - Progress: 0.00 % (0 out of 1)
> 2021-05-04 16:37:48,998 INFO freon.ProgressBar (ProgressBar.java:logProgressBar(163)) - Progress: 0.00 % (0 out of 1)
> 2021-05-04 16:37:49,998 INFO freon.ProgressBar (ProgressBar.java:logProgressBar(163)) - Progress: 0.00 % (0 out of 1)
> {code}
> This can be reproduced locally with a simple Freon test (see the code here:
> https://github.com/elek/ozone/tree/grpc-push).
> The new Freon test starts a gRPC server and client. On the server side the
> source is replaced with a simple `ContainerReplicationSource` which
> generates a random 5 GB data stream (instead of reading container data from
> disk).
> On the client side the replicator just downloads the container to the tmp
> location, without moving it to the final location.
> This test works well for one container, but it clearly shows that the
> full container data is streamed right at the beginning:
> (Duplicated lines are removed)
> {code}
> 2021-05-04 16:21:04,281 INFO replication.DownloadAndDiscardReplicator (DownloadAndDiscardReplicator.java:replicate(62)) - Starting replication of container 0 from [7369fd21-7ee9-4780-a54b-5831e951ca9c{ip: 127.0.0.1, host: localhost, ports: [REPLICATION=41379], networkLocation: /default-rack, certSerialId: null, persistedOpState: null, persistedOpStateExpiryEpochSec: 0}]
> 2021-05-04 16:21:04,434 INFO replication.GrpcReplicationService (GrpcReplicationService.java:download(52)) - Streaming container data (0) to other datanode
> 2021-05-04 16:21:05,269 INFO freon.ProgressBar (ProgressBar.java:logProgressBar(163)) - Progress: 0.00 % (0 out of 1)
> 2021-05-04 16:21:06,270 INFO freon.ProgressBar
> ...
> 2021-05-04 16:21:10,275 INFO freon.ProgressBar (ProgressBar.java:logProgressBar(163)) - Progress: 0.00 % (0 out of 1)
> 2021-05-04 16:21:11,275 INFO freon.ProgressBar (ProgressBar.java:logProgressBar(163)) - Progress: 0.00 % (0 out of 1)
> 2021-05-04 16:21:11,791 INFO replication.GrpcOutputStream (GrpcOutputStream.java:close(104)) - Sent 5368709120 bytes for container 0
> ...
> 2021-05-04 16:21:33,434 INFO freon.ProgressBar (ProgressBar.java:logProgressBar(163)) - Progress: 0.00 % (0 out of 1)
> 2021-05-04 16:21:33,737 INFO replication.GrpcReplicationClient (GrpcReplicationClient.java:onCompleted(190)) - Container 0 is downloaded to /tmp/container-copy/container-0.tar.gz
> 2021-05-04 16:21:34,434 INFO freon.ProgressBar (ProgressBar.java:logProgressBar(163)) - Progress: 0.00 % (0 out of 1)
> 2021-05-04 16:21:34,690 INFO replication.DownloadAndDiscardReplicator (DownloadAndDiscardReplicator.java:replicate(71)) - Container is downloaded but deleted, as you wished /tmp/container-copy/container-0.tar.gz
> {code}
> As you can see, the full 5 GB of data is sent out at 16:21:11 (after 6
> seconds), but the copy only finishes at 16:21:33 (22 seconds later).
> In between, the majority of the container is kept in the gRPC/Netty
> buffers.
> As an experiment, we can make the gRPC client slow (GrpcReplicationClient):
> {code}
> @Override
> public void onNext(CopyContainerResponseProto chunk) {
>   try {
>     try {
>       Thread.sleep(1_000);
>     } catch (InterruptedException e) {
>       e.printStackTrace();
>     }
>     chunk.getData().writeTo(stream);
>   } catch (IOException e) {
>     response.completeExceptionally(e);
>   }
> }
> {code}
> With this change the beginning of the container is downloaded very slowly,
> and that is enough to trigger the exception above:
> {code}
> Exception in thread "grpc-default-executor-0" org.apache.ratis.thirdparty.io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 2097152 byte(s) of direct memory (used: 3651141911, max: 3652190208)
> {code}
> Temporarily, it can be worked around by increasing the Netty direct memory
> limit (-Dorg.apache.ratis.thirdparty.io.netty.maxDirectMemory=16000000000),
> but that is not a good long-term solution.
> So we need to refactor the protocol to do a request/response round trip
> chunk by chunk.
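> The chunk-by-chunk pull idea can be sketched as a minimal, self-contained
> simulation (not the Ozone API: ChunkPullSketch, readChunk and replicate are
> hypothetical names, and the network round trip is elided). The point is
> that the destination requests the next chunk only after it has written out
> the previous one, so at most one chunk is ever in flight:

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;

// Hypothetical sketch of a pull-based, chunk-by-chunk replication protocol.
// Unlike the current download(containerid) call, which pushes the whole
// container at once, the destination pulls one chunk per request, giving
// natural back-pressure: unconsumed data never piles up in send buffers.
public class ChunkPullSketch {

  static final int CHUNK_SIZE = 4; // tiny for the demo; real chunks are MBs

  // Stands in for the source datanode: returns the requested chunk,
  // or an empty array once the container has been fully read.
  static byte[] readChunk(byte[] container, int chunkIndex) {
    int from = chunkIndex * CHUNK_SIZE;
    if (from >= container.length) {
      return new byte[0];
    }
    int to = Math.min(from + CHUNK_SIZE, container.length);
    return Arrays.copyOfRange(container, from, to);
  }

  // Stands in for the destination datanode: pulls chunks 0, 1, 2, ...
  // and only asks for chunk i+1 after chunk i has been written out.
  static byte[] replicate(byte[] container) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    int index = 0;
    while (true) {
      byte[] chunk = readChunk(container, index++);
      if (chunk.length == 0) {
        break; // source signalled end-of-container
      }
      out.write(chunk, 0, chunk.length);
    }
    return out.toByteArray();
  }

  public static void main(String[] args) {
    byte[] container = "0123456789".getBytes();
    byte[] copy = replicate(container);
    System.out.println(Arrays.equals(container, copy)); // prints "true"
  }
}
```

> With gRPC the same effect can also be approximated with its manual flow
> control hooks, but an explicit request/response exchange keeps the
> back-pressure independent of transport internals.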
> But we also have another problem: gRPC is not optimal for fast streaming.
> The previous log showed that we replicated the 5 GB container in under 30
> seconds (without reading the original container from disk and without doing
> tar compression).
> That is 5 GB / 30 s, roughly 170 MB/s. (I wrote to a tmpfs on the
> destination side, and even my NVMe is significantly faster than that.)
> Based on
> [this|https://blog.reverberate.org/2021/04/21/musttail-efficient-interpreters.html]
> article, the best (!) results (with a C client!) were 1 Gb/s with gRPC
> (doubled with the black magic explained there).
> Ansh Khanna earlier did some low-level benchmarking (for Ratis streaming)
> which showed a 5x difference between pure Netty and gRPC:
> Flatbuffers over gRPC:
> %CPU in buffer copying/allocations: >10%
> Time (in seconds): 16.44
>
> Protobuffers over gRPC:
> %CPU in buffer copying/allocations: ~10%
> Time (in seconds): 11.66
>
> Netty-based streaming:
> %CPU in buffer copying/allocations: 0%
> Time (in seconds): 2.7
> Pure Netty also supports zero-copy async streaming.
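> For context on the zero-copy claim: Netty's DefaultFileRegion is built on
> the JDK's FileChannel.transferTo, which hands the copy to the OS (sendfile
> on Linux) and avoids user-space buffer allocations entirely, which is
> consistent with the ~0% copy CPU in the benchmark above. A minimal sketch
> using only the JDK (ZeroCopyDemo is a hypothetical name, and file-to-file
> transfer stands in for file-to-socket):

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Zero-copy transfer with the JDK: FileChannel.transferTo lets the kernel
// move bytes from the source channel to the target without copying them
// through a user-space buffer. Netty's DefaultFileRegion uses the same call.
public class ZeroCopyDemo {

  static long transfer(Path src, Path dst) throws IOException {
    try (FileChannel in = FileChannel.open(src, StandardOpenOption.READ);
         FileChannel out = FileChannel.open(dst,
             StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
      long position = 0;
      long size = in.size();
      // transferTo may transfer fewer bytes than requested, so loop.
      while (position < size) {
        position += in.transferTo(position, size - position, out);
      }
      return position;
    }
  }

  public static void main(String[] args) throws IOException {
    Path src = Files.createTempFile("container", ".tar");
    Path dst = Files.createTempFile("copy", ".tar");
    Files.write(src, new byte[1 << 20]); // 1 MiB payload
    long copied = transfer(src, dst);
    System.out.println(copied); // prints "1048576"
  }
}
```

> In a replication service, the target would be a socket channel instead of a
> file, but the kernel-side mechanism is the same.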
> Summary:
> 1. The current implementation should be refactored to avoid pushing the data
> 2. Netty seems to be the better long-term solution
> --> As a result, it seems easiest to create a POC with Netty support and
> check how it works.
> Earlier I made an attempt, which can be found here:
> https://github.com/elek/ozone/tree/close-container-replication-refactor
> It's a generic interface which may also be used in
> https://issues.apache.org/jira/browse/HDDS-5142. But at least it can be
> used to compare Netty vs. gRPC performance in this situation.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]