[
https://issues.apache.org/jira/browse/THRIFT-5628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17605158#comment-17605158
]
Philip Lee commented on THRIFT-5628:
------------------------------------
The application polls some hardware collecting data for an extended period.
The generate interface is
{code:java}
public partial class SystemControl
{
public interface IAsync
...
{code}
A single instance of SystemControl is created like so, where host is an ip
address string. This code used to be simpler (2nd part of if statement) but
this failed when there was no DNS on the network (see Thrift-5610).
{code:java}
public static SystemControl.IAsync DefaultFactoryAsync(string host, int
port)
{
if (System.Net.IPAddress.TryParse(host, out var address))
{
// Create a TcpClient
var client = new TcpClient(address.ToString(), port)
{
// duplicate setting used in other TSocketTransport constructors
NoDelay = true
};
var socket = new TSocketTransport(client, null);
// using 'new TConfiguration { MaxMessageSize = int.MaxValue }' as a work
around for now
var transport = new TBufferedTransport(socket);
var protocol = new TBinaryProtocol(transport);
return new SystemControl.Client(protocol);
}
else
{
// TSocketTransport('hostname/ip-address', portnumber, ...) calls
Dns.GetHostEntry(host)
// which fails for the internal network of the instrument.
var socket = new TSocketTransport(host, port, null);
var transport = new TBufferedTransport(socket);
var protocol = new TBinaryProtocol(transport);
return new SystemControl.Client(protocol);
}
}
{code}
The hardware is then polled with code similar to
{code:java}
_task = Task.Run(async () => {
var result = await _client.get_next_waveform(sessionId, timeout,
cancellationToken);
await callback.Invoke(result);
}
{code}
Where the Thrift compiler has generated this
{code:java}
public async
global::System.Threading.Tasks.Task<global::TeraView.TeraPulse.Rpc.Thrift.Waveform>
get_next_waveform(string session, double timeout, CancellationToken
cancellationToken = default)
{
await send_get_next_waveform(session, timeout, cancellationToken);
return await recv_get_next_waveform(cancellationToken);
}
public async global::System.Threading.Tasks.Task
send_get_next_waveform(string session, double timeout, CancellationToken
cancellationToken = default)
{
await OutputProtocol.WriteMessageBeginAsync(new
TMessage("get_next_waveform", TMessageType.Call, SeqId), cancellationToken);
var tmp194 = new InternalStructs.get_next_waveform_args() {
Session = session,
Timeout = timeout,
};
await tmp194.WriteAsync(OutputProtocol, cancellationToken);
await OutputProtocol.WriteMessageEndAsync(cancellationToken);
await OutputProtocol.Transport.FlushAsync(cancellationToken);
}
public async
global::System.Threading.Tasks.Task<global::TeraView.TeraPulse.Rpc.Thrift.Waveform>
recv_get_next_waveform(CancellationToken cancellationToken = default)
{
var tmp195 = await
InputProtocol.ReadMessageBeginAsync(cancellationToken);
if (tmp195.Type == TMessageType.Exception)
{
var tmp196 = await TApplicationException.ReadAsync(InputProtocol,
cancellationToken);
await InputProtocol.ReadMessageEndAsync(cancellationToken);
throw tmp196;
}
var tmp197 = new InternalStructs.get_next_waveform_result();
await tmp197.ReadAsync(InputProtocol, cancellationToken);
await InputProtocol.ReadMessageEndAsync(cancellationToken);
if (tmp197.__isset.success)
{
return tmp197.Success;
}
if (tmp197.__isset.exc)
{
throw tmp197.Exc;
}
throw new
TApplicationException(TApplicationException.ExceptionType.MissingResult,
"get_next_waveform failed: unknown result");
}
{code}
I'm thinking that ReadMessageEndAsync should reset MaxMessageSize.
> MaxMessageSize is never reset on a read buffer
> ----------------------------------------------
>
> Key: THRIFT-5628
> URL: https://issues.apache.org/jira/browse/THRIFT-5628
> Project: Thrift
> Issue Type: Bug
> Components: netstd - Library
> Affects Versions: 0.16.0
> Reporter: Philip Lee
> Priority: Major
>
> It appears that for the ReadBuffer of a TMemoryBufferTransport the method
> CountConsumedMessageBytes() is called, but ResetConsumedMessageSize() is
> never called.
> Our code as a long lived client which is polling periodically for an extended
> time. RemainingMessageSize eventually falls to <= 0 and a
> TTransportException("MaxMessageSize reached") is then thrown.
> Is this a bug or expected?
> I can fix this by changing TMemoryBufferTransport as follows
> {code:java}
> public override ValueTask<int> ReadAsync(byte[] buffer, int offset, int
> length, CancellationToken cancellationToken)
> {
> var count = Math.Min(Length - Position, length);
> Buffer.BlockCopy(Bytes, Position, buffer, offset, count);
> Position += count;
> CountConsumedMessageBytes(count);
> ---> ResetConsumedMessageSize();
> return new ValueTask<int>(count);
> }{code}
> but not confident this is correct.
> Or as a work around I can set TConfiguration.MaxMessageSize = int.MaxValue
> which will allow our code to operate for longer (20x) before failing.
> Or I can recreate the client periodically.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)