Marton Elek created HDDS-5188:
---------------------------------
Summary: Replace GRPC based closed-container replication with Netty based streaming
Key: HDDS-5188
URL: https://issues.apache.org/jira/browse/HDDS-5188
Project: Apache Ozone
Issue Type: Bug
Reporter: Marton Elek
Assignee: Marton Elek
Today, closed containers are copied between datanodes as one big tar(.gz) file. Each datanode runs a GrpcReplicationService (with a gRPC server), and when SCM asks the destination datanode to replicate data, it connects to the source datanode and retrieves the data.
This protocol is based on gRPC and is very simple: after the first request (download(containerId)), the full container is streamed back as a tar file in smaller chunks.
However, the protocol doesn't have any back-pressure / flow-control handling: after the first request, the FULL 5 GB container is sent back.
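To make that concrete, here is a rough sketch of the push-style shape of the current service. This is not the actual GrpcReplicationService code; readContainerTarball() and the exact builder fields are assumptions for illustration only.
{code}
// Rough sketch only: the real code goes through GrpcOutputStream, but the
// essential shape is the same. readContainerTarball() is a made-up helper
// standing in for the tar export of the container.
public void download(CopyContainerRequestProto request,
    StreamObserver<CopyContainerResponseProto> responseObserver) {
  try (InputStream in = readContainerTarball(request.getContainerID())) {
    byte[] buffer = new byte[1024 * 1024];
    int len;
    while ((len = in.read(buffer)) != -1) {
      // onNext() only queues the chunk; nothing here waits for the receiver,
      // so the whole container can pile up in gRPC/Netty direct buffers.
      responseObserver.onNext(CopyContainerResponseProto.newBuilder()
          .setData(ByteString.copyFrom(buffer, 0, len))
          .build());
    }
    responseObserver.onCompleted();
  } catch (IOException e) {
    responseObserver.onError(e);
  }
}
{code}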
This approach can fill up the Netty direct buffers very easily:
{code}
Exception in thread "grpc-default-executor-0" org.apache.ratis.thirdparty.io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 2097152 byte(s) of direct memory (used: 3651141911, max: 3652190208)
    at org.apache.ratis.thirdparty.io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:802)
    at org.apache.ratis.thirdparty.io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:731)
    at org.apache.ratis.thirdparty.io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:632)
    at org.apache.ratis.thirdparty.io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:607)
    at org.apache.ratis.thirdparty.io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:202)
    at org.apache.ratis.thirdparty.io.netty.buffer.PoolArena.tcacheAllocateNormal(PoolArena.java:186)
    at org.apache.ratis.thirdparty.io.netty.buffer.PoolArena.allocate(PoolArena.java:136)
    at org.apache.ratis.thirdparty.io.netty.buffer.PoolArena.allocate(PoolArena.java:126)
    at org.apache.ratis.thirdparty.io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:395)
    at org.apache.ratis.thirdparty.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
    at org.apache.ratis.thirdparty.io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:123)
    at org.apache.ratis.thirdparty.io.grpc.netty.NettyWritableBufferAllocator.allocate(NettyWritableBufferAllocator.java:51)
    at org.apache.ratis.thirdparty.io.grpc.internal.MessageFramer.writeKnownLengthUncompressed(MessageFramer.java:227)
    at org.apache.ratis.thirdparty.io.grpc.internal.MessageFramer.writeUncompressed(MessageFramer.java:168)
    at org.apache.ratis.thirdparty.io.grpc.internal.MessageFramer.writePayload(MessageFramer.java:141)
    at org.apache.ratis.thirdparty.io.grpc.internal.AbstractStream.writeMessage(AbstractStream.java:65)
    at org.apache.ratis.thirdparty.io.grpc.internal.ServerCallImpl.sendMessageInternal(ServerCallImpl.java:167)
    at org.apache.ratis.thirdparty.io.grpc.internal.ServerCallImpl.sendMessage(ServerCallImpl.java:149)
    at org.apache.ratis.thirdparty.io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:365)
    at org.apache.hadoop.ozone.container.replication.GrpcOutputStream.flushBuffer(GrpcOutputStream.java:124)
    at org.apache.hadoop.ozone.container.replication.GrpcOutputStream.write(GrpcOutputStream.java:90)
    at org.apache.hadoop.ozone.freon.ContentGenerator.write(ContentGenerator.java:76)
    at org.apache.hadoop.ozone.freon.ClosedContainerStreamGenerator.copyData(ClosedContainerStreamGenerator.java:19)
    at org.apache.hadoop.ozone.container.replication.GrpcReplicationService.download(GrpcReplicationService.java:56)
    at org.apache.hadoop.hdds.protocol.datanode.proto.IntraDatanodeProtocolServiceGrpc$MethodHandlers.invoke(IntraDatanodeProtocolServiceGrpc.java:219)
    at org.apache.ratis.thirdparty.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
    at org.apache.ratis.thirdparty.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
    at org.apache.ratis.thirdparty.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
    at org.apache.ratis.thirdparty.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
    at org.apache.ratis.thirdparty.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
    at org.apache.ratis.thirdparty.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:814)
    at org.apache.ratis.thirdparty.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at org.apache.ratis.thirdparty.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
2021-05-04 16:37:47,996 INFO freon.ProgressBar (ProgressBar.java:logProgressBar(163)) - Progress: 0.00 % (0 out of 1)
2021-05-04 16:37:48,998 INFO freon.ProgressBar (ProgressBar.java:logProgressBar(163)) - Progress: 0.00 % (0 out of 1)
2021-05-04 16:37:49,998 INFO freon.ProgressBar (ProgressBar.java:logProgressBar(163)) - Progress: 0.00 % (0 out of 1)
{code}
This can be reproduced locally with a simple freon test (see the code here: https://github.com/elek/ozone/tree/grpc-push).
The new freon test starts a gRPC server and client. On the server side the source is replaced with a simple `ContainerReplicationSource` which generates a random 5 GB data stream (instead of reading container data from disk).
On the client side the replicator just downloads the container to the tmp location; it is not moved to the final location.
The test works well for one container, but the log clearly shows that the full container data is streamed out at the very beginning (duplicated lines removed):
{code}
2021-05-04 16:21:04,281 INFO replication.DownloadAndDiscardReplicator (DownloadAndDiscardReplicator.java:replicate(62)) - Starting replication of container 0 from [7369fd21-7ee9-4780-a54b-5831e951ca9c{ip: 127.0.0.1, host: localhost, ports: [REPLICATION=41379], networkLocation: /default-rack, certSerialId: null, persistedOpState: null, persistedOpStateExpiryEpochSec: 0}]
2021-05-04 16:21:04,434 INFO replication.GrpcReplicationService (GrpcReplicationService.java:download(52)) - Streaming container data (0) to other datanode
2021-05-04 16:21:05,269 INFO freon.ProgressBar (ProgressBar.java:logProgressBar(163)) - Progress: 0.00 % (0 out of 1)
2021-05-04 16:21:06,270 INFO freon.ProgressBar
...
2021-05-04 16:21:10,275 INFO freon.ProgressBar (ProgressBar.java:logProgressBar(163)) - Progress: 0.00 % (0 out of 1)
2021-05-04 16:21:11,275 INFO freon.ProgressBar (ProgressBar.java:logProgressBar(163)) - Progress: 0.00 % (0 out of 1)
2021-05-04 16:21:11,791 INFO replication.GrpcOutputStream (GrpcOutputStream.java:close(104)) - Sent 5368709120 bytes for container 0
...
2021-05-04 16:21:33,434 INFO freon.ProgressBar (ProgressBar.java:logProgressBar(163)) - Progress: 0.00 % (0 out of 1)
2021-05-04 16:21:33,737 INFO replication.GrpcReplicationClient (GrpcReplicationClient.java:onCompleted(190)) - Container 0 is downloaded to /tmp/container-copy/container-0.tar.gz
2021-05-04 16:21:34,434 INFO freon.ProgressBar (ProgressBar.java:logProgressBar(163)) - Progress: 0.00 % (0 out of 1)
2021-05-04 16:21:34,690 INFO replication.DownloadAndDiscardReplicator (DownloadAndDiscardReplicator.java:replicate(71)) - Container is downloaded but deleted, as you wished /tmp/container-copy/container-0.tar.gz
{code}
As you can see, the full 5 GB of data is sent out at 16:21:11 (about 7 seconds after replication started), but the data copy only finishes at 16:21:33 (22 more seconds).
Between these two points in time, the majority of the container is kept in the gRPC/Netty buffers.
As an experiment, we can make the gRPC client artificially slow (in GrpcReplicationClient):
{code}
@Override
public void onNext(CopyContainerResponseProto chunk) {
  try {
    try {
      // Artificial 1-second delay per chunk to simulate a slow receiver.
      Thread.sleep(1_000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    chunk.getData().writeTo(stream);
  } catch (IOException e) {
    response.completeExceptionally(e);
  }
}
{code}
With this change the client consumes the beginning of the container very slowly, and that is already enough to trigger the exception above.
{code}
Exception in thread "grpc-default-executor-0" org.apache.ratis.thirdparty.io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 2097152 byte(s) of direct memory (used: 3651141911, max: 3652190208)
{code}
Temporarily it can be worked around by increasing the Netty direct memory (-Dorg.apache.ratis.thirdparty.io.netty.maxDirectMemory=16000000000), but that is not a good long-term solution.
So we need to refactor the protocol to do a request/response exchange chunk by chunk.
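A minimal sketch of that chunk-by-chunk idea, purely illustrative: the message and stub names below (CopyChunkRequest, CopyChunkResponse, copyChunk) do not exist in Ozone today and only stand in for whatever the refactored protocol would define.
{code}
// Illustrative only: one chunk is requested at a time, so the amount of data
// buffered on the wire is bounded by CHUNK_SIZE instead of the container size.
// CopyChunkRequest / CopyChunkResponse / stub.copyChunk() are hypothetical.
final long containerId = 0;               // example container id
final int CHUNK_SIZE = 4 * 1024 * 1024;   // assumed chunk size
long offset = 0;
boolean eof = false;
while (!eof) {
  CopyChunkResponse response = stub.copyChunk(CopyChunkRequest.newBuilder()
      .setContainerID(containerId)
      .setOffset(offset)
      .setLength(CHUNK_SIZE)
      .build());
  response.getData().writeTo(outputStream);  // write this chunk to disk first,
  offset += response.getData().size();       // ...then ask for the next one
  eof = response.getEof();
}
{code}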
But we also have another problem: gRPC is not optimal for fast streaming.
The previous log showed that we replicated the container (5 GB) in under 30 seconds (without reading the original container and without doing tar file compression).
That is roughly 5120 MB / 30 s ≈ 170 MB/s. (I wrote to a tmpfs on the destination side, and even my NVMe is significantly faster.)
Based on [this|https://blog.reverberate.org/2021/04/21/musttail-efficient-interpreters.html] article, the best (!) results (with a C client!) were 1 Gb/s with gRPC (and with the black magic explained there, that number is doubled).
Ansh Khanna earlier did some low-level benchmarking (for Ratis streaming) which showed a ~5x difference between pure Netty and gRPC:
||approach||%CPU in buffer copying/allocations||time (seconds)||
|Flatbuffers over gRPC|>10%|16.44|
|Protobuf over gRPC|~10%|11.66|
|Netty-based streaming|0%|2.7|
Pure Netty also supports zero-copy, asynchronous streaming.
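For illustration, this is roughly what Netty's zero-copy file transfer looks like (a sketch under assumptions, not the proposed Ozone code; sendContainerTarball is a made-up method name):
{code}
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.DefaultFileRegion;
import java.io.File;

// Sketch only: DefaultFileRegion hands the file to the kernel (sendfile),
// so the tarball bytes never pass through JVM heap or direct buffers, and
// the write completes asynchronously as the socket accepts data.
void sendContainerTarball(ChannelHandlerContext ctx, File tarball) {
  ctx.writeAndFlush(new DefaultFileRegion(tarball, 0, tarball.length()))
      .addListener((ChannelFutureListener) future -> {
        if (!future.isSuccess()) {
          ctx.close();
        }
      });
}
{code}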
Summary:
1. The current implementation should be refactored so that it does not push all the data at once.
2. Netty seems to be the better long-term solution.
--> As a result, it seems easier to create a POC with Netty support and check how it works.
Earlier I made an attempt which can be found here: https://github.com/elek/ozone/tree/close-container-replication-refactor
It's a generic interface which may also be used in https://issues.apache.org/jira/browse/HDDS-5142. But at least it can be used to compare Netty vs gRPC performance in this situation.