This is an automated email from the ASF dual-hosted git repository.

curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 286bb8505 fix(csharp/src/Drivers/Databricks): DatabricksCompositeReader unit tests (#3265)
286bb8505 is described below

commit 286bb8505973ebe935bb6a83007f80b5135d5ad1
Author: Todd Meng <[email protected]>
AuthorDate: Thu Aug 14 20:15:23 2025 -0700

    fix(csharp/src/Drivers/Databricks): DatabricksCompositeReader unit tests (#3265)
    
    Adds unit tests for DatabricksCompositeReader
    
    Includes small changes to the concrete classes for testability
---
 .../src/Drivers/Databricks/DatabricksConnection.cs |   3 +-
 .../Databricks/Reader/DatabricksCompositeReader.cs |  64 ++-
 .../Unit/DatabricksCompositeReaderUnitTests.cs     | 455 +++++++++++++++++++++
 3 files changed, 502 insertions(+), 20 deletions(-)

diff --git a/csharp/src/Drivers/Databricks/DatabricksConnection.cs 
b/csharp/src/Drivers/Databricks/DatabricksConnection.cs
index feaa42dfe..2739197e8 100644
--- a/csharp/src/Drivers/Databricks/DatabricksConnection.cs
+++ b/csharp/src/Drivers/Databricks/DatabricksConnection.cs
@@ -445,7 +445,8 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
                 isLz4Compressed = metadataResp.Lz4Compressed;
             }
 
-            return new DatabricksCompositeReader(databricksStatement, schema, 
response, isLz4Compressed, TlsOptions, _proxyConfigurator);
+            HttpClient httpClient = new 
HttpClient(HiveServer2TlsImpl.NewHttpClientHandler(TlsOptions, 
_proxyConfigurator));
+            return new DatabricksCompositeReader(databricksStatement, schema, 
response, isLz4Compressed, httpClient);
         }
 
         internal override SchemaParser SchemaParser => new 
DatabricksSchemaParser();
diff --git a/csharp/src/Drivers/Databricks/Reader/DatabricksCompositeReader.cs 
b/csharp/src/Drivers/Databricks/Reader/DatabricksCompositeReader.cs
index a2218d163..4b7c1dbeb 100644
--- a/csharp/src/Drivers/Databricks/Reader/DatabricksCompositeReader.cs
+++ b/csharp/src/Drivers/Databricks/Reader/DatabricksCompositeReader.cs
@@ -28,9 +28,10 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader
 {
     /// <summary>
     /// A composite reader for Databricks that delegates to either 
CloudFetchReader or DatabricksReader
-    /// based on CloudFetch configuration and result set characteristics.
+    /// based on CloudFetch configuration and result set characteristics. This 
was introduced because some
+    /// older DBR do not accurately report the result set characteristics in 
the MetadataResponse
     /// </summary>
-    internal sealed class DatabricksCompositeReader : TracingReader
+    internal class DatabricksCompositeReader : TracingReader
     {
         public override string AssemblyName => 
DatabricksConnection.s_assemblyName;
 
@@ -43,11 +44,10 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader
         private readonly Schema _schema;
         private readonly IResponse _response;
         private readonly bool _isLz4Compressed;
-        private readonly TlsProperties _tlsOptions;
-        private readonly HiveServer2ProxyConfigurator _proxyConfigurator;
 
         private IOperationStatusPoller? operationStatusPoller;
         private bool _disposed;
+        private readonly HttpClient _httpClient;
 
         /// <summary>
         /// Initializes a new instance of the <see 
cref="DatabricksCompositeReader"/> class.
@@ -61,16 +61,15 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader
             Schema schema,
             IResponse response,
             bool isLz4Compressed,
-            TlsProperties tlsOptions,
-            HiveServer2ProxyConfigurator proxyConfigurator)
+            HttpClient httpClient,
+            IOperationStatusPoller? operationPoller = null)
             : base(statement)
         {
             _statement = statement ?? throw new 
ArgumentNullException(nameof(statement));
             _schema = schema ?? throw new 
ArgumentNullException(nameof(schema));
             _response = response;
             _isLz4Compressed = isLz4Compressed;
-            _tlsOptions = tlsOptions;
-            _proxyConfigurator = proxyConfigurator;
+            _httpClient = httpClient ?? throw new 
ArgumentNullException(nameof(httpClient));
 
             // use direct results if available
             if (_statement.TryGetDirectResults(_response, out 
TSparkDirectResults? directResults)
@@ -81,24 +80,32 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader
             }
             if (_response.DirectResults?.ResultSet?.HasMoreRows ?? true)
             {
-                operationStatusPoller = new 
DatabricksOperationStatusPoller(statement, _response);
+                operationStatusPoller = operationPoller ?? new 
DatabricksOperationStatusPoller(_statement, response);
                 operationStatusPoller.Start();
             }
         }
 
+        /// <summary>
+        /// Determines whether CloudFetch should be used based on the fetch 
results.
+        /// </summary>
+        /// <param name="initialResults">The initial fetch results.</param>
+        /// <returns>True if CloudFetch should be used, false 
otherwise.</returns>
+        internal static bool ShouldUseCloudFetch(TFetchResultsResp 
initialResults)
+        {
+            return initialResults.__isset.results &&
+                   initialResults.Results.__isset.resultLinks &&
+                   initialResults.Results.ResultLinks?.Count > 0;
+        }
+
         private BaseDatabricksReader DetermineReader(TFetchResultsResp 
initialResults)
         {
-            // if it has links, use cloud fetch
-            if (initialResults.__isset.results &&
-                initialResults.Results.__isset.resultLinks &&
-                initialResults.Results.ResultLinks?.Count > 0)
+            if (ShouldUseCloudFetch(initialResults))
             {
-                HttpClient cloudFetchHttpClient = new 
HttpClient(HiveServer2TlsImpl.NewHttpClientHandler(_tlsOptions, 
_proxyConfigurator));
-                return new CloudFetchReader(_statement, _schema, _response, 
initialResults, _isLz4Compressed, cloudFetchHttpClient);
+                return CreateCloudFetchReader(initialResults);
             }
             else
             {
-                return new DatabricksReader(_statement, _schema, _response, 
initialResults, _isLz4Compressed);
+                return CreateDatabricksReader(initialResults);
             }
         }
 
@@ -116,13 +123,33 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader
                 // Make a FetchResults call to get the initial result set
                 // and determine the reader based on the result set
                 TFetchResultsReq request = new 
TFetchResultsReq(_response.OperationHandle!, TFetchOrientation.FETCH_NEXT, 
this._statement.BatchSize);
-                TFetchResultsResp response = await 
this._statement.Connection.Client!.FetchResults(request, cancellationToken);
+                TFetchResultsResp response = await 
this._statement.Client!.FetchResults(request, cancellationToken);
                 _activeReader = DetermineReader(response);
             }
 
             return await 
_activeReader.ReadNextRecordBatchAsync(cancellationToken);
         }
 
+                /// <summary>
+        /// Creates a CloudFetchReader instance. Virtual to allow testing.
+        /// </summary>
+        /// <param name="initialResults">The initial fetch results.</param>
+        /// <returns>A new CloudFetchReader instance.</returns>
+        protected virtual BaseDatabricksReader 
CreateCloudFetchReader(TFetchResultsResp initialResults)
+        {
+            return new CloudFetchReader(_statement, _schema, _response, 
initialResults, _isLz4Compressed, _httpClient);
+        }
+
+        /// <summary>
+        /// Creates a DatabricksReader instance. Virtual to allow testing.
+        /// </summary>
+        /// <param name="initialResults">The initial fetch results.</param>
+        /// <returns>A new DatabricksReader instance.</returns>
+        protected virtual BaseDatabricksReader 
CreateDatabricksReader(TFetchResultsResp initialResults)
+        {
+            return new DatabricksReader(_statement, _schema, _response, 
initialResults, _isLz4Compressed);
+        }
+
         public override async ValueTask<RecordBatch?> 
ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
         {
             var result = await 
ReadNextRecordBatchInternalAsync(cancellationToken);
@@ -151,8 +178,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader
                         else
                         {
                             // Note: Have the contained reader close the 
operation to avoid duplicate calls.
-                            _ = _activeReader.CloseOperationAsync()
-                                
.ConfigureAwait(false).GetAwaiter().GetResult();
+                            _activeReader.Dispose();
                             _activeReader = null;
                         }
                     }
diff --git 
a/csharp/test/Drivers/Databricks/Unit/DatabricksCompositeReaderUnitTests.cs 
b/csharp/test/Drivers/Databricks/Unit/DatabricksCompositeReaderUnitTests.cs
new file mode 100644
index 000000000..198a0d529
--- /dev/null
+++ b/csharp/test/Drivers/Databricks/Unit/DatabricksCompositeReaderUnitTests.cs
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Net.Http;
+using System.Threading;
+using System.Threading.Tasks;
+using Apache.Arrow;
+using Apache.Arrow.Adbc.Drivers.Apache.Hive2;
+using Apache.Arrow.Adbc.Drivers.Databricks;
+using Apache.Arrow.Adbc.Drivers.Databricks.Reader;
+using Apache.Arrow.Types;
+using Apache.Hive.Service.Rpc.Thrift;
+using Moq;
+using Moq.Protected;
+using Xunit;
+
+namespace Apache.Arrow.Adbc.Tests.Drivers.Databricks.Unit
+{
+    /// <summary>
+    /// Testable subclass of DatabricksCompositeReader for unit testing
+    /// </summary>
+    internal class TestableDatabricksCompositeReader : 
DatabricksCompositeReader
+    {
+        public bool CloudFetchReaderCreated { get; private set; }
+        public bool DatabricksReaderCreated { get; private set; }
+        public BaseDatabricksReader? MockReader { get; set; }
+
+        public TestableDatabricksCompositeReader(
+            IHiveServer2Statement statement,
+            Schema schema,
+            IResponse response,
+            bool isLz4Compressed,
+            HttpClient httpClient,
+            IOperationStatusPoller? operationPoller = null)
+            : base(statement, schema, response, isLz4Compressed, httpClient, 
operationPoller)
+        {
+        }
+
+        protected override BaseDatabricksReader 
CreateCloudFetchReader(TFetchResultsResp initialResults)
+        {
+            CloudFetchReaderCreated = true;
+            return MockReader!;
+        }
+
+        protected override BaseDatabricksReader 
CreateDatabricksReader(TFetchResultsResp initialResults)
+        {
+            DatabricksReaderCreated = true;
+            return MockReader!;
+        }
+    }
+
+    public class DatabricksCompositeReaderTests : IDisposable
+    {
+        private readonly Schema _testSchema;
+        private readonly HttpClient _httpClient;
+
+        public DatabricksCompositeReaderTests()
+        {
+            _testSchema = new Schema.Builder()
+                .Field(new Field("id", Int32Type.Default, true))
+                .Field(new Field("name", StringType.Default, true))
+                .Build();
+
+            _httpClient = new HttpClient();
+        }
+
+        private Mock<IHiveServer2Statement> 
CreateMockStatement(Mock<TCLIService.IAsync>? mockClient = null, 
TSparkDirectResults? directResults = null)
+        {
+            var mockStatement = new Mock<IHiveServer2Statement>();
+            mockStatement.Setup(s => s.QueryTimeoutSeconds).Returns(10);
+
+            if (mockClient != null)
+            {
+                mockStatement.Setup(s => s.Client).Returns(mockClient.Object);
+            }
+
+            if (directResults != null)
+            {
+                mockStatement
+                    .Setup(s => s.TryGetDirectResults(It.IsAny<IResponse>(), 
out It.Ref<TSparkDirectResults?>.IsAny))
+                    .Returns((IResponse response, out TSparkDirectResults? 
result) =>
+                    {
+                        result = directResults;
+                        return true;
+                    });
+            }
+            else
+            {
+                TSparkDirectResults? nullDirectResults = null;
+                mockStatement.Setup(s => 
s.TryGetDirectResults(It.IsAny<IResponse>(), out nullDirectResults))
+                    .Returns(false);
+            }
+
+            return mockStatement;
+        }
+
+        private Mock<TCLIService.IAsync> CreateMockClient()
+        {
+            var mockClient = new Mock<TCLIService.IAsync>();
+            var closeOperationResponse = new TCloseOperationResp
+            {
+                Status = new TStatus { StatusCode = TStatusCode.SUCCESS_STATUS 
}
+            };
+            mockClient.Setup(c => 
c.CloseOperation(It.IsAny<TCloseOperationReq>(), It.IsAny<CancellationToken>()))
+                .ReturnsAsync(closeOperationResponse);
+            return mockClient;
+        }
+
+        private Mock<IResponse> CreateMockResponse(TSparkDirectResults? 
directResults = null, TOperationHandle? operationHandle = null)
+        {
+            var mockResponse = new Mock<IResponse>();
+            mockResponse.Setup(r => r.OperationHandle).Returns(operationHandle 
?? new TOperationHandle());
+
+            if (directResults != null)
+            {
+                mockResponse.Setup(r => 
r.DirectResults).Returns(directResults);
+            }
+
+            return mockResponse;
+        }
+
+        [Fact]
+        public void Constructor_WithValidParameters_InitializesSuccessfully()
+        {
+            // Arrange
+            var mockClient = CreateMockClient();
+            var mockStatement = CreateMockStatement(mockClient);
+            var mockPoller = new Mock<IOperationStatusPoller>();
+            var mockResponse = CreateMockResponse();
+
+            // Act
+            using var reader = new DatabricksCompositeReader(
+                mockStatement.Object,
+                _testSchema,
+                mockResponse.Object,
+                false,
+                _httpClient,
+                mockPoller.Object);
+
+            // Assert
+            Assert.NotNull(reader);
+            Assert.Equal(_testSchema, reader.Schema);
+            mockPoller.Verify(p => p.Start(It.IsAny<CancellationToken>()), 
Times.Once);
+        }
+
+        [Fact]
+        public void 
Constructor_WithDirectResultsAndNoMoreRows_DoesNotStartPoller()
+        {
+            // Arrange
+            var directResults = new TSparkDirectResults
+            {
+                ResultSet = new TFetchResultsResp
+                {
+                    HasMoreRows = false,
+                    Results = new TRowSet()
+                },
+                ResultSetMetadata = new TGetResultSetMetadataResp
+                {},
+                __isset = new TSparkDirectResults.Isset { resultSet = true }
+            };
+
+            var mockClient = CreateMockClient();
+            var mockStatement = CreateMockStatement(mockClient, directResults: 
directResults);
+            var mockResponse = CreateMockResponse(directResults: 
directResults);
+            var mockPoller = new Mock<IOperationStatusPoller>();
+
+            // Act
+            using var reader = new TestableDatabricksCompositeReader(
+                mockStatement.Object,
+                _testSchema,
+                mockResponse.Object,
+                false,
+                _httpClient,
+                mockPoller.Object);
+
+            // Assert
+            mockPoller.Verify(p => p.Start(It.IsAny<CancellationToken>()), 
Times.Never);
+        }
+
+        [Fact]
+        public void ShouldUseCloudFetch_WithResultLinks_ReturnsTrue()
+        {
+            // Arrange
+            var fetchResults = new TFetchResultsResp
+            {
+                Results = new TRowSet
+                {
+                    ResultLinks = new List<TSparkArrowResultLink>
+                    {
+                        new TSparkArrowResultLink {}
+                    }
+                }
+            };
+
+            // Act
+            var result = 
DatabricksCompositeReader.ShouldUseCloudFetch(fetchResults);
+
+            // Assert
+            Assert.True(result);
+        }
+
+        [Fact]
+        public void ShouldUseCloudFetch_WithoutResultLinks_ReturnsFalse()
+        {
+            // Arrange
+            var fetchResults = new TFetchResultsResp
+            {
+                Results = new TRowSet()
+            };
+
+            // Act
+            var result = 
DatabricksCompositeReader.ShouldUseCloudFetch(fetchResults);
+
+            // Assert
+            Assert.False(result);
+        }
+
+        [Fact]
+        public void ShouldUseCloudFetch_WithEmptyResultLinks_ReturnsFalse()
+        {
+            // Arrange
+            var fetchResults = new TFetchResultsResp
+            {
+                Results = new TRowSet
+                {
+                    ResultLinks = new List<TSparkArrowResultLink> {}
+                }
+            };
+
+            // Act
+            var result = 
DatabricksCompositeReader.ShouldUseCloudFetch(fetchResults);
+
+            // Assert
+            Assert.False(result);
+        }
+
+        [Fact]
+        public async Task 
ReadNextRecordBatchAsync_WithNoActiveReader_FetchesAndDelegates()
+        {
+            // Arrange
+            var mockClient = CreateMockClient();
+            var fetchResponse = new TFetchResultsResp
+            {
+                HasMoreRows = false,
+                Results = new TRowSet()
+            };
+            mockClient.Setup(c => c.FetchResults(It.IsAny<TFetchResultsReq>(), 
It.IsAny<CancellationToken>()))
+                .ReturnsAsync(fetchResponse);
+            var mockStatement = CreateMockStatement(mockClient);
+            var mockPoller = new Mock<IOperationStatusPoller>();
+            var mockResponse = CreateMockResponse();
+            var expectedBatch = new RecordBatch(_testSchema, new IArrowArray[] 
{ }, 0);
+            var mockReader = new 
Mock<BaseDatabricksReader>(mockStatement.Object, _testSchema, 
mockResponse.Object, false);
+            mockReader.Setup(r => 
r.ReadNextRecordBatchAsync(It.IsAny<CancellationToken>()))
+                .ReturnsAsync(expectedBatch);
+
+            using var reader = new TestableDatabricksCompositeReader(
+                mockStatement.Object,
+                _testSchema,
+                mockResponse.Object,
+                false,
+                _httpClient,
+                mockPoller.Object)
+            {
+                MockReader = mockReader.Object
+            };
+
+            // Act
+            var result = await reader.ReadNextRecordBatchAsync();
+
+            // Assert
+            Assert.NotNull(result);
+            Assert.Equal(expectedBatch, result);
+            mockClient.Verify(c => c.FetchResults(
+                It.IsAny<TFetchResultsReq>(),
+                It.IsAny<CancellationToken>()), Times.Once);
+            // verify that the reader was created
+            Assert.True(reader.DatabricksReaderCreated);
+        }
+
+        [Fact]
+        public async Task 
ReadNextRecordBatchAsync_WhenReturnsNull_StopsPoller()
+        {
+            // Arrange
+            var mockClient = CreateMockClient();
+            var fetchResponse = new TFetchResultsResp
+            {
+                HasMoreRows = false,
+                Results = new TRowSet()
+            };
+            mockClient.Setup(c => c.FetchResults(It.IsAny<TFetchResultsReq>(), 
It.IsAny<CancellationToken>()))
+                .ReturnsAsync(fetchResponse);
+            var mockStatement = CreateMockStatement(mockClient);
+            var mockPoller = new Mock<IOperationStatusPoller>();
+            mockPoller.Setup(p => p.IsStarted).Returns(true);
+            var mockResponse = CreateMockResponse();
+            var mockReader = new 
Mock<BaseDatabricksReader>(mockStatement.Object, _testSchema, 
mockResponse.Object, false);
+            mockReader.Setup(r => 
r.ReadNextRecordBatchAsync(It.IsAny<CancellationToken>()))
+                .ReturnsAsync((RecordBatch?)null);
+
+            using var reader = new TestableDatabricksCompositeReader(
+                mockStatement.Object,
+                _testSchema,
+                mockResponse.Object,
+                false,
+                _httpClient,
+                mockPoller.Object)
+            {
+                MockReader = mockReader.Object
+            };
+
+            // Act
+            var result = await reader.ReadNextRecordBatchAsync();
+
+            // Assert
+            Assert.Null(result);
+            mockPoller.Verify(p => p.Stop(), Times.Once);
+        }
+
+        [Fact]
+        public void Constructor_WithDirectResultsAndMoreRows_StartsPoller()
+        {
+            // Arrange
+            var directResults = new TSparkDirectResults
+            {
+                ResultSet = new TFetchResultsResp
+                {
+                    HasMoreRows = true,
+                    Results = new TRowSet()
+                },
+                __isset = new TSparkDirectResults.Isset { resultSet = true }
+            };
+
+            var mockClient = CreateMockClient();
+            var mockStatement = CreateMockStatement(mockClient, directResults: 
directResults);
+            var mockResponse = CreateMockResponse(directResults: 
directResults);
+            var mockPoller = new Mock<IOperationStatusPoller>();
+            var mockReader = new 
Mock<BaseDatabricksReader>(mockStatement.Object, _testSchema, 
mockResponse.Object, false);
+
+            // Act
+            using var reader = new TestableDatabricksCompositeReader(
+                mockStatement.Object,
+                _testSchema,
+                mockResponse.Object,
+                false,
+                _httpClient,
+                mockPoller.Object)
+            {
+                MockReader = mockReader.Object
+            };
+
+            // Assert
+            Assert.True(reader.DatabricksReaderCreated); // Should create 
reader from direct results
+            mockPoller.Verify(p => p.Start(It.IsAny<CancellationToken>()), 
Times.Once);
+        }
+
+        [Fact]
+        public void Dispose_WithNoActiveReader_CallsCloseOperationDirectly()
+        {
+            // Arrange
+            var mockClient = CreateMockClient();
+            var mockStatement = CreateMockStatement(mockClient);
+
+            var mockPoller = new Mock<IOperationStatusPoller>();
+            var operationHandle = new TOperationHandle();
+            var mockResponse = CreateMockResponse(directResults: null, 
operationHandle: operationHandle);
+
+            var reader = new DatabricksCompositeReader(
+                mockStatement.Object,
+                _testSchema,
+                mockResponse.Object,
+                false,
+                _httpClient,
+                mockPoller.Object);
+
+            // Act
+            reader.Dispose();
+
+            // Assert
+            mockClient.Verify(c => c.CloseOperation(
+                It.Is<TCloseOperationReq>(req => req.OperationHandle == 
operationHandle),
+                It.IsAny<CancellationToken>()), Times.Once);
+            mockPoller.Verify(p => p.Stop(), Times.Once);
+            mockPoller.Verify(p => p.Dispose(), Times.Once);
+        }
+
+        [Fact]
+        public async Task Dispose_WithActiveReader_CallsReaderDispose()
+        {
+            // Arrange
+            var mockClient = CreateMockClient();
+            var fetchResponse = new TFetchResultsResp
+            {
+                HasMoreRows = false,
+                Results = new TRowSet()
+            };
+            mockClient.Setup(c => c.FetchResults(It.IsAny<TFetchResultsReq>(), 
It.IsAny<CancellationToken>()))
+                .ReturnsAsync(fetchResponse);
+            var mockStatement = CreateMockStatement(mockClient);
+
+            var mockPoller = new Mock<IOperationStatusPoller>();
+            var mockResponse = CreateMockResponse();
+            var mockReader = new 
Mock<BaseDatabricksReader>(mockStatement.Object, _testSchema, 
mockResponse.Object, false);
+            mockReader.Setup(r => 
r.ReadNextRecordBatchAsync(It.IsAny<CancellationToken>()))
+                .ReturnsAsync((RecordBatch?)null);
+
+            mockReader.Protected()
+                .Setup("Dispose", ItExpr.IsAny<bool>())
+                .CallBase()
+                .Verifiable();
+
+            var reader = new TestableDatabricksCompositeReader(
+                mockStatement.Object,
+                _testSchema,
+                mockResponse.Object,
+                false,
+                _httpClient,
+                mockPoller.Object)
+            {
+                MockReader = mockReader.Object
+            };
+
+            // Trigger creation of active reader
+            _ = await reader.ReadNextRecordBatchAsync();
+
+            // Act
+            reader.Dispose();
+
+            mockReader.Protected().Verify("Dispose", Times.Once(), 
ItExpr.IsAny<bool>());
+            mockPoller.Verify(p => p.Stop(), Times.Once);
+            mockPoller.Verify(p => p.Dispose(), Times.Once);
+        }
+
+        public void Dispose()
+        {
+            _httpClient?.Dispose();
+        }
+    }
+}

Reply via email to