This is an automated email from the ASF dual-hosted git repository.

curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 1c780f163 feat(csharp/src/Drivers/Apache): Performance improvement - Replace TSocketTransport with TBufferedTransport (#2742)
1c780f163 is described below

commit 1c780f163f55adf2ad5e84661fe37ac5fc83dc21
Author: Sudhir Reddy Emmadi <[email protected]>
AuthorDate: Thu Apr 24 21:11:06 2025 +0530

    feat(csharp/src/Drivers/Apache): Performance improvement - Replace TSocketTransport with TBufferedTransport (#2742)
    
    Co-authored-by: Sudhir Emmadi <[email protected]>
---
 .../Apache/Hive2/HiveServer2HttpConnection.cs      |  3 +-
 .../Drivers/Apache/Impala/ImpalaHttpConnection.cs  |  3 +-
 .../Apache/Impala/ImpalaStandardConnection.cs      |  6 +-
 .../Drivers/Apache/Spark/SparkHttpConnection.cs    |  3 +-
 .../Apache/Spark/SparkStandardConnection.cs        |  3 +-
 .../Drivers/Apache/Thrift/IPeekableTransport.cs    | 27 ---------
 .../Thrift/Service/Rpc/Thrift/TBinaryColumn.cs     |  4 +-
 .../Thrift/Service/Rpc/Thrift/TBoolColumn.cs       |  4 +-
 .../Thrift/Service/Rpc/Thrift/TByteColumn.cs       |  4 +-
 .../Thrift/Service/Rpc/Thrift/TDoubleColumn.cs     |  4 +-
 .../Apache/Thrift/Service/Rpc/Thrift/TI16Column.cs |  4 +-
 .../Apache/Thrift/Service/Rpc/Thrift/TI32Column.cs |  4 +-
 .../Apache/Thrift/Service/Rpc/Thrift/TI64Column.cs |  4 +-
 .../Thrift/Service/Rpc/Thrift/TStringColumn.cs     |  4 +-
 .../src/Drivers/Apache/Thrift/StreamExtensions.cs  | 34 ++---------
 .../Drivers/Apache/Thrift/ThriftSocketTransport.cs | 69 ----------------------
 16 files changed, 24 insertions(+), 156 deletions(-)

diff --git a/csharp/src/Drivers/Apache/Hive2/HiveServer2HttpConnection.cs 
b/csharp/src/Drivers/Apache/Hive2/HiveServer2HttpConnection.cs
index 1c882b78c..0bffe8714 100644
--- a/csharp/src/Drivers/Apache/Hive2/HiveServer2HttpConnection.cs
+++ b/csharp/src/Drivers/Apache/Hive2/HiveServer2HttpConnection.cs
@@ -29,6 +29,7 @@ using Apache.Hive.Service.Rpc.Thrift;
 using Thrift;
 using Thrift.Protocol;
 using Thrift.Transport;
+using Thrift.Transport.Client;
 
 namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2
 {
@@ -178,7 +179,7 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2
             httpClient.DefaultRequestHeaders.ExpectContinue = false;
 
             TConfiguration config = new();
-            ThriftHttpTransport transport = new(httpClient, config)
+            THttpTransport transport = new(httpClient, config)
             {
                 // This value can only be set before the first call/request. 
So if a new value for query timeout
                 // is set, we won't be able to update the value. Setting to 
~infinite and relying on cancellation token
diff --git a/csharp/src/Drivers/Apache/Impala/ImpalaHttpConnection.cs 
b/csharp/src/Drivers/Apache/Impala/ImpalaHttpConnection.cs
index 157692c2a..ebc92e663 100644
--- a/csharp/src/Drivers/Apache/Impala/ImpalaHttpConnection.cs
+++ b/csharp/src/Drivers/Apache/Impala/ImpalaHttpConnection.cs
@@ -30,6 +30,7 @@ using Apache.Hive.Service.Rpc.Thrift;
 using Thrift;
 using Thrift.Protocol;
 using Thrift.Transport;
+using Thrift.Transport.Client;
 
 namespace Apache.Arrow.Adbc.Drivers.Apache.Impala
 {
@@ -151,7 +152,7 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Impala
             httpClient.DefaultRequestHeaders.ExpectContinue = false;
 
             TConfiguration config = new();
-            ThriftHttpTransport transport = new(httpClient, config)
+            THttpTransport transport = new(httpClient, config)
             {
                 // This value can only be set before the first call/request. 
So if a new value for query timeout
                 // is set, we won't be able to update the value. Setting to 
~infinite and relying on cancellation token
diff --git a/csharp/src/Drivers/Apache/Impala/ImpalaStandardConnection.cs 
b/csharp/src/Drivers/Apache/Impala/ImpalaStandardConnection.cs
index 623335e91..f5c84e2cd 100644
--- a/csharp/src/Drivers/Apache/Impala/ImpalaStandardConnection.cs
+++ b/csharp/src/Drivers/Apache/Impala/ImpalaStandardConnection.cs
@@ -25,6 +25,7 @@ using Apache.Arrow.Ipc;
 using Apache.Hive.Service.Rpc.Thrift;
 using Thrift.Protocol;
 using Thrift.Transport;
+using Thrift.Transport.Client;
 
 namespace Apache.Arrow.Adbc.Drivers.Apache.Impala
 {
@@ -105,8 +106,9 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Impala
 
             // Delay the open connection until later.
             bool connectClient = false;
-            ThriftSocketTransport transport = new(hostName!, int.Parse(port!), 
connectClient, config: new());
-            return transport;
+            TSocketTransport transport = new(hostName!, int.Parse(port!), 
connectClient, config: new());
+            TBufferedTransport bufferedTransport = new 
TBufferedTransport(transport);
+            return bufferedTransport;
         }
 
         protected override async Task<TProtocol> 
CreateProtocolAsync(TTransport transport, CancellationToken cancellationToken = 
default)
diff --git a/csharp/src/Drivers/Apache/Spark/SparkHttpConnection.cs 
b/csharp/src/Drivers/Apache/Spark/SparkHttpConnection.cs
index f3ad901bd..f17782ebc 100644
--- a/csharp/src/Drivers/Apache/Spark/SparkHttpConnection.cs
+++ b/csharp/src/Drivers/Apache/Spark/SparkHttpConnection.cs
@@ -30,6 +30,7 @@ using Apache.Hive.Service.Rpc.Thrift;
 using Thrift;
 using Thrift.Protocol;
 using Thrift.Transport;
+using Thrift.Transport.Client;
 
 namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
 {
@@ -174,7 +175,7 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
             httpClient.DefaultRequestHeaders.ExpectContinue = false;
 
             TConfiguration config = new();
-            ThriftHttpTransport transport = new(httpClient, config)
+            THttpTransport transport = new(httpClient, config)
             {
                 // This value can only be set before the first call/request. 
So if a new value for query timeout
                 // is set, we won't be able to update the value. Setting to 
~infinite and relying on cancellation token
diff --git a/csharp/src/Drivers/Apache/Spark/SparkStandardConnection.cs 
b/csharp/src/Drivers/Apache/Spark/SparkStandardConnection.cs
index 2c28ea8e1..12abfba4b 100644
--- a/csharp/src/Drivers/Apache/Spark/SparkStandardConnection.cs
+++ b/csharp/src/Drivers/Apache/Spark/SparkStandardConnection.cs
@@ -23,6 +23,7 @@ using System.Threading.Tasks;
 using Apache.Hive.Service.Rpc.Thrift;
 using Thrift.Protocol;
 using Thrift.Transport;
+using Thrift.Transport.Client;
 
 namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
 {
@@ -97,7 +98,7 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
 
             // Delay the open connection until later.
             bool connectClient = false;
-            ThriftSocketTransport transport = new(hostName!, int.Parse(port!), 
connectClient, config: new());
+            TSocketTransport transport = new(hostName!, int.Parse(port!), 
connectClient, config: new());
             return transport;
         }
 
diff --git a/csharp/src/Drivers/Apache/Thrift/IPeekableTransport.cs 
b/csharp/src/Drivers/Apache/Thrift/IPeekableTransport.cs
deleted file mode 100644
index ddceac1a3..000000000
--- a/csharp/src/Drivers/Apache/Thrift/IPeekableTransport.cs
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-using System.IO;
-
-namespace Apache.Arrow.Adbc.Drivers.Apache
-{
-    internal interface IPeekableTransport
-    {
-        Stream Input { get; }
-        Stream Output { get; }
-    }
-}
diff --git 
a/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TBinaryColumn.cs 
b/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TBinaryColumn.cs
index aacec569f..153dc09d6 100644
--- a/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TBinaryColumn.cs
+++ b/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TBinaryColumn.cs
@@ -9,7 +9,6 @@ using System.IO;
 using System.Text;
 using System.Threading;
 using Apache.Arrow;
-using Apache.Arrow.Adbc.Drivers.Apache;
 using Apache.Arrow.Adbc.Drivers.Apache.Thrift;
 using Apache.Arrow.Types;
 using Thrift.Collections;
@@ -55,7 +54,6 @@ namespace Apache.Hive.Service.Rpc.Thrift
         ArrowBuffer.Builder<byte> values = null;
         byte[] nulls = null;
         byte[] offsetBuffer = null;
-        Stream transport = ((IPeekableTransport)iprot.Transport).Input;
         int length = -1;
         byte[] preAllocatedBuffer = new byte[65536];
 
@@ -100,7 +98,7 @@ namespace Apache.Hive.Service.Rpc.Thrift
                       tmp = new byte[size];
                     }
 
-                    await transport.ReadExactlyAsync(tmp.AsMemory(0, size), 
cancellationToken);
+                    await iprot.Transport.ReadExactlyAsync(tmp.AsMemory(0, 
size), cancellationToken);
                     values.Append(tmp.AsMemory(0, size).Span);
                   }
                   StreamExtensions.WriteInt32LittleEndian(offset, memory.Span, 
length * sizeof(int));
diff --git a/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TBoolColumn.cs 
b/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TBoolColumn.cs
index fed4ece10..46745af84 100644
--- a/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TBoolColumn.cs
+++ b/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TBoolColumn.cs
@@ -9,7 +9,6 @@ using System.IO;
 using System.Text;
 using System.Threading;
 using Apache.Arrow;
-using Apache.Arrow.Adbc.Drivers.Apache;
 using Apache.Arrow.Adbc.Drivers.Apache.Thrift;
 using Thrift.Protocol;
 using Thrift.Protocol.Entities;
@@ -53,7 +52,6 @@ namespace Apache.Hive.Service.Rpc.Thrift
 
         ArrowBuffer.BitmapBuilder values = null;
         byte[] nulls = null;
-        Stream transport = ((IPeekableTransport)iprot.Transport).Input;
         int length = -1;
 
         await iprot.ReadStructBeginAsync(cancellationToken);
@@ -77,7 +75,7 @@ namespace Apache.Hive.Service.Rpc.Thrift
                   byte[] buffer = new byte[length];
                   var memory = buffer.AsMemory();
                   iprot.Transport.CheckReadBytesAvailable(buffer.Length);
-                  await transport.ReadExactlyAsync(memory, cancellationToken);
+                  await iprot.Transport.ReadExactlyAsync(buffer.AsMemory(0, 
length), cancellationToken);
 
                   values = new ArrowBuffer.BitmapBuilder(length);
                   for (int _i134 = 0; _i134 < length; ++_i134)
diff --git a/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TByteColumn.cs 
b/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TByteColumn.cs
index 9d21795e9..1a626aa6f 100644
--- a/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TByteColumn.cs
+++ b/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TByteColumn.cs
@@ -11,7 +11,6 @@ using System.Runtime.CompilerServices;
 using System.Text;
 using System.Threading;
 using Apache.Arrow;
-using Apache.Arrow.Adbc.Drivers.Apache;
 using Apache.Arrow.Adbc.Drivers.Apache.Thrift;
 using Thrift.Protocol;
 using Thrift.Protocol.Entities;
@@ -55,7 +54,6 @@ namespace Apache.Hive.Service.Rpc.Thrift
 
         byte[] nulls = null;
         byte[] buffer = null;
-        Stream transport = ((IPeekableTransport)iprot.Transport).Input;
         int length = -1;
 
         await iprot.ReadStructBeginAsync(cancellationToken);
@@ -79,7 +77,7 @@ namespace Apache.Hive.Service.Rpc.Thrift
                   buffer = new byte[length];
                   var memory = buffer.AsMemory();
                   iprot.Transport.CheckReadBytesAvailable(buffer.Length);
-                  await transport.ReadExactlyAsync(memory, cancellationToken);
+                  await iprot.Transport.ReadExactlyAsync(memory, 
cancellationToken);
 
                   await iprot.ReadListEndAsync(cancellationToken);
                 }
diff --git 
a/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TDoubleColumn.cs 
b/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TDoubleColumn.cs
index 5ba749148..a2c50a73d 100644
--- a/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TDoubleColumn.cs
+++ b/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TDoubleColumn.cs
@@ -11,7 +11,6 @@ using System.Runtime.CompilerServices;
 using System.Text;
 using System.Threading;
 using Apache.Arrow;
-using Apache.Arrow.Adbc.Drivers.Apache;
 using Apache.Arrow.Adbc.Drivers.Apache.Thrift;
 using Thrift.Protocol;
 using Thrift.Protocol.Entities;
@@ -53,7 +52,6 @@ namespace Apache.Hive.Service.Rpc.Thrift
 
         byte[] nulls = null;
         byte[] buffer = null;
-        Stream transport = ((IPeekableTransport)iprot.Transport).Input;
         int length = -1;
 
         await iprot.ReadStructBeginAsync(cancellationToken);
@@ -77,7 +75,7 @@ namespace Apache.Hive.Service.Rpc.Thrift
                   buffer = new byte[length * sizeof(double)];
                   var memory = buffer.AsMemory();
                   iprot.Transport.CheckReadBytesAvailable(buffer.Length);
-                  await transport.ReadExactlyAsync(memory, cancellationToken);
+                  await iprot.Transport.ReadExactlyAsync(memory, 
cancellationToken);
                   for (int _i179 = 0; _i179 < length; ++_i179)
                   {
                     StreamExtensions.ReverseEndianI64AtOffset(memory.Span, 
_i179 * sizeof(double));
diff --git a/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TI16Column.cs 
b/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TI16Column.cs
index 715f8397e..3d7c8c480 100644
--- a/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TI16Column.cs
+++ b/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TI16Column.cs
@@ -11,7 +11,6 @@ using System.Runtime.CompilerServices;
 using System.Text;
 using System.Threading;
 using Apache.Arrow;
-using Apache.Arrow.Adbc.Drivers.Apache;
 using Apache.Arrow.Adbc.Drivers.Apache.Thrift;
 using Thrift.Protocol;
 using Thrift.Protocol.Entities;
@@ -55,7 +54,6 @@ namespace Apache.Hive.Service.Rpc.Thrift
 
         byte[] nulls = null;
         byte[] buffer = null;
-        Stream transport = ((IPeekableTransport)iprot.Transport).Input;
         int length = -1;
 
         await iprot.ReadStructBeginAsync(cancellationToken);
@@ -79,7 +77,7 @@ namespace Apache.Hive.Service.Rpc.Thrift
                   buffer = new byte[length * sizeof(short)];
                   var memory = buffer.AsMemory();
                   iprot.Transport.CheckReadBytesAvailable(buffer.Length);
-                  await transport.ReadExactlyAsync(memory, cancellationToken);
+                  await iprot.Transport.ReadExactlyAsync(memory, 
cancellationToken);
                   for (int _i152 = 0; _i152 < length; ++_i152)
                   {
                     StreamExtensions.ReverseEndiannessInt16(memory.Span, _i152 
* sizeof(short));
diff --git a/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TI32Column.cs 
b/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TI32Column.cs
index dba3a316d..d4543318f 100644
--- a/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TI32Column.cs
+++ b/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TI32Column.cs
@@ -11,7 +11,6 @@ using System.Runtime.CompilerServices;
 using System.Text;
 using System.Threading;
 using Apache.Arrow;
-using Apache.Arrow.Adbc.Drivers.Apache;
 using Apache.Arrow.Adbc.Drivers.Apache.Thrift;
 using Thrift.Protocol;
 using Thrift.Protocol.Entities;
@@ -55,7 +54,6 @@ namespace Apache.Hive.Service.Rpc.Thrift
 
         byte[] nulls = null;
         byte[] buffer = null;
-        Stream transport = ((IPeekableTransport)iprot.Transport).Input;
         int length = -1;
 
         await iprot.ReadStructBeginAsync(cancellationToken);
@@ -78,7 +76,7 @@ namespace Apache.Hive.Service.Rpc.Thrift
                   buffer = new byte[length * sizeof(int)];
                   var memory = buffer.AsMemory();
                   iprot.Transport.CheckReadBytesAvailable(buffer.Length);
-                  await transport.ReadExactlyAsync(memory, cancellationToken);
+                  await iprot.Transport.ReadExactlyAsync(memory, 
cancellationToken);
                   for (int _i161 = 0; _i161 < length; ++_i161)
                   {
                     StreamExtensions.ReverseEndianI32AtOffset(memory.Span, 
_i161 * sizeof(int));
diff --git a/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TI64Column.cs 
b/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TI64Column.cs
index 9401e9035..77b872959 100644
--- a/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TI64Column.cs
+++ b/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TI64Column.cs
@@ -11,7 +11,6 @@ using System.Runtime.CompilerServices;
 using System.Text;
 using System.Threading;
 using Apache.Arrow;
-using Apache.Arrow.Adbc.Drivers.Apache;
 using Apache.Arrow.Adbc.Drivers.Apache.Thrift;
 using Thrift.Protocol;
 using Thrift.Protocol.Entities;
@@ -54,7 +53,6 @@ namespace Apache.Hive.Service.Rpc.Thrift
 
         byte[] nulls = null;
         byte[] buffer = null;
-        Stream transport = ((IPeekableTransport)iprot.Transport).Input;
         int length = -1;
 
         await iprot.ReadStructBeginAsync(cancellationToken);
@@ -78,7 +76,7 @@ namespace Apache.Hive.Service.Rpc.Thrift
                   buffer = new byte[length * sizeof(long)];
                   var memory = buffer.AsMemory();
                   iprot.Transport.CheckReadBytesAvailable(buffer.Length);
-                  await transport.ReadExactlyAsync(memory, cancellationToken);
+                  await iprot.Transport.ReadExactlyAsync(memory, 
cancellationToken);
                   for (int _i170 = 0; _i170 < length; ++_i170)
                   {
                     StreamExtensions.ReverseEndianI64AtOffset(memory.Span, 
_i170 * sizeof(long));
diff --git 
a/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TStringColumn.cs 
b/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TStringColumn.cs
index fdbc3eea8..24a3e07f0 100644
--- a/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TStringColumn.cs
+++ b/csharp/src/Drivers/Apache/Thrift/Service/Rpc/Thrift/TStringColumn.cs
@@ -10,7 +10,6 @@ using System.Runtime.CompilerServices;
 using System.Text;
 using System.Threading;
 using Apache.Arrow;
-using Apache.Arrow.Adbc.Drivers.Apache;
 using Apache.Arrow.Adbc.Drivers.Apache.Thrift;
 using Thrift.Collections;
 using Thrift.Protocol;
@@ -55,7 +54,6 @@ namespace Apache.Hive.Service.Rpc.Thrift
         ArrowBuffer.Builder<byte> values = null;
         byte[] nulls = null;
         byte[] offsetBuffer = null;
-        Stream transport = ((IPeekableTransport)iprot.Transport).Input;
         int length = -1;
         byte[] preAllocatedBuffer = new byte[65536];
 
@@ -101,7 +99,7 @@ namespace Apache.Hive.Service.Rpc.Thrift
                       tmp = new byte[size];
                     }
 
-                    await transport.ReadExactlyAsync(tmp.AsMemory(0, size), 
cancellationToken);
+                    await iprot.Transport.ReadExactlyAsync(tmp.AsMemory(0, 
size), cancellationToken);
                     values.Append(tmp.AsMemory(0, size).Span);
                   }
                   StreamExtensions.WriteInt32LittleEndian(offset, memory.Span, 
length * 4);
diff --git a/csharp/src/Drivers/Apache/Thrift/StreamExtensions.cs 
b/csharp/src/Drivers/Apache/Thrift/StreamExtensions.cs
index 6563b7ea0..ec583f145 100644
--- a/csharp/src/Drivers/Apache/Thrift/StreamExtensions.cs
+++ b/csharp/src/Drivers/Apache/Thrift/StreamExtensions.cs
@@ -21,6 +21,7 @@ using System.IO;
 using System.Runtime.InteropServices;
 using System.Threading;
 using System.Threading.Tasks;
+using Thrift.Transport;
 
 namespace Apache.Arrow.Adbc.Drivers.Apache.Thrift
 {
@@ -100,9 +101,9 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Thrift
             return dictionary.TryGetValue(key, out TValue? value) ? value : 
defaultValue;
         }
 
-        public static async Task<bool> ReadExactlyAsync(this Stream stream, 
Memory<byte> memory, CancellationToken cancellationToken = default)
+        public static async Task<bool> ReadExactlyAsync(this TTransport 
transport, Memory<byte> memory, CancellationToken cancellationToken = default)
         {
-            if (stream == null) throw new 
ArgumentNullException(nameof(stream));
+            if (transport == null) throw new 
ArgumentNullException(nameof(transport));
 
             // Try to get the underlying array from the Memory<byte>
             if (!MemoryMarshal.TryGetArray(memory, out ArraySegment<byte> 
arraySegment))
@@ -115,7 +116,7 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Thrift
 
             while (totalBytesRead < count)
             {
-                int bytesRead = await stream.ReadAsync(arraySegment.Array!, 
arraySegment.Offset + totalBytesRead, count - totalBytesRead, 
cancellationToken).ConfigureAwait(false);
+                int bytesRead = await transport.ReadAsync(arraySegment.Array!, 
arraySegment.Offset + totalBytesRead, count - totalBytesRead, 
cancellationToken).ConfigureAwait(false);
 
                 if (bytesRead == 0)
                 {
@@ -128,32 +129,5 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Thrift
 
             return true;
         }
-
-        public static async Task<bool> ReadExactlyAsync(this Stream stream, 
byte[] buffer, int offset, int count, CancellationToken cancellationToken = 
default)
-        {
-            if (stream == null) throw new 
ArgumentNullException(nameof(stream));
-            if (buffer == null) throw new 
ArgumentNullException(nameof(buffer));
-            if (offset < 0 || offset >= buffer.Length) throw new 
ArgumentOutOfRangeException(nameof(offset));
-            if (count < 0 || (count + offset) > buffer.Length) throw new 
ArgumentOutOfRangeException(nameof(count));
-
-            int bytesReadTotal = 0;
-
-            while (bytesReadTotal < count)
-            {
-                int bytesRead = await stream.ReadAsync(buffer, offset + 
bytesReadTotal, count - bytesReadTotal, 
cancellationToken).ConfigureAwait(false);
-
-                // If ReadAsync returns 0, it means the end of the stream has 
been reached
-                if (bytesRead == 0)
-                {
-                    // If we haven't read any bytes at all, it's okay (might 
be at the end of the stream)
-                    // But if we've read some bytes and then hit the end of 
the stream, it's unexpected
-                    return bytesReadTotal == 0;
-                }
-
-                bytesReadTotal += bytesRead;
-            }
-
-            return true;
-        }
     }
 }
diff --git a/csharp/src/Drivers/Apache/Thrift/ThriftSocketTransport.cs 
b/csharp/src/Drivers/Apache/Thrift/ThriftSocketTransport.cs
deleted file mode 100644
index c9336ff37..000000000
--- a/csharp/src/Drivers/Apache/Thrift/ThriftSocketTransport.cs
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-using System.IO;
-using System.Net.Http;
-using System.Reflection;
-using Thrift;
-using Thrift.Transport.Client;
-
-namespace Apache.Arrow.Adbc.Drivers.Apache
-{
-    class ThriftSocketTransport : TSocketTransport, IPeekableTransport
-    {
-        public ThriftSocketTransport(string host, int port, TConfiguration 
config, int timeout = 0)
-            : base(host, port, config, timeout)
-        {
-        }
-
-        public ThriftSocketTransport(string hostNameOrIpAddress, int port, 
bool connectClient, TConfiguration config, int timeout = 0)
-            : base(hostNameOrIpAddress, port, connectClient, config, timeout)
-        {
-        }
-
-        public Stream Input { get { return this.InputStream; } }
-        public Stream Output { get { return this.OutputStream; } }
-    }
-
-    // TODO: Experimental
-    class ThriftHttpTransport : THttpTransport, IPeekableTransport
-    {
-        public ThriftHttpTransport(HttpClient httpClient, TConfiguration 
config)
-            : base(httpClient, config)
-        {
-
-        }
-
-        public Stream Input
-        {
-            get
-            {
-                // not advocating for this, but it works
-                Stream stream = 
(Stream)((FieldInfo[])((TypeInfo)this.GetType().BaseType!).DeclaredFields)[4].GetValue(this)!;
-                return stream;
-            }
-        }
-        public Stream Output
-        {
-            get
-            {
-                Stream stream = 
(Stream)this.GetType().BaseType!.GetField("_outputStream", 
BindingFlags.NonPublic)!.GetValue(this)!;
-                return stream;
-            }
-        }
-    }
-}

Reply via email to