This is an automated email from the ASF dual-hosted git repository.
curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new 1c780f163 feat(csharp/src/Drivers/Apache): Performance improvement -
Replace TSocketTransport with TBufferedTransport (#2742)
1c780f163 is described below
commit 1c780f163f55adf2ad5e84661fe37ac5fc83dc21
Author: Sudhir Reddy Emmadi <[email protected]>
AuthorDate: Thu Apr 24 21:11:06 2025 +0530
feat(csharp/src/Drivers/Apache): Performance improvement - Replace
TSocketTransport with TBufferedTransport (#2742)
Co-authored-by: Sudhir Emmadi <[email protected]>
---
.../Apache/Hive2/HiveServer2HttpConnection.cs | 3 +-
.../Drivers/Apache/Impala/ImpalaHttpConnection.cs | 3 +-
.../Apache/Impala/ImpalaStandardConnection.cs | 6 +-
.../Drivers/Apache/Spark/SparkHttpConnection.cs | 3 +-
.../Apache/Spark/SparkStandardConnection.cs | 3 +-
.../Drivers/Apache/Thrift/IPeekableTransport.cs | 27 ---------
.../Thrift/Service/Rpc/Thrift/TBinaryColumn.cs | 4 +-
.../Thrift/Service/Rpc/Thrift/TBoolColumn.cs | 4 +-
.../Thrift/Service/Rpc/Thrift/TByteColumn.cs | 4 +-
.../Thrift/Service/Rpc/Thrift/TDoubleColumn.cs | 4 +-
.../Apache/Thrift/Service/Rpc/Thrift/TI16Column.cs | 4 +-
.../Apache/Thrift/Service/Rpc/Thrift/TI32Column.cs | 4 +-
.../Apache/Thrift/Service/Rpc/Thrift/TI64Column.cs | 4 +-
.../Thrift/Service/Rpc/Thrift/TStringColumn.cs | 4 +-
.../src/Drivers/Apache/Thrift/StreamExtensions.cs | 34 ++---------
.../Drivers/Apache/Thrift/ThriftSocketTransport.cs | 69 ----------------------
16 files changed, 24 insertions(+), 156 deletions(-)
diff --git a/csharp/src/Drivers/Apache/Hive2/HiveServer2HttpConnection.cs
b/csharp/src/Drivers/Apache/Hive2/HiveServer2HttpConnection.cs
index 1c882b78c..0bffe8714 100644
--- a/csharp/src/Drivers/Apache/Hive2/HiveServer2HttpConnection.cs
+++ b/csharp/src/Drivers/Apache/Hive2/HiveServer2HttpConnection.cs
@@ -29,6 +29,7 @@ using Apache.Hive.Service.Rpc.Thrift;
using Thrift;
using Thrift.Protocol;
using Thrift.Transport;
+using Thrift.Transport.Client;
namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2
{
@@ -178,7 +179,7 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2
httpClient.DefaultRequestHeaders.ExpectContinue = false;
TConfiguration config = new();
- ThriftHttpTransport transport = new(httpClient, config)
+ THttpTransport transport = new(httpClient, config)
{
// This value can only be set before the first call/request.
So if a new value for query timeout
// is set, we won't be able to update the value. Setting to
~infinite and relying on cancellation token
diff --git a/csharp/src/Drivers/Apache/Impala/ImpalaHttpConnection.cs
b/csharp/src/Drivers/Apache/Impala/ImpalaHttpConnection.cs
index 157692c2a..ebc92e663 100644
--- a/csharp/src/Drivers/Apache/Impala/ImpalaHttpConnection.cs
+++ b/csharp/src/Drivers/Apache/Impala/ImpalaHttpConnection.cs
@@ -30,6 +30,7 @@ using Apache.Hive.Service.Rpc.Thrift;
using Thrift;
using Thrift.Protocol;
using Thrift.Transport;
+using Thrift.Transport.Client;
namespace Apache.Arrow.Adbc.Drivers.Apache.Impala
{
@@ -151,7 +152,7 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Impala
httpClient.DefaultRequestHeaders.ExpectContinue = false;
TConfiguration config = new();
- ThriftHttpTransport transport = new(httpClient, config)
+ THttpTransport transport = new(httpClient, config)
{
// This value can only be set before the first call/request.
So if a new value for query timeout
// is set, we won't be able to update the value. Setting to
~infinite and relying on cancellation token
diff --git a/csharp/src/Drivers/Apache/Impala/ImpalaStandardConnection.cs
b/csharp/src/Drivers/Apache/Impala/ImpalaStandardConnection.cs
index 623335e91..f5c84e2cd 100644
--- a/csharp/src/Drivers/Apache/Impala/ImpalaStandardConnection.cs
+++ b/csharp/src/Drivers/Apache/Impala/ImpalaStandardConnection.cs
@@ -25,6 +25,7 @@ using Apache.Arrow.Ipc;
using Apache.Hive.Service.Rpc.Thrift;
using Thrift.Protocol;
using Thrift.Transport;
+using Thrift.Transport.Client;
namespace Apache.Arrow.Adbc.Drivers.Apache.Impala
{
@@ -105,8 +106,9 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Impala
// Delay the open connection until later.
bool connectClient = false;
- ThriftSocketTransport transport = new(hostName!, int.Parse(port!),
connectClient, config: new());
- return transport;
+ TSocketTransport transport = new(hostName!, int.Parse(port!),
connectClient, config: new());
+ TBufferedTransport bufferedTransport = new
TBufferedTransport(transport);
+ return bufferedTransport;
}
protected override async Task<TProtocol>
CreateProtocolAsync(TTransport transport, CancellationToken cancellationToken =
default)
diff --git a/csharp/src/Drivers/Apache/Spark/SparkHttpConnection.cs
b/csharp/src/Drivers/Apache/Spark/SparkHttpConnection.cs
index f3ad901bd..f17782ebc 100644
--- a/csharp/src/Drivers/Apache/Spark/SparkHttpConnection.cs
+++ b/csharp/src/Drivers/Apache/Spark/SparkHttpConnection.cs
@@ -30,6 +30,7 @@ using Apache.Hive.Service.Rpc.Thrift;
using Thrift;
using Thrift.Protocol;
using Thrift.Transport;
+using Thrift.Transport.Client;
namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
{
@@ -174,7 +175,7 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
httpClient.DefaultRequestHeaders.ExpectContinue = false;
TConfiguration config = new();
- ThriftHttpTransport transport = new(httpClient, config)
+ THttpTransport transport = new(httpClient, config)
{
// This value can only be set before the first call/request.
So if a new value for query timeout
// is set, we won't be able to update the value. Setting to
~infinite and relying on cancellation token
diff --git a/csharp/src/Drivers/Apache/Spark/SparkStandardConnection.cs
b/csharp/src/Drivers/Apache/Spark/SparkStandardConnection.cs
index 2c28ea8e1..12abfba4b 100644
--- a/csharp/src/Drivers/Apache/Spark/SparkStandardConnection.cs
+++ b/csharp/src/Drivers/Apache/Spark/SparkStandardConnection.cs
@@ -23,6 +23,7 @@ using System.Threading.Tasks;
using Apache.Hive.Service.Rpc.Thrift;
using Thrift.Protocol;
using Thrift.Transport;
+using Thrift.Transport.Client;
namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
{
@@ -97,7 +98,7 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
// Delay the open connection until later.
bool connectClient = false;
- ThriftSocketTransport transport = new(hostName!, int.Parse(port!),
connectClient, config: new());
+ TSocketTransport transport = new(hostName!, int.Parse(port!),
connectClient, config: new());
return transport;
}
diff --git a/csharp/src/Drivers/Apache/Thrift/IPeekableTransport.cs
b/csharp/src/Drivers/Apache/Thrift/IPeekableTransport.cs
deleted file mode 100644
index ddceac1a3..000000000
--- a/csharp/src/Drivers/Apache/Thrift/IPeekableTransport.cs
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-using System.IO;
-
-namespace Apache.Arrow.Adbc.Drivers.Apache
-{
- internal interface IPeekableTransport
- {
- Stream Input { get; }
- Stream Output { get; }
- }
-}
diff --git
a/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TBinaryColumn.cs
b/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TBinaryColumn.cs
index aacec569f..153dc09d6 100644
--- a/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TBinaryColumn.cs
+++ b/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TBinaryColumn.cs
@@ -9,7 +9,6 @@ using System.IO;
using System.Text;
using System.Threading;
using Apache.Arrow;
-using Apache.Arrow.Adbc.Drivers.Apache;
using Apache.Arrow.Adbc.Drivers.Apache.Thrift;
using Apache.Arrow.Types;
using Thrift.Collections;
@@ -55,7 +54,6 @@ namespace Apache.Hive.Service.Rpc.Thrift
ArrowBuffer.Builder<byte> values = null;
byte[] nulls = null;
byte[] offsetBuffer = null;
- Stream transport = ((IPeekableTransport)iprot.Transport).Input;
int length = -1;
byte[] preAllocatedBuffer = new byte[65536];
@@ -100,7 +98,7 @@ namespace Apache.Hive.Service.Rpc.Thrift
tmp = new byte[size];
}
- await transport.ReadExactlyAsync(tmp.AsMemory(0, size),
cancellationToken);
+ await iprot.Transport.ReadExactlyAsync(tmp.AsMemory(0,
size), cancellationToken);
values.Append(tmp.AsMemory(0, size).Span);
}
StreamExtensions.WriteInt32LittleEndian(offset, memory.Span,
length * sizeof(int));
diff --git a/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TBoolColumn.cs
b/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TBoolColumn.cs
index fed4ece10..46745af84 100644
--- a/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TBoolColumn.cs
+++ b/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TBoolColumn.cs
@@ -9,7 +9,6 @@ using System.IO;
using System.Text;
using System.Threading;
using Apache.Arrow;
-using Apache.Arrow.Adbc.Drivers.Apache;
using Apache.Arrow.Adbc.Drivers.Apache.Thrift;
using Thrift.Protocol;
using Thrift.Protocol.Entities;
@@ -53,7 +52,6 @@ namespace Apache.Hive.Service.Rpc.Thrift
ArrowBuffer.BitmapBuilder values = null;
byte[] nulls = null;
- Stream transport = ((IPeekableTransport)iprot.Transport).Input;
int length = -1;
await iprot.ReadStructBeginAsync(cancellationToken);
@@ -77,7 +75,7 @@ namespace Apache.Hive.Service.Rpc.Thrift
byte[] buffer = new byte[length];
var memory = buffer.AsMemory();
iprot.Transport.CheckReadBytesAvailable(buffer.Length);
- await transport.ReadExactlyAsync(memory, cancellationToken);
+ await iprot.Transport.ReadExactlyAsync(buffer.AsMemory(0,
length), cancellationToken);
values = new ArrowBuffer.BitmapBuilder(length);
for (int _i134 = 0; _i134 < length; ++_i134)
diff --git a/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TByteColumn.cs
b/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TByteColumn.cs
index 9d21795e9..1a626aa6f 100644
--- a/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TByteColumn.cs
+++ b/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TByteColumn.cs
@@ -11,7 +11,6 @@ using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using Apache.Arrow;
-using Apache.Arrow.Adbc.Drivers.Apache;
using Apache.Arrow.Adbc.Drivers.Apache.Thrift;
using Thrift.Protocol;
using Thrift.Protocol.Entities;
@@ -55,7 +54,6 @@ namespace Apache.Hive.Service.Rpc.Thrift
byte[] nulls = null;
byte[] buffer = null;
- Stream transport = ((IPeekableTransport)iprot.Transport).Input;
int length = -1;
await iprot.ReadStructBeginAsync(cancellationToken);
@@ -79,7 +77,7 @@ namespace Apache.Hive.Service.Rpc.Thrift
buffer = new byte[length];
var memory = buffer.AsMemory();
iprot.Transport.CheckReadBytesAvailable(buffer.Length);
- await transport.ReadExactlyAsync(memory, cancellationToken);
+ await iprot.Transport.ReadExactlyAsync(memory,
cancellationToken);
await iprot.ReadListEndAsync(cancellationToken);
}
diff --git
a/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TDoubleColumn.cs
b/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TDoubleColumn.cs
index 5ba749148..a2c50a73d 100644
--- a/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TDoubleColumn.cs
+++ b/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TDoubleColumn.cs
@@ -11,7 +11,6 @@ using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using Apache.Arrow;
-using Apache.Arrow.Adbc.Drivers.Apache;
using Apache.Arrow.Adbc.Drivers.Apache.Thrift;
using Thrift.Protocol;
using Thrift.Protocol.Entities;
@@ -53,7 +52,6 @@ namespace Apache.Hive.Service.Rpc.Thrift
byte[] nulls = null;
byte[] buffer = null;
- Stream transport = ((IPeekableTransport)iprot.Transport).Input;
int length = -1;
await iprot.ReadStructBeginAsync(cancellationToken);
@@ -77,7 +75,7 @@ namespace Apache.Hive.Service.Rpc.Thrift
buffer = new byte[length * sizeof(double)];
var memory = buffer.AsMemory();
iprot.Transport.CheckReadBytesAvailable(buffer.Length);
- await transport.ReadExactlyAsync(memory, cancellationToken);
+ await iprot.Transport.ReadExactlyAsync(memory,
cancellationToken);
for (int _i179 = 0; _i179 < length; ++_i179)
{
StreamExtensions.ReverseEndianI64AtOffset(memory.Span,
_i179 * sizeof(double));
diff --git a/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TI16Column.cs
b/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TI16Column.cs
index 715f8397e..3d7c8c480 100644
--- a/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TI16Column.cs
+++ b/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TI16Column.cs
@@ -11,7 +11,6 @@ using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using Apache.Arrow;
-using Apache.Arrow.Adbc.Drivers.Apache;
using Apache.Arrow.Adbc.Drivers.Apache.Thrift;
using Thrift.Protocol;
using Thrift.Protocol.Entities;
@@ -55,7 +54,6 @@ namespace Apache.Hive.Service.Rpc.Thrift
byte[] nulls = null;
byte[] buffer = null;
- Stream transport = ((IPeekableTransport)iprot.Transport).Input;
int length = -1;
await iprot.ReadStructBeginAsync(cancellationToken);
@@ -79,7 +77,7 @@ namespace Apache.Hive.Service.Rpc.Thrift
buffer = new byte[length * sizeof(short)];
var memory = buffer.AsMemory();
iprot.Transport.CheckReadBytesAvailable(buffer.Length);
- await transport.ReadExactlyAsync(memory, cancellationToken);
+ await iprot.Transport.ReadExactlyAsync(memory,
cancellationToken);
for (int _i152 = 0; _i152 < length; ++_i152)
{
StreamExtensions.ReverseEndiannessInt16(memory.Span, _i152
* sizeof(short));
diff --git a/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TI32Column.cs
b/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TI32Column.cs
index dba3a316d..d4543318f 100644
--- a/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TI32Column.cs
+++ b/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TI32Column.cs
@@ -11,7 +11,6 @@ using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using Apache.Arrow;
-using Apache.Arrow.Adbc.Drivers.Apache;
using Apache.Arrow.Adbc.Drivers.Apache.Thrift;
using Thrift.Protocol;
using Thrift.Protocol.Entities;
@@ -55,7 +54,6 @@ namespace Apache.Hive.Service.Rpc.Thrift
byte[] nulls = null;
byte[] buffer = null;
- Stream transport = ((IPeekableTransport)iprot.Transport).Input;
int length = -1;
await iprot.ReadStructBeginAsync(cancellationToken);
@@ -78,7 +76,7 @@ namespace Apache.Hive.Service.Rpc.Thrift
buffer = new byte[length * sizeof(int)];
var memory = buffer.AsMemory();
iprot.Transport.CheckReadBytesAvailable(buffer.Length);
- await transport.ReadExactlyAsync(memory, cancellationToken);
+ await iprot.Transport.ReadExactlyAsync(memory,
cancellationToken);
for (int _i161 = 0; _i161 < length; ++_i161)
{
StreamExtensions.ReverseEndianI32AtOffset(memory.Span,
_i161 * sizeof(int));
diff --git a/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TI64Column.cs
b/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TI64Column.cs
index 9401e9035..77b872959 100644
--- a/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TI64Column.cs
+++ b/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TI64Column.cs
@@ -11,7 +11,6 @@ using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using Apache.Arrow;
-using Apache.Arrow.Adbc.Drivers.Apache;
using Apache.Arrow.Adbc.Drivers.Apache.Thrift;
using Thrift.Protocol;
using Thrift.Protocol.Entities;
@@ -54,7 +53,6 @@ namespace Apache.Hive.Service.Rpc.Thrift
byte[] nulls = null;
byte[] buffer = null;
- Stream transport = ((IPeekableTransport)iprot.Transport).Input;
int length = -1;
await iprot.ReadStructBeginAsync(cancellationToken);
@@ -78,7 +76,7 @@ namespace Apache.Hive.Service.Rpc.Thrift
buffer = new byte[length * sizeof(long)];
var memory = buffer.AsMemory();
iprot.Transport.CheckReadBytesAvailable(buffer.Length);
- await transport.ReadExactlyAsync(memory, cancellationToken);
+ await iprot.Transport.ReadExactlyAsync(memory,
cancellationToken);
for (int _i170 = 0; _i170 < length; ++_i170)
{
StreamExtensions.ReverseEndianI64AtOffset(memory.Span,
_i170 * sizeof(long));
diff --git
a/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TStringColumn.cs
b/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TStringColumn.cs
index fdbc3eea8..24a3e07f0 100644
--- a/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TStringColumn.cs
+++ b/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TStringColumn.cs
@@ -10,7 +10,6 @@ using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using Apache.Arrow;
-using Apache.Arrow.Adbc.Drivers.Apache;
using Apache.Arrow.Adbc.Drivers.Apache.Thrift;
using Thrift.Collections;
using Thrift.Protocol;
@@ -55,7 +54,6 @@ namespace Apache.Hive.Service.Rpc.Thrift
ArrowBuffer.Builder<byte> values = null;
byte[] nulls = null;
byte[] offsetBuffer = null;
- Stream transport = ((IPeekableTransport)iprot.Transport).Input;
int length = -1;
byte[] preAllocatedBuffer = new byte[65536];
@@ -101,7 +99,7 @@ namespace Apache.Hive.Service.Rpc.Thrift
tmp = new byte[size];
}
- await transport.ReadExactlyAsync(tmp.AsMemory(0, size),
cancellationToken);
+ await iprot.Transport.ReadExactlyAsync(tmp.AsMemory(0,
size), cancellationToken);
values.Append(tmp.AsMemory(0, size).Span);
}
StreamExtensions.WriteInt32LittleEndian(offset, memory.Span,
length * 4);
diff --git a/csharp/src/Drivers/Apache/Thrift/StreamExtensions.cs
b/csharp/src/Drivers/Apache/Thrift/StreamExtensions.cs
index 6563b7ea0..ec583f145 100644
--- a/csharp/src/Drivers/Apache/Thrift/StreamExtensions.cs
+++ b/csharp/src/Drivers/Apache/Thrift/StreamExtensions.cs
@@ -21,6 +21,7 @@ using System.IO;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
+using Thrift.Transport;
namespace Apache.Arrow.Adbc.Drivers.Apache.Thrift
{
@@ -100,9 +101,9 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Thrift
return dictionary.TryGetValue(key, out TValue? value) ? value :
defaultValue;
}
- public static async Task<bool> ReadExactlyAsync(this Stream stream,
Memory<byte> memory, CancellationToken cancellationToken = default)
+ public static async Task<bool> ReadExactlyAsync(this TTransport
transport, Memory<byte> memory, CancellationToken cancellationToken = default)
{
- if (stream == null) throw new
ArgumentNullException(nameof(stream));
+ if (transport == null) throw new
ArgumentNullException(nameof(transport));
// Try to get the underlying array from the Memory<byte>
if (!MemoryMarshal.TryGetArray(memory, out ArraySegment<byte>
arraySegment))
@@ -115,7 +116,7 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Thrift
while (totalBytesRead < count)
{
- int bytesRead = await stream.ReadAsync(arraySegment.Array!,
arraySegment.Offset + totalBytesRead, count - totalBytesRead,
cancellationToken).ConfigureAwait(false);
+ int bytesRead = await transport.ReadAsync(arraySegment.Array!,
arraySegment.Offset + totalBytesRead, count - totalBytesRead,
cancellationToken).ConfigureAwait(false);
if (bytesRead == 0)
{
@@ -128,32 +129,5 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Thrift
return true;
}
-
- public static async Task<bool> ReadExactlyAsync(this Stream stream,
byte[] buffer, int offset, int count, CancellationToken cancellationToken =
default)
- {
- if (stream == null) throw new
ArgumentNullException(nameof(stream));
- if (buffer == null) throw new
ArgumentNullException(nameof(buffer));
- if (offset < 0 || offset >= buffer.Length) throw new
ArgumentOutOfRangeException(nameof(offset));
- if (count < 0 || (count + offset) > buffer.Length) throw new
ArgumentOutOfRangeException(nameof(count));
-
- int bytesReadTotal = 0;
-
- while (bytesReadTotal < count)
- {
- int bytesRead = await stream.ReadAsync(buffer, offset +
bytesReadTotal, count - bytesReadTotal,
cancellationToken).ConfigureAwait(false);
-
- // If ReadAsync returns 0, it means the end of the stream has
been reached
- if (bytesRead == 0)
- {
- // If we haven't read any bytes at all, it's okay (might
be at the end of the stream)
- // But if we've read some bytes and then hit the end of
the stream, it's unexpected
- return bytesReadTotal == 0;
- }
-
- bytesReadTotal += bytesRead;
- }
-
- return true;
- }
}
}
diff --git a/csharp/src/Drivers/Apache/Thrift/ThriftSocketTransport.cs
b/csharp/src/Drivers/Apache/Thrift/ThriftSocketTransport.cs
deleted file mode 100644
index c9336ff37..000000000
--- a/csharp/src/Drivers/Apache/Thrift/ThriftSocketTransport.cs
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-using System.IO;
-using System.Net.Http;
-using System.Reflection;
-using Thrift;
-using Thrift.Transport.Client;
-
-namespace Apache.Arrow.Adbc.Drivers.Apache
-{
- class ThriftSocketTransport : TSocketTransport, IPeekableTransport
- {
- public ThriftSocketTransport(string host, int port, TConfiguration
config, int timeout = 0)
- : base(host, port, config, timeout)
- {
- }
-
- public ThriftSocketTransport(string hostNameOrIpAddress, int port,
bool connectClient, TConfiguration config, int timeout = 0)
- : base(hostNameOrIpAddress, port, connectClient, config, timeout)
- {
- }
-
- public Stream Input { get { return this.InputStream; } }
- public Stream Output { get { return this.OutputStream; } }
- }
-
- // TODO: Experimental
- class ThriftHttpTransport : THttpTransport, IPeekableTransport
- {
- public ThriftHttpTransport(HttpClient httpClient, TConfiguration
config)
- : base(httpClient, config)
- {
-
- }
-
- public Stream Input
- {
- get
- {
- // not advocating for this, but it works
- Stream stream =
(Stream)((FieldInfo[])((TypeInfo)this.GetType().BaseType!).DeclaredFields)[4].GetValue(this)!;
- return stream;
- }
- }
- public Stream Output
- {
- get
- {
- Stream stream =
(Stream)this.GetType().BaseType!.GetField("_outputStream",
BindingFlags.NonPublic)!.GetValue(this)!;
- return stream;
- }
- }
- }
-}