This is an automated email from the ASF dual-hosted git repository.
curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new 4639ca8ba fix(csharp/src/Drivers/Apache): remove interleaved async
look-ahead code (#2273)
4639ca8ba is described below
commit 4639ca8ba98a4a8e91ad1e5dac971dcb6835666d
Author: Bruce Irschick <[email protected]>
AuthorDate: Tue Oct 22 11:55:34 2024 -0700
fix(csharp/src/Drivers/Apache): remove interleaved async look-ahead code
(#2273)
The “look-ahead” buffering attempted in the HiveServer2Reader is not
supported by the Thrift library. The Thrift library uses a shared buffer
on the Client/Protocol/Transport object, so interleaving Fetch and
ExecuteStatement calls will fail because the buffer will get closed
unexpectedly.
---
csharp/src/Drivers/Apache/Hive2/HiveServer2Reader.cs | 17 +++--------------
csharp/test/Drivers/Apache/Spark/DriverTests.cs | 2 +-
2 files changed, 4 insertions(+), 15 deletions(-)
diff --git a/csharp/src/Drivers/Apache/Hive2/HiveServer2Reader.cs
b/csharp/src/Drivers/Apache/Hive2/HiveServer2Reader.cs
index e3068530c..5c07d8483 100644
--- a/csharp/src/Drivers/Apache/Hive2/HiveServer2Reader.cs
+++ b/csharp/src/Drivers/Apache/Hive2/HiveServer2Reader.cs
@@ -64,9 +64,6 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2
{ ArrowTypeId.Timestamp, ConvertToTimestamp },
};
- // Look-ahead task.
- Task<TFetchResultsResp>? _fetchResultResponseTask = default;
-
public HiveServer2Reader(
HiveServer2Statement statement,
Schema schema,
@@ -76,8 +73,6 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2
_statement = statement;
Schema = schema;
_dataTypeConversion = dataTypeConversion;
- // Start the pre-fetch of the first batch
- _fetchResultResponseTask = FetchNext(_statement,
cancellationToken);
}
public Schema Schema { get; }
@@ -91,7 +86,7 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2
}
// Await the fetch response
- TFetchResultsResp response = await (_fetchResultResponseTask ??
throw new InvalidOperationException("unexpected state - fetch result task
should be set."));
+ TFetchResultsResp response = await FetchNext(_statement,
cancellationToken);
// Build the current batch
RecordBatch result = CreateBatch(response, out int fetchedRows);
@@ -100,12 +95,6 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2
{
// This is the last batch
_statement = null;
- _fetchResultResponseTask = null;
- }
- else
- {
- // Otherwise, start the pre-fetch of the next batch
- _fetchResultResponseTask = FetchNext(_statement,
cancellationToken);
}
// Return the current batch.
@@ -128,10 +117,10 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2
return new RecordBatch(Schema, columnData, length);
}
- private static Task<TFetchResultsResp> FetchNext(HiveServer2Statement
statement, CancellationToken cancellationToken = default)
+ private static async Task<TFetchResultsResp>
FetchNext(HiveServer2Statement statement, CancellationToken cancellationToken =
default)
{
var request = new TFetchResultsReq(statement.OperationHandle,
TFetchOrientation.FETCH_NEXT, statement.BatchSize);
- return statement.Connection.Client.FetchResults(request,
cancellationToken);
+ return await statement.Connection.Client.FetchResults(request,
cancellationToken);
}
public void Dispose()
diff --git a/csharp/test/Drivers/Apache/Spark/DriverTests.cs
b/csharp/test/Drivers/Apache/Spark/DriverTests.cs
index 053e47abf..19a787455 100644
--- a/csharp/test/Drivers/Apache/Spark/DriverTests.cs
+++ b/csharp/test/Drivers/Apache/Spark/DriverTests.cs
@@ -114,7 +114,7 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.Apache.Spark
for (int i = 0; i < queries.Length; i++)
{
string query = queries[i];
- AdbcStatement statement = adbcConnection.CreateStatement();
+ using AdbcStatement statement =
adbcConnection.CreateStatement();
statement.SqlQuery = query;
UpdateResult updateResult = statement.ExecuteUpdate();