This is an automated email from the ASF dual-hosted git repository.

curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 7d05c5850 feat(csharp/src): Improve efficiency of C# BigQuery and Databricks drivers (#3583)
7d05c5850 is described below

commit 7d05c58501d3f6cf1f5bbef66cbc4e6afb1395a0
Author: Curt Hagenlocher <[email protected]>
AuthorDate: Mon Oct 20 07:27:15 2025 -0700

    feat(csharp/src): Improve efficiency of C# BigQuery and Databricks drivers (#3583)
    
    For BigQuery and Databricks, Arrow-formatted record batches are returned
    from the server in a format that's not strictly compatible with the
    Arrow stream format due to the way that the schema and the array data
    are split. A change was recently made to the C# Arrow library to allow
    these to be deserialized independently, which means that we no longer
    need to indirect through a Stream -- saving on both CPU and memory and
    reducing pressure on the GC.
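
    As a rough sketch (the class and parameter names here are illustrative,
    not part of this commit), the new path reduces to the two helper calls
    visible in the diff below; ArrowSerializationHelpers is assumed to live
    in Apache.Arrow.Ipc, per the usings that remain in DatabricksReader.cs:

        using System;
        using Apache.Arrow;
        using Apache.Arrow.Ipc; // assumed home of ArrowSerializationHelpers

        static class DecodeSketch
        {
            // The server delivers the serialized schema and each serialized
            // record batch as separate blobs rather than as one contiguous
            // Arrow IPC stream, so the two pieces are deserialized
            // independently.
            public static RecordBatch Decode(
                ReadOnlyMemory<byte> schemaBytes,
                ReadOnlyMemory<byte> batchBytes)
            {
                Schema schema = ArrowSerializationHelpers.DeserializeSchema(schemaBytes);
                return ArrowSerializationHelpers.DeserializeRecordBatch(schema, batchBytes);
            }
        }

    The old path instead synthesized an IPC stream: it wrote the schema into
    a MemoryStream, spliced in the raw batch bytes (the deleted ChunkStream
    below), and parsed the whole thing back through ArrowStreamReader.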
---
 .../src/Apache.Arrow.Adbc/Apache.Arrow.Adbc.csproj |   2 +-
 csharp/src/Drivers/Apache/Thrift/ChunkStream.cs    | 115 ---------------------
 csharp/src/Drivers/BigQuery/BigQueryStatement.cs   |  99 ++++--------------
 .../Drivers/Databricks/Reader/DatabricksReader.cs  |  18 +++-
 .../Drivers/BigQuery/BigQueryStatementTests.cs     |  21 ----
 .../Drivers/Databricks/E2E/CloudFetchE2ETest.cs    |   2 +-
 6 files changed, 37 insertions(+), 220 deletions(-)

diff --git a/csharp/src/Apache.Arrow.Adbc/Apache.Arrow.Adbc.csproj b/csharp/src/Apache.Arrow.Adbc/Apache.Arrow.Adbc.csproj
index 4f92168e1..fe3fa125e 100644
--- a/csharp/src/Apache.Arrow.Adbc/Apache.Arrow.Adbc.csproj
+++ b/csharp/src/Apache.Arrow.Adbc/Apache.Arrow.Adbc.csproj
@@ -6,7 +6,7 @@
     <PackageReadmeFile>readme.md</PackageReadmeFile>
   </PropertyGroup>
   <ItemGroup>
-    <PackageReference Include="Apache.Arrow" Version="20.0.0" />
+    <PackageReference Include="Apache.Arrow" Version="22.1.0" />
     <PackageReference Include="System.Diagnostics.DiagnosticSource" Version="9.0.6" />
   </ItemGroup>
   <ItemGroup Condition="!$([MSBuild]::IsTargetFrameworkCompatible($(TargetFramework), 'net6.0'))">
diff --git a/csharp/src/Drivers/Apache/Thrift/ChunkStream.cs b/csharp/src/Drivers/Apache/Thrift/ChunkStream.cs
deleted file mode 100644
index 499e4930c..000000000
--- a/csharp/src/Drivers/Apache/Thrift/ChunkStream.cs
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-using System;
-using System.IO;
-using System.Threading;
-using System.Threading.Tasks;
-using Apache.Arrow.Ipc;
-
-namespace Apache.Arrow.Adbc.Drivers.Apache
-{
-    internal class ChunkStream : Stream
-    {
-        ReadOnlyMemory<byte> currentBuffer;
-        ReadOnlyMemory<byte> data;
-        bool first;
-        int position;
-
-        public ChunkStream(Schema schema, byte[] data)
-            : this(schema, new ReadOnlyMemory<byte>(data))
-        {
-            // Call the other constructor to avoid duplication
-        }
-
-        public ChunkStream(Schema schema, ReadOnlyMemory<byte> data)
-        {
-            MemoryStream buffer = new MemoryStream();
-            ArrowStreamWriter writer = new ArrowStreamWriter(buffer, schema, leaveOpen: true);
-            writer.WriteStart();
-            writer.WriteEnd();
-            writer.Dispose();
-
-            this.currentBuffer = new ReadOnlyMemory<byte>(buffer.GetBuffer(), 0, (int)buffer.Length - 8);
-            this.data = data;
-            this.first = true;
-        }
-
-        public override bool CanRead => true;
-
-        public override bool CanSeek => false;
-
-        public override bool CanWrite => false;
-
-        public override long Length => throw new NotSupportedException();
-
-        public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
-
-        public override void Flush()
-        {
-        }
-
-        public override int Read(byte[] buffer, int offset, int count)
-        {
-            int remaining = this.currentBuffer.Length - this.position;
-            if (remaining == 0)
-            {
-                if (this.first)
-                {
-                    this.first = false;
-                }
-                else
-                {
-                    return 0;
-                }
-                this.currentBuffer = this.data;
-                this.position = 0;
-                remaining = this.currentBuffer.Length - this.position;
-            }
-
-            int bytes = Math.Min(remaining, count);
-            this.currentBuffer.Slice(this.position, bytes).CopyTo(new Memory<byte>(buffer, offset, bytes));
-            this.position += bytes;
-            return bytes;
-        }
-
-        public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
-        {
-            return base.ReadAsync(buffer, offset, count, cancellationToken);
-        }
-
-        public override int ReadByte()
-        {
-            return base.ReadByte();
-        }
-
-        public override long Seek(long offset, SeekOrigin origin)
-        {
-            throw new NotSupportedException();
-        }
-
-        public override void SetLength(long value)
-        {
-            throw new NotSupportedException();
-        }
-
-        public override void Write(byte[] buffer, int offset, int count)
-        {
-            throw new NotSupportedException();
-        }
-    }
-}
diff --git a/csharp/src/Drivers/BigQuery/BigQueryStatement.cs b/csharp/src/Drivers/BigQuery/BigQueryStatement.cs
index cb6d7435a..5781ab483 100644
--- a/csharp/src/Drivers/BigQuery/BigQueryStatement.cs
+++ b/csharp/src/Drivers/BigQuery/BigQueryStatement.cs
@@ -19,7 +19,6 @@ using System;
 using System.Collections.Concurrent;
 using System.Collections.Generic;
 using System.Diagnostics;
-using System.IO;
 using System.Linq;
 using System.Threading;
 using System.Threading.Tasks;
@@ -392,14 +391,7 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
             ReadRowsStream stream = new ReadRowsStream(enumerator);
             activity?.AddBigQueryTag("read_stream.has_rows", stream.HasRows);
 
-            if (stream.HasRows)
-            {
-                return new ArrowStreamReader(stream);
-            }
-            else
-            {
-                return null;
-            }
+            return stream.HasRows ? stream : null;
         }
 
         private QueryOptions ValidateOptions(Activity? activity)
@@ -752,13 +744,12 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
             }
         }
 
-        sealed class ReadRowsStream : Stream
+        sealed class ReadRowsStream : IArrowArrayStream
         {
-            IAsyncEnumerator<ReadRowsResponse> response;
-            ReadOnlyMemory<byte> currentBuffer;
+            readonly Schema? schema;
+            readonly IAsyncEnumerator<ReadRowsResponse> response;
             bool first;
-            int position;
-            bool hasRows;
+            bool disposed;
 
             public ReadRowsStream(IAsyncEnumerator<ReadRowsResponse> response)
             {
@@ -766,91 +757,41 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
                 {
                     if (response.MoveNextAsync().Result && response.Current != null)
                     {
-                        this.currentBuffer = response.Current.ArrowSchema.SerializedSchema.Memory;
-                        this.hasRows = true;
-                    }
-                    else
-                    {
-                        this.hasRows = false;
+                        this.schema = ArrowSerializationHelpers.DeserializeSchema(response.Current.ArrowSchema.SerializedSchema.Memory);
                     }
                 }
                 catch (InvalidOperationException)
                 {
-                    this.hasRows = false;
                 }
 
                 this.response = response;
                 this.first = true;
             }
 
-            public bool HasRows => this.hasRows;
-
-            public override bool CanRead => true;
+            public Schema Schema => this.schema ?? throw new InvalidOperationException("Stream has no rows");
+            public bool HasRows => this.schema != null;
 
-            public override bool CanSeek => false;
-
-            public override bool CanWrite => false;
-
-            public override long Length => throw new NotSupportedException();
-
-            public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
-
-            public override void Flush()
+            public async ValueTask<RecordBatch?> ReadNextRecordBatchAsync(CancellationToken cancellationToken)
             {
-            }
-
-            public override int Read(byte[] buffer, int offset, int count)
-            {
-                if (!hasRows)
+                if (this.first)
                 {
-                    return 0;
+                    this.first = false;
                 }
-
-                int remaining = this.currentBuffer.Length - this.position;
-                if (remaining == 0)
+                else if (this.disposed || !await this.response.MoveNextAsync())
                 {
-                    if (this.first)
-                    {
-                        this.first = false;
-                    }
-                    else if (!this.response.MoveNextAsync().Result)
-                    {
-                        return 0;
-                    }
-                    this.currentBuffer = this.response.Current.ArrowRecordBatch.SerializedRecordBatch.Memory;
-                    this.position = 0;
-                    remaining = this.currentBuffer.Length - this.position;
+                    return null;
                 }
 
-                int bytes = Math.Min(remaining, count);
-                this.currentBuffer.Slice(this.position, bytes).CopyTo(new Memory<byte>(buffer, offset, bytes));
-                this.position += bytes;
-                return bytes;
-            }
-
-            public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
-            {
-                return base.ReadAsync(buffer, offset, count, cancellationToken);
-            }
-
-            public override int ReadByte()
-            {
-                return base.ReadByte();
-            }
-
-            public override long Seek(long offset, SeekOrigin origin)
-            {
-                throw new NotSupportedException();
+                return ArrowSerializationHelpers.DeserializeRecordBatch(this.schema, this.response.Current.ArrowRecordBatch.SerializedRecordBatch.Memory);
             }
 
-            public override void SetLength(long value)
-            {
-                throw new NotSupportedException();
-            }
-
-            public override void Write(byte[] buffer, int offset, int count)
+            public void Dispose()
             {
-                throw new NotSupportedException();
+                if (!this.disposed)
+                {
+                    this.response.DisposeAsync().GetAwaiter().GetResult();
+                    this.disposed = true;
+                }
             }
         }
     }
diff --git a/csharp/src/Drivers/Databricks/Reader/DatabricksReader.cs b/csharp/src/Drivers/Databricks/Reader/DatabricksReader.cs
index a0ff9797c..be87a599a 100644
--- a/csharp/src/Drivers/Databricks/Reader/DatabricksReader.cs
+++ b/csharp/src/Drivers/Databricks/Reader/DatabricksReader.cs
@@ -19,7 +19,6 @@ using System;
 using System.Collections.Generic;
 using System.Threading;
 using System.Threading.Tasks;
-using Apache.Arrow.Adbc.Drivers.Apache;
 using Apache.Arrow.Adbc.Drivers.Apache.Hive2;
 using Apache.Arrow.Adbc.Tracing;
 using Apache.Arrow.Ipc;
@@ -125,8 +124,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader
                     dataToUse = Lz4Utilities.DecompressLz4(batch.Batch);
                 }
 
-                // Always use ChunkStream which ensures proper schema handling
-                this.reader = new ArrowStreamReader(new ChunkStream(this.schema, dataToUse));
+                this.reader = new SingleBatch(ArrowSerializationHelpers.DeserializeRecordBatch(this.schema, dataToUse));
             }
             catch (Exception ex)
             {
@@ -140,5 +138,19 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader
             }
             this.index++;
         }
+
+        sealed class SingleBatch : IArrowReader
+        {
+            private RecordBatch? _recordBatch;
+
+            public SingleBatch(RecordBatch recordBatch) => _recordBatch = recordBatch;
+
+            public ValueTask<RecordBatch?> ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
+            {
+                RecordBatch? result = _recordBatch;
+                _recordBatch = null;
+                return new ValueTask<RecordBatch?>(result);
+            }
+        }
     }
 }
diff --git a/csharp/test/Drivers/BigQuery/BigQueryStatementTests.cs b/csharp/test/Drivers/BigQuery/BigQueryStatementTests.cs
index 17fa72b5a..a24a3e3a9 100644
--- a/csharp/test/Drivers/BigQuery/BigQueryStatementTests.cs
+++ b/csharp/test/Drivers/BigQuery/BigQueryStatementTests.cs
@@ -34,27 +34,6 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.BigQuery
 {
     public class BigQueryStatementTests
     {
-        [Fact]
-        public void ReadChunkWithRetries_CalledMoreThanOnce()
-        {
-            TokenProtectedReadClientManger clientMgr = GetMockTokenProtectedReadClientManger();
-            var mockReadRowsStream = GetMockReadRowsStream(clientMgr);
-            mockReadRowsStream
-                .Setup(s => s.GetResponseStream())
-                .Throws(new InvalidOperationException("GetAsyncEnumerator can only be called once for a gRPC response stream wrapper."));
-
-            var statement = CreateBigQueryStatementForTest();
-            SetupRetryValues(statement);
-
-            // this should remain an issue because it indicates we aren't doing something correctly
-            // due to the setup, it looks like:
-            //----System.Reflection.TargetInvocationException : Exception has been thrown by the target of an invocation.
-            //--------Apache.Arrow.Adbc.AdbcException : Cannot execute<ReadChunkWithRetries>b__0 after 5 tries.Last exception: InvalidOperationException: GetAsyncEnumerator can only be called once for a gRPC response stream wrapper.
-            //------------ System.InvalidOperationException : GetAsyncEnumerator can only be called once for a gRPC response stream wrapper.
-
-            Assert.Throws<TargetInvocationException>(() => { statement.ReadChunkWithRetriesForTest(clientMgr, "test-stream", null); });
-        }
-
         [Theory]
         [InlineData(true)]  //.MoveNextAsync throws the error
         [InlineData(false)] //.Current throws the error
diff --git a/csharp/test/Drivers/Databricks/E2E/CloudFetchE2ETest.cs b/csharp/test/Drivers/Databricks/E2E/CloudFetchE2ETest.cs
index 31bd2e87d..f0d79276c 100644
--- a/csharp/test/Drivers/Databricks/E2E/CloudFetchE2ETest.cs
+++ b/csharp/test/Drivers/Databricks/E2E/CloudFetchE2ETest.cs
@@ -58,7 +58,7 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.Databricks
         /// </summary>
         [Theory]
         [MemberData(nameof(TestCases))]
-        private async Task TestRealDatabricksCloudFetch(string query, int rowCount, bool useCloudFetch, bool enableDirectResults)
+        public async Task TestRealDatabricksCloudFetch(string query, int rowCount, bool useCloudFetch, bool enableDirectResults)
         {
             var connection = NewConnection(TestConfiguration, new Dictionary<string, string>
             {
