[
https://issues.apache.org/jira/browse/RATIS-1868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Tsz-wo Sze updated RATIS-1868:
------------------------------
Fix Version/s: 2.5.2
> Handling Netty back pressure when streaming ratis log
> -----------------------------------------------------
>
> Key: RATIS-1868
> URL: https://issues.apache.org/jira/browse/RATIS-1868
> Project: Ratis
> Issue Type: Improvement
> Components: gRPC
> Reporter: Duong
> Assignee: Duong
> Priority: Major
> Fix For: 3.0.0, 2.5.2
>
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> GrpcLogAppender sends messages faster than the network/receiver can consume them.
> As a result, many messages are queued by Netty, the amount of allocated
> memory surges, and the process eventually fails with an OOME.
> {code:java}
> 2023-08-04 04:31:26,453 ERROR
> org.apache.ratis.server.leader.LogAppenderDaemon:
> 763623f9-79b1-4ed0-b957-3ab3938f4d26@group-19B128BEA0AE->b0b1160a-8529-45c3-a9bc-98153af9be66-GrpcLogAppender-LogAppenderDaemon
> failed
> org.apache.ratis.thirdparty.io.netty.util.internal.OutOfDirectMemoryError:
> failed to allocate 2097152 byte(s) of direct memory (used: 66108522775, max:
> 66109636608)
> at
> org.apache.ratis.thirdparty.io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:845)
> at
> org.apache.ratis.thirdparty.io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:774)
> at
> org.apache.ratis.thirdparty.io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:701)
> at
> org.apache.ratis.thirdparty.io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:676)
> at
> org.apache.ratis.thirdparty.io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:215)
> at
> org.apache.ratis.thirdparty.io.netty.buffer.PoolArena.tcacheAllocateNormal(PoolArena.java:197)
> at
> org.apache.ratis.thirdparty.io.netty.buffer.PoolArena.allocate(PoolArena.java:139)
> at
> org.apache.ratis.thirdparty.io.netty.buffer.PoolArena.allocate(PoolArena.java:129)
> at
> org.apache.ratis.thirdparty.io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:396)
> at
> org.apache.ratis.thirdparty.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:188)
> at
> org.apache.ratis.thirdparty.io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:124)
> at
> org.apache.ratis.thirdparty.io.grpc.netty.NettyWritableBufferAllocator.allocate(NettyWritableBufferAllocator.java:51)
> at
> org.apache.ratis.thirdparty.io.grpc.internal.MessageFramer.writeRaw(MessageFramer.java:289)
> at
> org.apache.ratis.thirdparty.io.grpc.internal.MessageFramer.access$400(MessageFramer.java:44)
> at
> org.apache.ratis.thirdparty.io.grpc.internal.MessageFramer$OutputStreamAdapter.write(MessageFramer.java:379)
> at
> org.apache.ratis.thirdparty.com.google.protobuf.CodedOutputStream$OutputStreamEncoder.write(CodedOutputStream.java:3005)
> at
> org.apache.ratis.thirdparty.com.google.protobuf.CodedOutputStream$OutputStreamEncoder.writeLazy(CodedOutputStream.java:3013)
> at
> org.apache.ratis.thirdparty.com.google.protobuf.ByteString$LiteralByteString.writeTo(ByteString.java:1411)
> at
> org.apache.ratis.thirdparty.com.google.protobuf.RopeByteString.writeTo(RopeByteString.java:461)
> at
> org.apache.ratis.thirdparty.com.google.protobuf.RopeByteString.writeTo(RopeByteString.java:461)
> at
> org.apache.ratis.thirdparty.com.google.protobuf.CodedOutputStream$OutputStreamEncoder.writeBytesNoTag(CodedOutputStream.java:2801)
> at
> org.apache.ratis.thirdparty.com.google.protobuf.CodedOutputStream$OutputStreamEncoder.writeBytes(CodedOutputStream.java:2775)
> at
> org.apache.ratis.proto.RaftProtos$StateMachineEntryProto.writeTo(RaftProtos.java:6823)
> at
> org.apache.ratis.thirdparty.com.google.protobuf.CodedOutputStream$OutputStreamEncoder.writeMessageNoTag(CodedOutputStream.java:2855)
> at
> org.apache.ratis.thirdparty.com.google.protobuf.CodedOutputStream$OutputStreamEncoder.writeMessage(CodedOutputStream.java:2824)
> at
> org.apache.ratis.proto.RaftProtos$StateMachineLogEntryProto.writeTo(RaftProtos.java:7660)
> at
> org.apache.ratis.thirdparty.com.google.protobuf.CodedOutputStream$OutputStreamEncoder.writeMessageNoTag(CodedOutputStream.java:2855)
> at
> org.apache.ratis.thirdparty.com.google.protobuf.CodedOutputStream$OutputStreamEncoder.writeMessage(CodedOutputStream.java:2824)
> at
> org.apache.ratis.proto.RaftProtos$LogEntryProto.writeTo(RaftProtos.java:9230)
> at
> org.apache.ratis.thirdparty.com.google.protobuf.CodedOutputStream$OutputStreamEncoder.writeMessageNoTag(CodedOutputStream.java:2855)
> at
> org.apache.ratis.thirdparty.com.google.protobuf.CodedOutputStream$OutputStreamEncoder.writeMessage(CodedOutputStream.java:2824)
> at
> org.apache.ratis.proto.RaftProtos$AppendEntriesRequestProto.writeTo(RaftProtos.java:17289)
> at
> org.apache.ratis.thirdparty.com.google.protobuf.AbstractMessageLite.writeTo(AbstractMessageLite.java:83)
> at
> org.apache.ratis.thirdparty.io.grpc.protobuf.lite.ProtoInputStream.drainTo(ProtoInputStream.java:52)
> at
> org.apache.ratis.thirdparty.io.grpc.internal.MessageFramer.writeToOutputStream(MessageFramer.java:271)
> at
> org.apache.ratis.thirdparty.io.grpc.internal.MessageFramer.writeKnownLengthUncompressed(MessageFramer.java:233)
> at
> org.apache.ratis.thirdparty.io.grpc.internal.MessageFramer.writeUncompressed(MessageFramer.java:169)
> at
> org.apache.ratis.thirdparty.io.grpc.internal.MessageFramer.writePayload(MessageFramer.java:142)
> at
> org.apache.ratis.thirdparty.io.grpc.internal.AbstractStream.writeMessage(AbstractStream.java:65)
> at
> org.apache.ratis.thirdparty.io.grpc.internal.ForwardingClientStream.writeMessage(ForwardingClientStream.java:37)
> at
> org.apache.ratis.thirdparty.io.grpc.internal.ClientCallImpl.sendMessageInternal(ClientCallImpl.java:523)
> at
> org.apache.ratis.thirdparty.io.grpc.internal.ClientCallImpl.sendMessage(ClientCallImpl.java:507)
> at
> org.apache.ratis.thirdparty.io.grpc.stub.ClientCalls$CallToStreamObserverAdapter.onNext(ClientCalls.java:374)
> at
> org.apache.ratis.grpc.server.GrpcLogAppender$StreamObservers.onNext(GrpcLogAppender.java:258)
> at
> org.apache.ratis.grpc.server.GrpcLogAppender.lambda$sendRequest$1(GrpcLogAppender.java:309)
> at java.util.Optional.map(Optional.java:215)
> at
> org.apache.ratis.grpc.server.GrpcLogAppender.sendRequest(GrpcLogAppender.java:308)
> at
> org.apache.ratis.grpc.server.GrpcLogAppender.appendLog(GrpcLogAppender.java:298)
> at
> org.apache.ratis.grpc.server.GrpcLogAppender.run(GrpcLogAppender.java:175)
> at
> org.apache.ratis.server.leader.LogAppenderDaemon.run(LogAppenderDaemon.java:78)
> at java.lang.Thread.run(Thread.java:750) {code}
> We should apply manual flow control: observe the stream's readiness and send
> requests only when it is ready. This will minimize the number of queued messages. Ref:
> [https://github.com/grpc/grpc-java/tree/v1.30.2/examples/src/main/java/io/grpc/examples/manualflowcontrol]
>
> A similar issue is discussed at https://github.com/grpc/grpc-java/issues/2710.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)