gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network]
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r386105709
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
##########
@@ -430,23 +431,35 @@ private static void releaseResource(SingleInputGate
inputGate, NetworkBufferPool
/**
* Returns a deserialized buffer message as it would be received during
runtime.
*/
- private static BufferResponse createBufferResponse(
+ private BufferResponse createBufferResponse(
Buffer buffer,
int sequenceNumber,
- InputChannelID receivingChannelId,
- int backlog) throws IOException {
+ RemoteInputChannel receivingChannel,
+ int backlog,
+ CreditBasedPartitionRequestClientHandler clientHandler)
throws IOException {
+
// Mock buffer to serialize
- BufferResponse resp = new BufferResponse(buffer,
sequenceNumber, receivingChannelId, backlog);
+ BufferResponse resp = new BufferResponse(
+ buffer,
+ sequenceNumber,
+ receivingChannel.getInputChannelId(),
+ backlog);
ByteBuf serialized =
resp.write(UnpooledByteBufAllocator.DEFAULT);
// Skip general header bytes
serialized.readBytes(NettyMessage.FRAME_HEADER_LENGTH);
+
// Deserialize the bytes again. We have to go this way, because
we only partly deserialize
// the header of the response and wait for a buffer from the
buffer pool to copy the payload
// data into.
- BufferResponse deserialized =
BufferResponse.readFrom(serialized);
+ NetworkBufferAllocator allocator = new
NetworkBufferAllocator(clientHandler);
Review comment:
Changed the parameter to allocator.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services