This is an automated email from the ASF dual-hosted git repository.

curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 4d78ff7a1 feat(csharp/src/Drivers/Databricks): Poll status to keep 
query alive (#2820)
4d78ff7a1 is described below

commit 4d78ff7a1ca2132478de5129323f159caada173b
Author: Todd Meng <[email protected]>
AuthorDate: Mon May 19 09:55:42 2025 -0700

    feat(csharp/src/Drivers/Databricks): Poll status to keep query alive (#2820)
    
    Uses `GetOperationStatus` polling to guarantee that query/command
    results stay alive after query execution has completed.
    DatabricksReader and CloudFetchReaders start async loops that send Pings
    every minute, until the results have been completely consumed or the
    connection closes.
    
    Includes unit tests plus an e2e test verifying that a query stays alive
    for more than 20 minutes.
---
 csharp/src/Drivers/Apache/AssemblyInfo.cs          |   1 +
 csharp/src/Drivers/Databricks/AssemblyInfo.cs      |   1 +
 .../CloudFetch/CloudFetchDownloadManager.cs        |  26 +++++-
 .../Databricks/DatabricksOperationStatusPoller.cs  | 100 +++++++++++++++++++++
 .../src/Drivers/Databricks/DatabricksParameters.cs |   5 ++
 csharp/src/Drivers/Databricks/DatabricksReader.cs  |  18 ++++
 csharp/test/Drivers/Apache/AssemblyInfo.cs         |   1 +
 .../DatabricksOperationStatusPollerTests.cs        |  95 ++++++++++++++++++++
 csharp/test/Drivers/Databricks/StatementTests.cs   |  53 +++++++++++
 9 files changed, 299 insertions(+), 1 deletion(-)

diff --git a/csharp/src/Drivers/Apache/AssemblyInfo.cs 
b/csharp/src/Drivers/Apache/AssemblyInfo.cs
index fd70665a9..2654e928b 100644
--- a/csharp/src/Drivers/Apache/AssemblyInfo.cs
+++ b/csharp/src/Drivers/Apache/AssemblyInfo.cs
@@ -19,3 +19,4 @@ using System.Runtime.CompilerServices;
 
 [assembly: InternalsVisibleTo("Apache.Arrow.Adbc.Drivers.Databricks, 
PublicKey=0024000004800000940000000602000000240000525341310004000001000100e504183f6d470d6b67b6d19212be3e1f598f70c246a120194bc38130101d0c1853e4a0f2232cb12e37a7a90e707aabd38511dac4f25fcb0d691b2aa265900bf42de7f70468fc997551a40e1e0679b605aa2088a4a69e07c117e988f5b1738c570ee66997fba02485e7856a49eca5fd0706d09899b8312577cbb9034599fc92d4")]
 [assembly: InternalsVisibleTo("Apache.Arrow.Adbc.Tests.Drivers.Databricks, 
PublicKey=0024000004800000940000000602000000240000525341310004000001000100e504183f6d470d6b67b6d19212be3e1f598f70c246a120194bc38130101d0c1853e4a0f2232cb12e37a7a90e707aabd38511dac4f25fcb0d691b2aa265900bf42de7f70468fc997551a40e1e0679b605aa2088a4a69e07c117e988f5b1738c570ee66997fba02485e7856a49eca5fd0706d09899b8312577cbb9034599fc92d4")]
+[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2, 
PublicKey=0024000004800000940000000602000000240000525341310004000001000100c547cac37abd99c8db225ef2f6c8a3602f3b3606cc9891605d02baa56104f4cfc0734aa39b93bf7852f7d9266654753cc297e7d2edfe0bac1cdcf9f717241550e0a7b191195b7667bb4f64bcb8e2121380fd1d9d46ad2d92d2d15605093924cceaf74c4861eff62abf69b9291ed0a340e113be11e6a7d3113e92484cf7045cc7")]
diff --git a/csharp/src/Drivers/Databricks/AssemblyInfo.cs 
b/csharp/src/Drivers/Databricks/AssemblyInfo.cs
index 59e9e75de..abd9017b7 100644
--- a/csharp/src/Drivers/Databricks/AssemblyInfo.cs
+++ b/csharp/src/Drivers/Databricks/AssemblyInfo.cs
@@ -18,3 +18,4 @@
 using System.Runtime.CompilerServices;
 
 [assembly: InternalsVisibleTo("Apache.Arrow.Adbc.Tests.Drivers.Databricks, 
PublicKey=0024000004800000940000000602000000240000525341310004000001000100e504183f6d470d6b67b6d19212be3e1f598f70c246a120194bc38130101d0c1853e4a0f2232cb12e37a7a90e707aabd38511dac4f25fcb0d691b2aa265900bf42de7f70468fc997551a40e1e0679b605aa2088a4a69e07c117e988f5b1738c570ee66997fba02485e7856a49eca5fd0706d09899b8312577cbb9034599fc92d4")]
+[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2, 
PublicKey=0024000004800000940000000602000000240000525341310004000001000100c547cac37abd99c8db225ef2f6c8a3602f3b3606cc9891605d02baa56104f4cfc0734aa39b93bf7852f7d9266654753cc297e7d2edfe0bac1cdcf9f717241550e0a7b191195b7667bb4f64bcb8e2121380fd1d9d46ad2d92d2d15605093924cceaf74c4861eff62abf69b9291ed0a340e113be11e6a7d3113e92484cf7045cc7")]
diff --git 
a/csharp/src/Drivers/Databricks/CloudFetch/CloudFetchDownloadManager.cs 
b/csharp/src/Drivers/Databricks/CloudFetch/CloudFetchDownloadManager.cs
index 99f38fe82..7eec4d768 100644
--- a/csharp/src/Drivers/Databricks/CloudFetch/CloudFetchDownloadManager.cs
+++ b/csharp/src/Drivers/Databricks/CloudFetch/CloudFetchDownloadManager.cs
@@ -37,6 +37,7 @@ namespace 
Apache.Arrow.Adbc.Drivers.Apache.Databricks.CloudFetch
         private const int DefaultMemoryBufferSizeMB = 200;
         private const bool DefaultPrefetchEnabled = true;
         private const int DefaultFetchBatchSize = 2000000;
+        private const int DefaultTimeoutMinutes = 5;
 
         private readonly DatabricksStatement _statement;
         private readonly Schema _schema;
@@ -50,6 +51,7 @@ namespace 
Apache.Arrow.Adbc.Drivers.Apache.Databricks.CloudFetch
         private bool _isDisposed;
         private bool _isStarted;
         private CancellationTokenSource? _cancellationTokenSource;
+        private DatabricksOperationStatusPoller? _operationStatusPoller;
 
         /// <summary>
         /// Initializes a new instance of the <see 
cref="CloudFetchDownloadManager"/> class.
@@ -137,7 +139,7 @@ namespace 
Apache.Arrow.Adbc.Drivers.Apache.Databricks.CloudFetch
             }
 
             // Parse timeout minutes
-            int timeoutMinutes = 5;
+            int timeoutMinutes = DefaultTimeoutMinutes;
             if 
(connectionProps.TryGetValue(DatabricksParameters.CloudFetchTimeoutMinutes, out 
string? timeoutStr))
             {
                 if (int.TryParse(timeoutStr, out int parsedTimeout) && 
parsedTimeout > 0)
@@ -180,6 +182,9 @@ namespace 
Apache.Arrow.Adbc.Drivers.Apache.Databricks.CloudFetch
                 _isLz4Compressed,
                 maxRetries,
                 retryDelayMs);
+
+            // Initialize the operation status poller
+            _operationStatusPoller = new 
DatabricksOperationStatusPoller(_statement);
         }
 
         /// <summary>
@@ -247,6 +252,9 @@ namespace 
Apache.Arrow.Adbc.Drivers.Apache.Databricks.CloudFetch
             // Create a new cancellation token source
             _cancellationTokenSource = new CancellationTokenSource();
 
+            // Start the operation status poller
+            _operationStatusPoller?.Start(_cancellationTokenSource.Token);
+
             // Start the result fetcher
             await 
_resultFetcher.StartAsync(_cancellationTokenSource.Token).ConfigureAwait(false);
 
@@ -267,6 +275,9 @@ namespace 
Apache.Arrow.Adbc.Drivers.Apache.Databricks.CloudFetch
             // Cancel the token to signal all operations to stop
             _cancellationTokenSource?.Cancel();
 
+            // Stop the operation status poller
+            DisposeOperationStatusPoller();
+
             // Stop the downloader
             await _downloader.StopAsync().ConfigureAwait(false);
 
@@ -291,6 +302,9 @@ namespace 
Apache.Arrow.Adbc.Drivers.Apache.Databricks.CloudFetch
             // Stop the pipeline
             StopAsync().GetAwaiter().GetResult();
 
+            // Dispose the operation status poller
+            DisposeOperationStatusPoller();
+
             // Dispose the HTTP client
             _httpClient.Dispose();
 
@@ -326,5 +340,15 @@ namespace 
Apache.Arrow.Adbc.Drivers.Apache.Databricks.CloudFetch
                 throw new 
ObjectDisposedException(nameof(CloudFetchDownloadManager));
             }
         }
+
+
+        private void DisposeOperationStatusPoller()
+        {
+            if (_operationStatusPoller != null)
+            {
+                _operationStatusPoller.Dispose();
+                _operationStatusPoller = null;
+            }
+        }
     }
 }
diff --git a/csharp/src/Drivers/Databricks/DatabricksOperationStatusPoller.cs 
b/csharp/src/Drivers/Databricks/DatabricksOperationStatusPoller.cs
new file mode 100644
index 000000000..edb7cd058
--- /dev/null
+++ b/csharp/src/Drivers/Databricks/DatabricksOperationStatusPoller.cs
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Apache.Arrow.Adbc.Drivers.Apache.Databricks.CloudFetch;
+using Apache.Hive.Service.Rpc.Thrift;
+
+namespace Apache.Arrow.Adbc.Drivers.Databricks
+{
+    /// <summary>
+    /// Service that periodically polls the operation status of a Databricks warehouse query to keep it alive.
+    /// This is used to maintain the command results and session when reading results takes a long time.
+    /// </summary>
+    internal class DatabricksOperationStatusPoller : IDisposable
+    {
+        private readonly IHiveServer2Statement _statement;
+        private readonly int _heartbeatIntervalSeconds;
+        // internal cancellation token source - won't affect the external token
+        private CancellationTokenSource? _internalCts;
+        // links _internalCts with the external token; created in Start and disposed in Dispose
+        private CancellationTokenSource? _linkedCts;
+        private Task? _operationStatusPollingTask;
+
+        /// <summary>
+        /// Creates a poller for the given statement. Polling does not begin until <see cref="Start"/> is called.
+        /// </summary>
+        /// <param name="statement">The statement whose operation handle and client are used for polling.</param>
+        /// <param name="heartbeatIntervalSeconds">Delay, in seconds, between two consecutive status polls.</param>
+        /// <exception cref="ArgumentNullException"><paramref name="statement"/> is null.</exception>
+        public DatabricksOperationStatusPoller(IHiveServer2Statement statement, int heartbeatIntervalSeconds = DatabricksConstants.DefaultOperationStatusPollingIntervalSeconds)
+        {
+            _statement = statement ?? throw new ArgumentNullException(nameof(statement));
+            _heartbeatIntervalSeconds = heartbeatIntervalSeconds;
+        }
+
+        /// <summary>
+        /// Gets whether <see cref="Start"/> has been called. Stays true after the poller stops or is disposed.
+        /// </summary>
+        public bool IsStarted => _operationStatusPollingTask != null;
+
+        /// <summary>
+        /// Starts the operation status poller. Polls once per heartbeat interval until the operation is
+        /// canceled/errored, the token is canceled, or the operation status poller is disposed.
+        /// </summary>
+        /// <param name="externalToken">The external cancellation token.</param>
+        /// <exception cref="InvalidOperationException">The poller has already been started.</exception>
+        public void Start(CancellationToken externalToken = default)
+        {
+            if (IsStarted)
+            {
+                throw new InvalidOperationException("Operation status poller already started");
+            }
+            _internalCts = new CancellationTokenSource();
+            // create a linked source so that the external token can cancel the polling task if needed;
+            // keep a reference to it because the linked source registers on both tokens until it is disposed
+            _linkedCts = CancellationTokenSource.CreateLinkedTokenSource(_internalCts.Token, externalToken);
+            CancellationToken linkedToken = _linkedCts.Token;
+            _operationStatusPollingTask = Task.Run(() => PollOperationStatus(linkedToken));
+        }
+
+        /// <summary>
+        /// Polling loop: requests the operation status, stops on a canceled/errored operation, otherwise waits
+        /// one heartbeat interval and repeats. Also stops when the statement no longer has an operation handle.
+        /// </summary>
+        private async Task PollOperationStatus(CancellationToken cancellationToken)
+        {
+            try
+            {
+                while (!cancellationToken.IsCancellationRequested)
+                {
+                    var operationHandle = _statement.OperationHandle;
+                    if (operationHandle == null) break;
+
+                    var request = new TGetOperationStatusReq(operationHandle);
+                    var response = await _statement.Client.GetOperationStatus(request, cancellationToken).ConfigureAwait(false);
+
+                    // end the heartbeat as soon as the command has terminated, without sleeping first
+                    if (response.OperationState == TOperationState.CANCELED_STATE ||
+                        response.OperationState == TOperationState.ERROR_STATE)
+                    {
+                        break;
+                    }
+
+                    await Task.Delay(TimeSpan.FromSeconds(_heartbeatIntervalSeconds), cancellationToken).ConfigureAwait(false);
+                }
+            }
+            catch (OperationCanceledException)
+            {
+                // expected when the token is canceled; also covers TaskCanceledException, which derives from it
+            }
+        }
+
+        /// <summary>
+        /// Cancels polling, waits for the polling task to finish and releases the token sources.
+        /// Safe to call more than once and when the poller was never started.
+        /// </summary>
+        public void Dispose()
+        {
+            CancellationTokenSource? internalCts = _internalCts;
+            if (internalCts == null)
+            {
+                return;
+            }
+            _internalCts = null;
+
+            internalCts.Cancel();
+            try
+            {
+                _operationStatusPollingTask?.Wait();
+            }
+            catch (AggregateException)
+            {
+                // the heartbeat is best-effort: a failed poll must not make Dispose throw
+            }
+            _linkedCts?.Dispose();
+            _linkedCts = null;
+            internalCts.Dispose();
+        }
+    }
+}
diff --git a/csharp/src/Drivers/Databricks/DatabricksParameters.cs 
b/csharp/src/Drivers/Databricks/DatabricksParameters.cs
index 8c6122473..c5155680d 100644
--- a/csharp/src/Drivers/Databricks/DatabricksParameters.cs
+++ b/csharp/src/Drivers/Databricks/DatabricksParameters.cs
@@ -155,6 +155,11 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
     /// </summary>
     public class DatabricksConstants
     {
+        /// <summary>
+        /// Default heartbeat interval in seconds for long-running operations
+        /// </summary>
+        public const int DefaultOperationStatusPollingIntervalSeconds = 60;
+
         /// <summary>
         /// OAuth grant type constants
         /// </summary>
diff --git a/csharp/src/Drivers/Databricks/DatabricksReader.cs 
b/csharp/src/Drivers/Databricks/DatabricksReader.cs
index cdd131111..d82f11f62 100644
--- a/csharp/src/Drivers/Databricks/DatabricksReader.cs
+++ b/csharp/src/Drivers/Databricks/DatabricksReader.cs
@@ -33,6 +33,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
         int index;
         IArrowReader? reader;
         bool isLz4Compressed;
+        private DatabricksOperationStatusPoller? _operationStatusPoller;
 
         public DatabricksReader(DatabricksStatement statement, Schema schema, 
bool isLz4Compressed)
         {
@@ -48,14 +49,20 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
                 if (!statement.DirectResults.ResultSet.HasMoreRows)
                 {
                     this.statement = null;
+                    return;
                 }
             }
+            _operationStatusPoller = new 
DatabricksOperationStatusPoller(statement);
         }
 
         public Schema Schema { get { return schema; } }
 
         public async ValueTask<RecordBatch?> 
ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
         {
+            if (_operationStatusPoller != null && 
!_operationStatusPoller.IsStarted)
+            {
+                _operationStatusPoller.Start(cancellationToken);
+            }
             while (true)
             {
                 if (this.reader != null)
@@ -91,6 +98,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
                 if (!response.HasMoreRows)
                 {
                     this.statement = null;
+                    DisposeOperationStatusPoller();
                 }
             }
         }
@@ -134,6 +142,16 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
 
         public void Dispose()
         {
+            DisposeOperationStatusPoller();
+        }
+
+        private void DisposeOperationStatusPoller()
+        {
+            if (_operationStatusPoller != null)
+            {
+                _operationStatusPoller.Dispose();
+                _operationStatusPoller = null;
+            }
         }
     }
 }
diff --git a/csharp/test/Drivers/Apache/AssemblyInfo.cs 
b/csharp/test/Drivers/Apache/AssemblyInfo.cs
index 7cf47f159..c2de91efc 100644
--- a/csharp/test/Drivers/Apache/AssemblyInfo.cs
+++ b/csharp/test/Drivers/Apache/AssemblyInfo.cs
@@ -33,3 +33,4 @@ using System.Runtime.InteropServices;
 // The following GUID is for the ID of the typelib if this project is exposed 
to COM.
 
 [assembly: InternalsVisibleTo("Apache.Arrow.Adbc.Tests.Drivers.Databricks, 
PublicKey=0024000004800000940000000602000000240000525341310004000001000100e504183f6d470d6b67b6d19212be3e1f598f70c246a120194bc38130101d0c1853e4a0f2232cb12e37a7a90e707aabd38511dac4f25fcb0d691b2aa265900bf42de7f70468fc997551a40e1e0679b605aa2088a4a69e07c117e988f5b1738c570ee66997fba02485e7856a49eca5fd0706d09899b8312577cbb9034599fc92d4")]
+[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2, 
PublicKey=0024000004800000940000000602000000240000525341310004000001000100c547cac37abd99c8db225ef2f6c8a3602f3b3606cc9891605d02baa56104f4cfc0734aa39b93bf7852f7d9266654753cc297e7d2edfe0bac1cdcf9f717241550e0a7b191195b7667bb4f64bcb8e2121380fd1d9d46ad2d92d2d15605093924cceaf74c4861eff62abf69b9291ed0a340e113be11e6a7d3113e92484cf7045cc7")]
diff --git 
a/csharp/test/Drivers/Databricks/DatabricksOperationStatusPollerTests.cs 
b/csharp/test/Drivers/Databricks/DatabricksOperationStatusPollerTests.cs
new file mode 100644
index 000000000..9d91ed590
--- /dev/null
+++ b/csharp/test/Drivers/Databricks/DatabricksOperationStatusPollerTests.cs
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Apache.Arrow.Adbc.Drivers.Apache.Databricks.CloudFetch;
+using Apache.Arrow.Adbc.Drivers.Databricks;
+using Apache.Hive.Service.Rpc.Thrift;
+using Moq;
+using Xunit;
+using Xunit.Abstractions;
+
+namespace Apache.Arrow.Adbc.Tests.Drivers.Databricks
+{
+    /// <summary>
+    /// Unit tests for <see cref="DatabricksOperationStatusPoller"/> running against a mocked statement and Thrift client.
+    /// </summary>
+    public class DatabricksOperationStatusPollerTests
+    {
+        private readonly ITestOutputHelper _outputHelper;
+        private readonly Mock<IHiveServer2Statement> _mockStatement;
+        private readonly Mock<TCLIService.IAsync> _mockClient;
+        private readonly TOperationHandle _operationHandle;
+
+        // Interval handed to the poller, which interprets it as seconds.
+        private readonly int _heartbeatIntervalSeconds = 1;
+
+        // How long the tests observe the poller: two heartbeat intervals, expressed as a TimeSpan so the
+        // seconds value above is never passed to Task.Delay(int), which takes milliseconds.
+        private readonly TimeSpan _observationWindow;
+
+        public DatabricksOperationStatusPollerTests(ITestOutputHelper outputHelper)
+        {
+            _outputHelper = outputHelper;
+            _observationWindow = TimeSpan.FromSeconds(_heartbeatIntervalSeconds * 2);
+            _mockClient = new Mock<TCLIService.IAsync>();
+            _mockStatement = new Mock<IHiveServer2Statement>();
+            _operationHandle = new TOperationHandle
+            {
+                OperationId = new THandleIdentifier { Guid = new byte[] { 1, 2, 3, 4 } },
+                OperationType = TOperationType.EXECUTE_STATEMENT
+            };
+
+            _mockStatement.Setup(s => s.Client).Returns(_mockClient.Object);
+            _mockStatement.Setup(s => s.OperationHandle).Returns(_operationHandle);
+        }
+
+        [Fact]
+        public async Task StartPollsOperationStatusAtInterval()
+        {
+            // Arrange
+            // disposing at the end of the test stops the background polling task instead of leaving it running
+            using var poller = new DatabricksOperationStatusPoller(_mockStatement.Object, _heartbeatIntervalSeconds);
+            var pollCount = 0;
+            // the callback runs on the polling task's thread, so the counter is updated atomically
+            _mockClient.Setup(c => c.GetOperationStatus(It.IsAny<TGetOperationStatusReq>(), It.IsAny<CancellationToken>()))
+                .ReturnsAsync(new TGetOperationStatusResp())
+                .Callback(() => { Interlocked.Increment(ref pollCount); });
+
+            // Act
+            poller.Start();
+            await Task.Delay(_observationWindow); // Wait two intervals to allow multiple polls
+
+            // Assert
+            Assert.True(Volatile.Read(ref pollCount) > 1, "Should have polled more than once");
+            _mockClient.Verify(c => c.GetOperationStatus(It.IsAny<TGetOperationStatusReq>(), It.IsAny<CancellationToken>()), Times.AtLeast(2));
+        }
+
+        [Fact]
+        public async Task DisposeStopsPolling()
+        {
+            // Arrange
+            var poller = new DatabricksOperationStatusPoller(_mockStatement.Object, _heartbeatIntervalSeconds);
+            var pollCount = 0;
+            _mockClient.Setup(c => c.GetOperationStatus(It.IsAny<TGetOperationStatusReq>(), It.IsAny<CancellationToken>()))
+                .ReturnsAsync(new TGetOperationStatusResp())
+                .Callback(() => { Interlocked.Increment(ref pollCount); });
+
+            // Act
+            poller.Start();
+            await Task.Delay(_observationWindow); // Let it poll for a bit
+            poller.Dispose();
+            int pollCountAtDispose = Volatile.Read(ref pollCount);
+            await Task.Delay(_observationWindow); // Long enough for further polls if the poller were still alive
+
+            // Assert
+            Assert.True(pollCountAtDispose > 0, "Should have polled before disposal");
+            Assert.Equal(pollCountAtDispose, Volatile.Read(ref pollCount)); // Poll count should not increase after disposal
+        }
+    }
+}
diff --git a/csharp/test/Drivers/Databricks/StatementTests.cs 
b/csharp/test/Drivers/Databricks/StatementTests.cs
index 430d46164..c4e750955 100644
--- a/csharp/test/Drivers/Databricks/StatementTests.cs
+++ b/csharp/test/Drivers/Databricks/StatementTests.cs
@@ -433,5 +433,58 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.Databricks
             sqlUpdate = $"CREATE TABLE IF NOT EXISTS {fullTableNameParent} 
(INDEX INT, NAME STRING, PRIMARY KEY (INDEX, NAME))";
             primaryKeys = ["index", "name"];
         }
+
+        // NOTE: this is a thirty minute test. As of writing, databricks 
commands have 20 minutes of idle time (which is checked every 5 minutes)
+        [SkippableTheory]
+        [InlineData(true, "CloudFetch enabled")]
+        [InlineData(false, "CloudFetch disabled")]
+        public async Task StatusPollerKeepsQueryAlive(bool useCloudFetch, 
string configName)
+        {
+            OutputHelper?.WriteLine($"Testing status poller with long delay 
between reads ({configName})");
+
+            // Create a connection using the test configuration, overriding only 
the CloudFetch setting
+            var connectionParams = new Dictionary<string, string>
+            {
+                [DatabricksParameters.UseCloudFetch] = 
useCloudFetch.ToString().ToLower()
+            };
+            using AdbcConnection connection = NewConnection(TestConfiguration, 
connectionParams);
+            using var statement = connection.CreateStatement();
+
+            // Execute a query that should return data - using a larger 
dataset to ensure multiple batches
+            statement.SqlQuery = "SELECT id, CAST(id AS STRING) as id_string, 
id * 2 as id_doubled FROM RANGE(300000)";
+            QueryResult result = statement.ExecuteQuery();
+
+            Assert.NotNull(result.Stream);
+
+            // Read first batch
+            using var firstBatch = await 
result.Stream.ReadNextRecordBatchAsync();
+            Assert.NotNull(firstBatch);
+            int firstBatchRows = firstBatch.Length;
+            OutputHelper?.WriteLine($"First batch: Read {firstBatchRows} 
rows");
+
+            // Simulate a long delay (30 minutes)
+            OutputHelper?.WriteLine("Simulating 30 minute delay...");
+            await Task.Delay(TimeSpan.FromMinutes(30));
+
+            // Read remaining batches
+            int totalRows = firstBatchRows;
+            int batchCount = 1;
+
+            while (result.Stream != null)
+            {
+                using var batch = await 
result.Stream.ReadNextRecordBatchAsync();
+                if (batch == null)
+                    break;
+
+                batchCount++;
+                totalRows += batch.Length;
+                OutputHelper?.WriteLine($"Batch {batchCount}: Read 
{batch.Length} rows");
+            }
+
+            // Verify we got all rows
+            Assert.Equal(300000, totalRows);
+            Assert.True(batchCount > 1, "Should have read multiple batches");
+            OutputHelper?.WriteLine($"Successfully read {totalRows} rows in 
{batchCount} batches after 30 minute delay with {configName}");
+        }
     }
 }

Reply via email to