This is an automated email from the ASF dual-hosted git repository.

curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 4639ca8ba fix(csharp/src/Drivers/Apache): remove interleaved async look-ahead code (#2273)
4639ca8ba is described below

commit 4639ca8ba98a4a8e91ad1e5dac971dcb6835666d
Author: Bruce Irschick <[email protected]>
AuthorDate: Tue Oct 22 11:55:34 2024 -0700

    fix(csharp/src/Drivers/Apache): remove interleaved async look-ahead code (#2273)
    
    The attempt to use “look-ahead” buffering in the HiveServer2Reader is
    not supported by the Thrift library. The issue is that the Thrift
    library uses a shared buffer on the Client/Protocol/Transport object, so
    interleaving Fetch and ExecuteStatement calls will fail because the
    buffer will get closed unexpectedly.
---
 csharp/src/Drivers/Apache/Hive2/HiveServer2Reader.cs | 17 +++--------------
 csharp/test/Drivers/Apache/Spark/DriverTests.cs      |  2 +-
 2 files changed, 4 insertions(+), 15 deletions(-)

diff --git a/csharp/src/Drivers/Apache/Hive2/HiveServer2Reader.cs 
b/csharp/src/Drivers/Apache/Hive2/HiveServer2Reader.cs
index e3068530c..5c07d8483 100644
--- a/csharp/src/Drivers/Apache/Hive2/HiveServer2Reader.cs
+++ b/csharp/src/Drivers/Apache/Hive2/HiveServer2Reader.cs
@@ -64,9 +64,6 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2
                 { ArrowTypeId.Timestamp, ConvertToTimestamp },
             };
 
-        // Look-ahead task.
-        Task<TFetchResultsResp>? _fetchResultResponseTask = default;
-
         public HiveServer2Reader(
             HiveServer2Statement statement,
             Schema schema,
@@ -76,8 +73,6 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2
             _statement = statement;
             Schema = schema;
             _dataTypeConversion = dataTypeConversion;
-            // Start the pre-fetch of the first batch
-            _fetchResultResponseTask = FetchNext(_statement, 
cancellationToken);
         }
 
         public Schema Schema { get; }
@@ -91,7 +86,7 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2
             }
 
             // Await the fetch response
-            TFetchResultsResp response = await (_fetchResultResponseTask ?? 
throw new InvalidOperationException("unexpected state - fetch result task 
should be set."));
+            TFetchResultsResp response = await FetchNext(_statement, 
cancellationToken);
 
             // Build the current batch
             RecordBatch result = CreateBatch(response, out int fetchedRows);
@@ -100,12 +95,6 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2
             {
                 // This is the last batch
                 _statement = null;
-                _fetchResultResponseTask = null;
-            }
-            else
-            {
-                // Otherwise, start the pre-fetch of the next batch
-                _fetchResultResponseTask = FetchNext(_statement, 
cancellationToken);
             }
 
             // Return the current batch.
@@ -128,10 +117,10 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2
             return new RecordBatch(Schema, columnData, length);
         }
 
-        private static Task<TFetchResultsResp> FetchNext(HiveServer2Statement 
statement, CancellationToken cancellationToken = default)
+        private static async Task<TFetchResultsResp> 
FetchNext(HiveServer2Statement statement, CancellationToken cancellationToken = 
default)
         {
             var request = new TFetchResultsReq(statement.OperationHandle, 
TFetchOrientation.FETCH_NEXT, statement.BatchSize);
-            return statement.Connection.Client.FetchResults(request, 
cancellationToken);
+            return await statement.Connection.Client.FetchResults(request, 
cancellationToken);
         }
 
         public void Dispose()
diff --git a/csharp/test/Drivers/Apache/Spark/DriverTests.cs 
b/csharp/test/Drivers/Apache/Spark/DriverTests.cs
index 053e47abf..19a787455 100644
--- a/csharp/test/Drivers/Apache/Spark/DriverTests.cs
+++ b/csharp/test/Drivers/Apache/Spark/DriverTests.cs
@@ -114,7 +114,7 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.Apache.Spark
             for (int i = 0; i < queries.Length; i++)
             {
                 string query = queries[i];
-                AdbcStatement statement = adbcConnection.CreateStatement();
+                using AdbcStatement statement = 
adbcConnection.CreateStatement();
                 statement.SqlQuery = query;
 
                 UpdateResult updateResult = statement.ExecuteUpdate();

Reply via email to