Hi Shixiong:

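Thanks for the reply. You are right. It seems calculateSize() only supports
the following two types, ByteBuf and ByteBufHolder, and returns -1 for
anything else, FileRegion included.

I will retry after adding handling for the FileRegion type.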

protected long calculateSize(Object msg) {
    if (msg instanceof ByteBuf) {
        return ((ByteBuf) msg).readableBytes();
    }
    if (msg instanceof ByteBufHolder) {
        return ((ByteBufHolder) msg).content().readableBytes();
    }
    return -1;
}
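
A minimal sketch of what I plan to try, assuming calculateSize() can be overridden in the Netty version Spark ships with, and that FileRegion.count() (the total number of bytes to transfer) is a reasonable size to report. The class name FileRegionAwareShapingHandler is my own placeholder, not something that exists in Netty or Spark:

```java
import io.netty.channel.FileRegion;
import io.netty.handler.traffic.ChannelTrafficShapingHandler;

/**
 * Hypothetical subclass (not part of Netty or Spark) that also reports a
 * size for FileRegion messages, so they are no longer treated as unknown.
 */
class FileRegionAwareShapingHandler extends ChannelTrafficShapingHandler {

    FileRegionAwareShapingHandler(long writeLimit, long readLimit, long checkInterval) {
        super(writeLimit, readLimit, checkInterval);
    }

    @Override
    protected long calculateSize(Object msg) {
        if (msg instanceof FileRegion) {
            // Assumption: the whole region counts against the limit at once.
            return ((FileRegion) msg).count();
        }
        // ByteBuf and ByteBufHolder are handled by the parent; other types yield -1.
        return super.calculateSize(msg);
    }
}
```

In initializePipeline this would replace the plain ChannelTrafficShapingHandler instance. I have not verified yet that this alone is enough to throttle Spark's MessageWithHeader writes.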

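One more thing worth checking in the quoted snippet below: as far as I know, the two limit arguments of ChannelTrafficShapingHandler are in bytes per second and the check interval is in milliseconds, so (50, 50, 1000) would mean 50 bytes/s rather than 50 KB/s. A small sketch of the arithmetic, where ShapingLimits and its methods are hypothetical helpers written only for this illustration:

```java
/** Hypothetical helper (not part of Netty or Spark) for reasoning about shaping limits. */
final class ShapingLimits {
    private ShapingLimits() {}

    /** Converts a rate in KiB/s to bytes/s, the unit the limit arguments are assumed to use. */
    static long kibPerSecToBytesPerSec(long kibPerSec) {
        return kibPerSec * 1024L;
    }

    /** Rough time in milliseconds needed to pass `bytes` through a limit of `limitBytesPerSec`. */
    static long millisToSend(long bytes, long limitBytesPerSec) {
        return bytes * 1000L / limitBytesPerSec;
    }

    public static void main(String[] args) {
        long oneMiB = 1024L * 1024L;
        // Limit of 50 bytes/s: 1 MiB would take about 5.8 hours.
        System.out.println(millisToSend(oneMiB, 50L));                         // prints 20971520
        // Limit of 50 KiB/s: 1 MiB takes about 20 seconds.
        System.out.println(millisToSend(oneMiB, kibPerSecToBytesPerSec(50L))); // prints 20480
    }
}
```

If that reading of the units is right, the intended call was probably new ChannelTrafficShapingHandler(50 * 1024, 50 * 1024, 1000).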

2017-06-14 1:34 GMT+08:00 Shixiong(Ryan) Zhu <shixi...@databricks.com>:

> I took a look at ChannelTrafficShapingHandler. Looks like it's because it
> doesn't support FileRegion. Spark's messages use this interface.
> See org.apache.spark.network.protocol.MessageWithHeader.
>
> On Tue, Jun 13, 2017 at 4:17 AM, Niu Zhaojie <nzjem...@gmail.com> wrote:
>
>> Hi All:
>>
>> I am trying to control the network read/write speed with
>> ChannelTrafficShapingHandler provided by Netty.
>>
>>
>> In TransportContext.java
>>
>> I modify it as below:
>>
>> public TransportChannelHandler initializePipeline(
>>         SocketChannel channel,
>>         RpcHandler channelRpcHandler) {
>>   try {
>>     // added by zhaojie
>>     logger.info("want to try control read bandwidth on host: " + host);
>>     final ChannelTrafficShapingHandler channelShaping =
>>         new ChannelTrafficShapingHandler(50, 50, 1000);
>>
>>     TransportChannelHandler channelHandler =
>>         createChannelHandler(channel, channelRpcHandler);
>>
>>     channel.pipeline()
>>             .addLast("encoder", ENCODER)
>>             .addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder())
>>             .addLast("decoder", DECODER)
>>             .addLast("channelTrafficShaping", channelShaping)
>>             .addLast("idleStateHandler",
>>                 new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))
>>             // NOTE: Chunks are currently guaranteed to be returned in the order of request,
>>             // but this would require more logic to guarantee if this were not part of the
>>             // same event loop.
>>             .addLast("handler", channelHandler);
>>
>>
>> I create a ChannelTrafficShapingHandler and register it in the
>> pipeline of the channel. I set the write and read speed to 50kb/sec in the
>> constructor.
>> Besides that, what else do I need to do?
>>
>> However, it does not work. Is this idea correct? Am I missing something?
>>
>> Is there a better way?
>>
>> Thanks.
>>
>> --
>> *Regards,*
>> *Zhaojie*
>>
>>
>


-- 
*Regards,*
*Zhaojie*
