This is an automated email from the ASF dual-hosted git repository. jensg pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/thrift.git
The following commit(s) were added to refs/heads/master by this push: new bd1a273 THRIFT-4898 Pipe write operations across a network are limited to 65,535 bytes per write. Client: netstd Patch: Jens Geyer bd1a273 is described below commit bd1a273ab7979824952bab906b8e260f81b2bd15 Author: Jens Geyer <je...@apache.org> AuthorDate: Wed Jun 26 22:52:44 2019 +0200 THRIFT-4898 Pipe write operations across a network are limited to 65,535 bytes per write. Client: netstd Patch: Jens Geyer This closes #1823 --- lib/delphi/test/TestClient.pas | 10 +- lib/delphi/test/TestServer.pas | 2 +- .../Thrift/Transport/Client/TNamedPipeTransport.cs | 35 ++++--- .../Transport/Server/TNamedPipeServerTransport.cs | 28 ++++-- test/netstd/Client/TestClient.cs | 112 +++++++++++++-------- test/netstd/Server/TestServer.cs | 3 +- 6 files changed, 119 insertions(+), 71 deletions(-) diff --git a/lib/delphi/test/TestClient.pas b/lib/delphi/test/TestClient.pas index c2660a2..e59c327 100644 --- a/lib/delphi/test/TestClient.pas +++ b/lib/delphi/test/TestClient.pas @@ -92,7 +92,8 @@ type Empty, // Edge case: the zero-length empty binary Normal, // Fairly small array of usual size (256 bytes) ByteArrayTest, // THRIFT-4454 Large writes/reads may cause range check errors in debug mode - PipeWriteLimit // THRIFT-4372 Pipe write operations across a network are limited to 65,535 bytes per write. + PipeWriteLimit, // THRIFT-4372 Pipe write operations across a network are limited to 65,535 bytes per write. + TwentyMB // that's quite a bit of data ); private @@ -537,12 +538,12 @@ begin // random binary small for testsize := Low(TTestSize) to High(TTestSize) do begin binOut := PrepareBinaryData( TRUE, testsize); - Console.WriteLine('testBinary('+BytesToHex(binOut)+')'); + Console.WriteLine('testBinary('+IntToStr(Length(binOut))+' bytes)'); try binIn := client.testBinary(binOut); - Expect( Length(binOut) = Length(binIn), 'testBinary(): length '+IntToStr(Length(binOut))+' = '+IntToStr(Length(binIn))); + Expect( Length(binOut) = Length(binIn), 'testBinary('+IntToStr(Length(binOut))+' bytes): '+IntToStr(Length(binIn))+' bytes received'); i32 := Min( Length(binOut), Length(binIn)); - Expect( CompareMem( binOut, binIn, i32), 'testBinary('+BytesToHex(binOut)+') = '+BytesToHex(binIn)); + Expect( CompareMem( binOut, binIn, i32), 'testBinary('+IntToStr(Length(binOut))+' bytes): validating received data'); except on e:TApplicationException do Console.WriteLine('testBinary(): '+e.Message); on e:Exception do Expect( FALSE, 'testBinary(): Unexpected exception "'+e.ClassName+'": '+e.Message); @@ -1023,6 +1024,7 @@ begin Normal : SetLength( result, $100); ByteArrayTest : SetLength( result, SizeOf(TByteArray) + 128); PipeWriteLimit : SetLength( result, 65535 + 128); + TwentyMB : SetLength( result, 20 * 1024 * 1024); else raise EArgumentException.Create('aSize'); end; diff --git a/lib/delphi/test/TestServer.pas b/lib/delphi/test/TestServer.pas index 4cb0090..2a80d52 100644 --- a/lib/delphi/test/TestServer.pas +++ b/lib/delphi/test/TestServer.pas @@ -144,7 +144,7 @@ end; function TTestServer.TTestHandlerImpl.testBinary(const thing: TBytes): TBytes; begin - Console.WriteLine('testBinary("' + BytesToHex( thing ) + '")'); + Console.WriteLine('testBinary('+IntToStr(Length(thing)) + ' bytes)'); Result := thing; end; diff --git a/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs b/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs index 2f96a6a..7dfe013 100644 --- a/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs +++ b/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +using System; using System.IO.Pipes; using System.Threading; using System.Threading.Tasks; @@ -24,7 +25,7 @@ namespace Thrift.Transport.Client // ReSharper disable once InconsistentNaming public class TNamedPipeTransport : TTransport { - private NamedPipeClientStream _client; + private NamedPipeClientStream PipeStream; private int ConnectTimeout; public TNamedPipeTransport(string pipe, int timeout = Timeout.Infinite) @@ -37,10 +38,10 @@ namespace Thrift.Transport.Client var serverName = string.IsNullOrWhiteSpace(server) ? server : "."; ConnectTimeout = (timeout > 0) ? timeout : Timeout.Infinite; - _client = new NamedPipeClientStream(serverName, pipe, PipeDirection.InOut, PipeOptions.None); + PipeStream = new NamedPipeClientStream(serverName, pipe, PipeDirection.InOut, PipeOptions.None); } - public override bool IsOpen => _client != null && _client.IsConnected; + public override bool IsOpen => PipeStream != null && PipeStream.IsConnected; public override async Task OpenAsync(CancellationToken cancellationToken) { @@ -49,36 +50,46 @@ namespace Thrift.Transport.Client throw new TTransportException(TTransportException.ExceptionType.AlreadyOpen); } - await _client.ConnectAsync( ConnectTimeout, cancellationToken); + await PipeStream.ConnectAsync( ConnectTimeout, cancellationToken); } public override void Close() { - if (_client != null) + if (PipeStream != null) { - _client.Dispose(); - _client = null; + PipeStream.Dispose(); + PipeStream = null; } } public override async ValueTask<int> ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken) { - if (_client == null) + if (PipeStream == null) { throw new TTransportException(TTransportException.ExceptionType.NotOpen); } - return await _client.ReadAsync(buffer, offset, length, cancellationToken); + return await PipeStream.ReadAsync(buffer, offset, length, cancellationToken); } public override async Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken) { - if (_client == null) + if (PipeStream == null) { throw new TTransportException(TTransportException.ExceptionType.NotOpen); } - await _client.WriteAsync(buffer, offset, length, cancellationToken); + // if necessary, send the data in chunks + // there's a system limit around 0x10000 bytes that we hit otherwise + // MSDN: "Pipe write operations across a network are limited to 65,535 bytes per write. For more information regarding pipes, see the Remarks section." + var nBytes = Math.Min(15 * 4096, length); // 16 would exceed the limit + while (nBytes > 0) + { + await PipeStream.WriteAsync(buffer, offset, nBytes, cancellationToken); + offset += nBytes; + length -= nBytes; + nBytes = Math.Min(nBytes, length); + } } public override async Task FlushAsync(CancellationToken cancellationToken) @@ -91,7 +102,7 @@ namespace Thrift.Transport.Client protected override void Dispose(bool disposing) { - _client.Dispose(); + PipeStream.Dispose(); } } } diff --git a/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs b/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs index 31a052a..77b8251 100644 --- a/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs +++ b/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs @@ -239,14 +239,14 @@ namespace Thrift.Transport.Server private class ServerTransport : TTransport { - private readonly NamedPipeServerStream _stream; + private readonly NamedPipeServerStream PipeStream; public ServerTransport(NamedPipeServerStream stream) { - _stream = stream; + PipeStream = stream; } - public override bool IsOpen => _stream != null && _stream.IsConnected; + public override bool IsOpen => PipeStream != null && PipeStream.IsConnected; public override async Task OpenAsync(CancellationToken cancellationToken) { @@ -258,27 +258,37 @@ namespace Thrift.Transport.Server public override void Close() { - _stream?.Dispose(); + PipeStream?.Dispose(); } public override async ValueTask<int> ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken) { - if (_stream == null) + if (PipeStream == null) { throw new TTransportException(TTransportException.ExceptionType.NotOpen); } - return await _stream.ReadAsync(buffer, offset, length, cancellationToken); + return await PipeStream.ReadAsync(buffer, offset, length, cancellationToken); } public override async Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken) { - if (_stream == null) + if (PipeStream == null) { throw new TTransportException(TTransportException.ExceptionType.NotOpen); } - await _stream.WriteAsync(buffer, offset, length, cancellationToken); + // if necessary, send the data in chunks + // there's a system limit around 0x10000 bytes that we hit otherwise + // MSDN: "Pipe write operations across a network are limited to 65,535 bytes per write. For more information regarding pipes, see the Remarks section." + var nBytes = Math.Min(15 * 4096, length); // 16 would exceed the limit + while (nBytes > 0) + { + await PipeStream.WriteAsync(buffer, offset, nBytes, cancellationToken); + offset += nBytes; + length -= nBytes; + nBytes = Math.Min(nBytes, length); + } } public override async Task FlushAsync(CancellationToken cancellationToken) @@ -291,7 +301,7 @@ namespace Thrift.Transport.Server protected override void Dispose(bool disposing) { - _stream?.Dispose(); + PipeStream?.Dispose(); } } } diff --git a/test/netstd/Client/TestClient.cs b/test/netstd/Client/TestClient.cs index 6be1023..0f58f95 100644 --- a/test/netstd/Client/TestClient.cs +++ b/test/netstd/Client/TestClient.cs @@ -73,7 +73,7 @@ namespace ThriftTest public ProtocolChoice protocol = ProtocolChoice.Binary; public TransportChoice transport = TransportChoice.Socket; - internal void Parse( List<string> args) + internal void Parse(List<string> args) { for (var i = 0; i < args.Count; ++i) { @@ -220,18 +220,18 @@ namespace ThriftTest { throw new FileNotFoundException($"Cannot find file: {clientCertName}"); } - + var cert = new X509Certificate2(existingPath, "thrift"); return cert; } - + public TTransport CreateTransport() { // endpoint transport TTransport trans = null; - switch(transport) + switch (transport) { case TransportChoice.Http: Debug.Assert(url != null); @@ -249,8 +249,8 @@ namespace ThriftTest { throw new InvalidOperationException("Certificate doesn't contain private key"); } - - trans = new TTlsSocketTransport(host, port, 0, cert, + + trans = new TTlsSocketTransport(host, port, 0, cert, (sender, certificate, chain, errors) => true, null, SslProtocols.Tls | SslProtocols.Tls11 | SslProtocols.Tls12); break; @@ -263,7 +263,7 @@ namespace ThriftTest // layered transport - switch(layered) + switch (layered) { case LayeredChoice.Buffered: trans = new TBufferedTransport(trans); @@ -436,15 +436,46 @@ namespace ThriftTest return BitConverter.ToString(data).Replace("-", string.Empty); } - public static byte[] PrepareTestData(bool randomDist) + + public enum BinaryTestSize + { + Empty, // Edge case: the zero-length empty binary + Normal, // Fairly small array of usual size (256 bytes) + Large, // Large writes/reads may cause range check errors + PipeWriteLimit, // Windows Limit: Pipe write operations across a network are limited to 65,535 bytes per write. + TwentyMB // that's quite a bit of data + }; + + public static byte[] PrepareTestData(bool randomDist, BinaryTestSize testcase) { - var retval = new byte[0x100]; - var initLen = Math.Min(0x100, retval.Length); + int amount = -1; + switch (testcase) + { + case BinaryTestSize.Empty: + amount = 0; + break; + case BinaryTestSize.Normal: + amount = 0x100; + break; + case BinaryTestSize.Large: + amount = 0x8000 + 128; + break; + case BinaryTestSize.PipeWriteLimit: + amount = 0xFFFF + 128; + break; + case BinaryTestSize.TwentyMB: + amount = 20 * 1024 * 1024; + break; + default: + throw new ArgumentException(nameof(testcase)); + } + + var retval = new byte[amount]; // linear distribution, unless random is requested if (!randomDist) { - for (var i = 0; i < initLen; ++i) + for (var i = 0; i < retval.Length; ++i) { retval[i] = (byte)i; } @@ -452,22 +483,10 @@ namespace ThriftTest } // random distribution - for (var i = 0; i < initLen; ++i) - { - retval[i] = (byte)0; - } var rnd = new Random(); - for (var i = 1; i < initLen; ++i) + for (var i = 1; i < retval.Length; ++i) { - while (true) - { - var nextPos = rnd.Next() % initLen; - if (retval[nextPos] == 0) - { - retval[nextPos] = (byte)i; - break; - } - } + retval[i] = (byte)rnd.Next(0x100); } return retval; } @@ -557,32 +576,39 @@ namespace ThriftTest returnCode |= ErrorBaseTypes; } - var binOut = PrepareTestData(true); - Console.Write("testBinary(" + BytesToHex(binOut) + ")"); - try + // testBinary() + foreach(BinaryTestSize binTestCase in Enum.GetValues(typeof(BinaryTestSize))) { - var binIn = await client.testBinaryAsync(binOut, MakeTimeoutToken()); - Console.WriteLine(" = " + BytesToHex(binIn)); - if (binIn.Length != binOut.Length) + var binOut = PrepareTestData(true, binTestCase); + + Console.Write("testBinary({0} bytes)", binOut.Length); + try { - Console.WriteLine("*** FAILED ***"); - returnCode |= ErrorBaseTypes; - } - for (var ofs = 0; ofs < Math.Min(binIn.Length, binOut.Length); ++ofs) - if (binIn[ofs] != binOut[ofs]) + var binIn = await client.testBinaryAsync(binOut, MakeTimeoutToken()); + Console.WriteLine(" = {0} bytes", binIn.Length); + if (binIn.Length != binOut.Length) { Console.WriteLine("*** FAILED ***"); returnCode |= ErrorBaseTypes; } - } - catch (Thrift.TApplicationException ex) - { - Console.WriteLine("*** FAILED ***"); - returnCode |= ErrorBaseTypes; - Console.WriteLine(ex.Message + " ST: " + ex.StackTrace); + for (var ofs = 0; ofs < Math.Min(binIn.Length, binOut.Length); ++ofs) + { + if (binIn[ofs] != binOut[ofs]) + { + Console.WriteLine("*** FAILED ***"); + returnCode |= ErrorBaseTypes; + } + } + } + catch (Thrift.TApplicationException ex) + { + Console.WriteLine("*** FAILED ***"); + returnCode |= ErrorBaseTypes; + Console.WriteLine(ex.Message + " ST: " + ex.StackTrace); + } } - // binary equals? + // CrazyNesting Console.WriteLine("Test CrazyNesting"); var one = new CrazyNesting(); var two = new CrazyNesting(); diff --git a/test/netstd/Server/TestServer.cs b/test/netstd/Server/TestServer.cs index 82b36eb..25c2afc 100644 --- a/test/netstd/Server/TestServer.cs +++ b/test/netstd/Server/TestServer.cs @@ -246,8 +246,7 @@ namespace ThriftTest public Task<byte[]> testBinaryAsync(byte[] thing, CancellationToken cancellationToken) { - var hex = BitConverter.ToString(thing).Replace("-", string.Empty); - logger.Invoke("testBinary({0:X})", hex); + logger.Invoke("testBinary({0} bytes)", thing.Length); return Task.FromResult(thing); }