This is an automated email from the ASF dual-hosted git repository.
curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new 7d05c5850 feat(csharp/src): Improve efficiency of C# BigQuery and
Databricks drivers (#3583)
7d05c5850 is described below
commit 7d05c58501d3f6cf1f5bbef66cbc4e6afb1395a0
Author: Curt Hagenlocher <[email protected]>
AuthorDate: Mon Oct 20 07:27:15 2025 -0700
feat(csharp/src): Improve efficiency of C# BigQuery and Databricks drivers
(#3583)
For BigQuery and Databricks, Arrow-formatted record batches are returned
from the server in a format that's not strictly compatible with the
Arrow stream format due to the way that the schema and the array data
are split. A change was recently made to the C# Arrow library to allow
these to be deserialized independently, which means that we no longer
need to indirect through a Stream -- saving on both CPU and memory and
reducing pressure on the GC.
---
.../src/Apache.Arrow.Adbc/Apache.Arrow.Adbc.csproj | 2 +-
csharp/src/Drivers/Apache/Thrift/ChunkStream.cs | 115 ---------------------
csharp/src/Drivers/BigQuery/BigQueryStatement.cs | 99 ++++--------------
.../Drivers/Databricks/Reader/DatabricksReader.cs | 18 +++-
.../Drivers/BigQuery/BigQueryStatementTests.cs | 21 ----
.../Drivers/Databricks/E2E/CloudFetchE2ETest.cs | 2 +-
6 files changed, 37 insertions(+), 220 deletions(-)
diff --git a/csharp/src/Apache.Arrow.Adbc/Apache.Arrow.Adbc.csproj
b/csharp/src/Apache.Arrow.Adbc/Apache.Arrow.Adbc.csproj
index 4f92168e1..fe3fa125e 100644
--- a/csharp/src/Apache.Arrow.Adbc/Apache.Arrow.Adbc.csproj
+++ b/csharp/src/Apache.Arrow.Adbc/Apache.Arrow.Adbc.csproj
@@ -6,7 +6,7 @@
<PackageReadmeFile>readme.md</PackageReadmeFile>
</PropertyGroup>
<ItemGroup>
- <PackageReference Include="Apache.Arrow" Version="20.0.0" />
+ <PackageReference Include="Apache.Arrow" Version="22.1.0" />
<PackageReference Include="System.Diagnostics.DiagnosticSource"
Version="9.0.6" />
</ItemGroup>
<ItemGroup
Condition="!$([MSBuild]::IsTargetFrameworkCompatible($(TargetFramework),
'net6.0'))">
diff --git a/csharp/src/Drivers/Apache/Thrift/ChunkStream.cs
b/csharp/src/Drivers/Apache/Thrift/ChunkStream.cs
deleted file mode 100644
index 499e4930c..000000000
--- a/csharp/src/Drivers/Apache/Thrift/ChunkStream.cs
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-using System;
-using System.IO;
-using System.Threading;
-using System.Threading.Tasks;
-using Apache.Arrow.Ipc;
-
-namespace Apache.Arrow.Adbc.Drivers.Apache
-{
- internal class ChunkStream : Stream
- {
- ReadOnlyMemory<byte> currentBuffer;
- ReadOnlyMemory<byte> data;
- bool first;
- int position;
-
- public ChunkStream(Schema schema, byte[] data)
- : this(schema, new ReadOnlyMemory<byte>(data))
- {
- // Call the other constructor to avoid duplication
- }
-
- public ChunkStream(Schema schema, ReadOnlyMemory<byte> data)
- {
- MemoryStream buffer = new MemoryStream();
- ArrowStreamWriter writer = new ArrowStreamWriter(buffer, schema,
leaveOpen: true);
- writer.WriteStart();
- writer.WriteEnd();
- writer.Dispose();
-
- this.currentBuffer = new ReadOnlyMemory<byte>(buffer.GetBuffer(),
0, (int)buffer.Length - 8);
- this.data = data;
- this.first = true;
- }
-
- public override bool CanRead => true;
-
- public override bool CanSeek => false;
-
- public override bool CanWrite => false;
-
- public override long Length => throw new NotSupportedException();
-
- public override long Position { get => throw new
NotSupportedException(); set => throw new NotSupportedException(); }
-
- public override void Flush()
- {
- }
-
- public override int Read(byte[] buffer, int offset, int count)
- {
- int remaining = this.currentBuffer.Length - this.position;
- if (remaining == 0)
- {
- if (this.first)
- {
- this.first = false;
- }
- else
- {
- return 0;
- }
- this.currentBuffer = this.data;
- this.position = 0;
- remaining = this.currentBuffer.Length - this.position;
- }
-
- int bytes = Math.Min(remaining, count);
- this.currentBuffer.Slice(this.position, bytes).CopyTo(new
Memory<byte>(buffer, offset, bytes));
- this.position += bytes;
- return bytes;
- }
-
- public override Task<int> ReadAsync(byte[] buffer, int offset, int
count, CancellationToken cancellationToken)
- {
- return base.ReadAsync(buffer, offset, count, cancellationToken);
- }
-
- public override int ReadByte()
- {
- return base.ReadByte();
- }
-
- public override long Seek(long offset, SeekOrigin origin)
- {
- throw new NotSupportedException();
- }
-
- public override void SetLength(long value)
- {
- throw new NotSupportedException();
- }
-
- public override void Write(byte[] buffer, int offset, int count)
- {
- throw new NotSupportedException();
- }
- }
-}
diff --git a/csharp/src/Drivers/BigQuery/BigQueryStatement.cs
b/csharp/src/Drivers/BigQuery/BigQueryStatement.cs
index cb6d7435a..5781ab483 100644
--- a/csharp/src/Drivers/BigQuery/BigQueryStatement.cs
+++ b/csharp/src/Drivers/BigQuery/BigQueryStatement.cs
@@ -19,7 +19,6 @@ using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
-using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
@@ -392,14 +391,7 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
ReadRowsStream stream = new ReadRowsStream(enumerator);
activity?.AddBigQueryTag("read_stream.has_rows", stream.HasRows);
- if (stream.HasRows)
- {
- return new ArrowStreamReader(stream);
- }
- else
- {
- return null;
- }
+ return stream.HasRows ? stream : null;
}
private QueryOptions ValidateOptions(Activity? activity)
@@ -752,13 +744,12 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
}
}
- sealed class ReadRowsStream : Stream
+ sealed class ReadRowsStream : IArrowArrayStream
{
- IAsyncEnumerator<ReadRowsResponse> response;
- ReadOnlyMemory<byte> currentBuffer;
+ readonly Schema? schema;
+ readonly IAsyncEnumerator<ReadRowsResponse> response;
bool first;
- int position;
- bool hasRows;
+ bool disposed;
public ReadRowsStream(IAsyncEnumerator<ReadRowsResponse> response)
{
@@ -766,91 +757,41 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
{
if (response.MoveNextAsync().Result && response.Current !=
null)
{
- this.currentBuffer =
response.Current.ArrowSchema.SerializedSchema.Memory;
- this.hasRows = true;
- }
- else
- {
- this.hasRows = false;
+ this.schema =
ArrowSerializationHelpers.DeserializeSchema(response.Current.ArrowSchema.SerializedSchema.Memory);
}
}
catch (InvalidOperationException)
{
- this.hasRows = false;
}
this.response = response;
this.first = true;
}
- public bool HasRows => this.hasRows;
-
- public override bool CanRead => true;
+ public Schema Schema => this.schema ?? throw new
InvalidOperationException("Stream has no rows");
+ public bool HasRows => this.schema != null;
- public override bool CanSeek => false;
-
- public override bool CanWrite => false;
-
- public override long Length => throw new NotSupportedException();
-
- public override long Position { get => throw new
NotSupportedException(); set => throw new NotSupportedException(); }
-
- public override void Flush()
+ public async ValueTask<RecordBatch?>
ReadNextRecordBatchAsync(CancellationToken cancellationToken)
{
- }
-
- public override int Read(byte[] buffer, int offset, int count)
- {
- if (!hasRows)
+ if (this.first)
{
- return 0;
+ this.first = false;
}
-
- int remaining = this.currentBuffer.Length - this.position;
- if (remaining == 0)
+ else if (this.disposed || !await this.response.MoveNextAsync())
{
- if (this.first)
- {
- this.first = false;
- }
- else if (!this.response.MoveNextAsync().Result)
- {
- return 0;
- }
- this.currentBuffer =
this.response.Current.ArrowRecordBatch.SerializedRecordBatch.Memory;
- this.position = 0;
- remaining = this.currentBuffer.Length - this.position;
+ return null;
}
- int bytes = Math.Min(remaining, count);
- this.currentBuffer.Slice(this.position, bytes).CopyTo(new
Memory<byte>(buffer, offset, bytes));
- this.position += bytes;
- return bytes;
- }
-
- public override Task<int> ReadAsync(byte[] buffer, int offset, int
count, CancellationToken cancellationToken)
- {
- return base.ReadAsync(buffer, offset, count,
cancellationToken);
- }
-
- public override int ReadByte()
- {
- return base.ReadByte();
- }
-
- public override long Seek(long offset, SeekOrigin origin)
- {
- throw new NotSupportedException();
+ return
ArrowSerializationHelpers.DeserializeRecordBatch(this.schema,
this.response.Current.ArrowRecordBatch.SerializedRecordBatch.Memory);
}
- public override void SetLength(long value)
- {
- throw new NotSupportedException();
- }
-
- public override void Write(byte[] buffer, int offset, int count)
+ public void Dispose()
{
- throw new NotSupportedException();
+ if (!this.disposed)
+ {
+ this.response.DisposeAsync().GetAwaiter().GetResult();
+ this.disposed = true;
+ }
}
}
}
diff --git a/csharp/src/Drivers/Databricks/Reader/DatabricksReader.cs
b/csharp/src/Drivers/Databricks/Reader/DatabricksReader.cs
index a0ff9797c..be87a599a 100644
--- a/csharp/src/Drivers/Databricks/Reader/DatabricksReader.cs
+++ b/csharp/src/Drivers/Databricks/Reader/DatabricksReader.cs
@@ -19,7 +19,6 @@ using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
-using Apache.Arrow.Adbc.Drivers.Apache;
using Apache.Arrow.Adbc.Drivers.Apache.Hive2;
using Apache.Arrow.Adbc.Tracing;
using Apache.Arrow.Ipc;
@@ -125,8 +124,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader
dataToUse = Lz4Utilities.DecompressLz4(batch.Batch);
}
- // Always use ChunkStream which ensures proper schema handling
- this.reader = new ArrowStreamReader(new
ChunkStream(this.schema, dataToUse));
+ this.reader = new
SingleBatch(ArrowSerializationHelpers.DeserializeRecordBatch(this.schema,
dataToUse));
}
catch (Exception ex)
{
@@ -140,5 +138,19 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader
}
this.index++;
}
+
+ sealed class SingleBatch : IArrowReader
+ {
+ private RecordBatch? _recordBatch;
+
+ public SingleBatch(RecordBatch recordBatch) => _recordBatch =
recordBatch;
+
+ public ValueTask<RecordBatch?>
ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
+ {
+ RecordBatch? result = _recordBatch;
+ _recordBatch = null;
+ return new ValueTask<RecordBatch?>(result);
+ }
+ }
}
}
diff --git a/csharp/test/Drivers/BigQuery/BigQueryStatementTests.cs
b/csharp/test/Drivers/BigQuery/BigQueryStatementTests.cs
index 17fa72b5a..a24a3e3a9 100644
--- a/csharp/test/Drivers/BigQuery/BigQueryStatementTests.cs
+++ b/csharp/test/Drivers/BigQuery/BigQueryStatementTests.cs
@@ -34,27 +34,6 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.BigQuery
{
public class BigQueryStatementTests
{
- [Fact]
- public void ReadChunkWithRetries_CalledMoreThanOnce()
- {
- TokenProtectedReadClientManger clientMgr =
GetMockTokenProtectedReadClientManger();
- var mockReadRowsStream = GetMockReadRowsStream(clientMgr);
- mockReadRowsStream
- .Setup(s => s.GetResponseStream())
- .Throws(new InvalidOperationException("GetAsyncEnumerator can
only be called once for a gRPC response stream wrapper."));
-
- var statement = CreateBigQueryStatementForTest();
- SetupRetryValues(statement);
-
- // this should remain an issue because it indicates we aren't
doing something correctly
- // due to the setup, it looks like:
- //----System.Reflection.TargetInvocationException : Exception has
been thrown by the target of an invocation.
- //--------Apache.Arrow.Adbc.AdbcException : Cannot
execute<ReadChunkWithRetries>b__0 after 5 tries.Last exception:
InvalidOperationException: GetAsyncEnumerator can only be called once for a
gRPC response stream wrapper.
- //------------ System.InvalidOperationException :
GetAsyncEnumerator can only be called once for a gRPC response stream wrapper.
-
- Assert.Throws<TargetInvocationException>(() => {
statement.ReadChunkWithRetriesForTest(clientMgr, "test-stream", null); });
- }
-
[Theory]
[InlineData(true)] //.MoveNextAsync throws the error
[InlineData(false)] //.Current throws the error
diff --git a/csharp/test/Drivers/Databricks/E2E/CloudFetchE2ETest.cs
b/csharp/test/Drivers/Databricks/E2E/CloudFetchE2ETest.cs
index 31bd2e87d..f0d79276c 100644
--- a/csharp/test/Drivers/Databricks/E2E/CloudFetchE2ETest.cs
+++ b/csharp/test/Drivers/Databricks/E2E/CloudFetchE2ETest.cs
@@ -58,7 +58,7 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.Databricks
/// </summary>
[Theory]
[MemberData(nameof(TestCases))]
- private async Task TestRealDatabricksCloudFetch(string query, int
rowCount, bool useCloudFetch, bool enableDirectResults)
+ public async Task TestRealDatabricksCloudFetch(string query, int
rowCount, bool useCloudFetch, bool enableDirectResults)
{
var connection = NewConnection(TestConfiguration, new
Dictionary<string, string>
{