This is an automated email from the ASF dual-hosted git repository.
curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new 4d78ff7a1 feat(csharp/src/Drivers/Databricks): Poll status to keep
query alive (#2820)
4d78ff7a1 is described below
commit 4d78ff7a1ca2132478de5129323f159caada173b
Author: Todd Meng <[email protected]>
AuthorDate: Mon May 19 09:55:42 2025 -0700
feat(csharp/src/Drivers/Databricks): Poll status to keep query alive (#2820)
Uses `GetOperationStatus` polling to guarantee that query/command
results stay alive after query execution has completed.
DatabricksReader and CloudFetchReaders start async loops that send Pings
every minute, until the results have been completely consumed or the
connection closes.
Includes unit tests + an e2e test verifying that the query stays alive for
more than 20 minutes.
---
csharp/src/Drivers/Apache/AssemblyInfo.cs | 1 +
csharp/src/Drivers/Databricks/AssemblyInfo.cs | 1 +
.../CloudFetch/CloudFetchDownloadManager.cs | 26 +++++-
.../Databricks/DatabricksOperationStatusPoller.cs | 100 +++++++++++++++++++++
.../src/Drivers/Databricks/DatabricksParameters.cs | 5 ++
csharp/src/Drivers/Databricks/DatabricksReader.cs | 18 ++++
csharp/test/Drivers/Apache/AssemblyInfo.cs | 1 +
.../DatabricksOperationStatusPollerTests.cs | 95 ++++++++++++++++++++
csharp/test/Drivers/Databricks/StatementTests.cs | 53 +++++++++++
9 files changed, 299 insertions(+), 1 deletion(-)
diff --git a/csharp/src/Drivers/Apache/AssemblyInfo.cs
b/csharp/src/Drivers/Apache/AssemblyInfo.cs
index fd70665a9..2654e928b 100644
--- a/csharp/src/Drivers/Apache/AssemblyInfo.cs
+++ b/csharp/src/Drivers/Apache/AssemblyInfo.cs
@@ -19,3 +19,4 @@ using System.Runtime.CompilerServices;
[assembly: InternalsVisibleTo("Apache.Arrow.Adbc.Drivers.Databricks,
PublicKey=0024000004800000940000000602000000240000525341310004000001000100e504183f6d470d6b67b6d19212be3e1f598f70c246a120194bc38130101d0c1853e4a0f2232cb12e37a7a90e707aabd38511dac4f25fcb0d691b2aa265900bf42de7f70468fc997551a40e1e0679b605aa2088a4a69e07c117e988f5b1738c570ee66997fba02485e7856a49eca5fd0706d09899b8312577cbb9034599fc92d4")]
[assembly: InternalsVisibleTo("Apache.Arrow.Adbc.Tests.Drivers.Databricks,
PublicKey=0024000004800000940000000602000000240000525341310004000001000100e504183f6d470d6b67b6d19212be3e1f598f70c246a120194bc38130101d0c1853e4a0f2232cb12e37a7a90e707aabd38511dac4f25fcb0d691b2aa265900bf42de7f70468fc997551a40e1e0679b605aa2088a4a69e07c117e988f5b1738c570ee66997fba02485e7856a49eca5fd0706d09899b8312577cbb9034599fc92d4")]
+[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2,
PublicKey=0024000004800000940000000602000000240000525341310004000001000100c547cac37abd99c8db225ef2f6c8a3602f3b3606cc9891605d02baa56104f4cfc0734aa39b93bf7852f7d9266654753cc297e7d2edfe0bac1cdcf9f717241550e0a7b191195b7667bb4f64bcb8e2121380fd1d9d46ad2d92d2d15605093924cceaf74c4861eff62abf69b9291ed0a340e113be11e6a7d3113e92484cf7045cc7")]
diff --git a/csharp/src/Drivers/Databricks/AssemblyInfo.cs
b/csharp/src/Drivers/Databricks/AssemblyInfo.cs
index 59e9e75de..abd9017b7 100644
--- a/csharp/src/Drivers/Databricks/AssemblyInfo.cs
+++ b/csharp/src/Drivers/Databricks/AssemblyInfo.cs
@@ -18,3 +18,4 @@
using System.Runtime.CompilerServices;
[assembly: InternalsVisibleTo("Apache.Arrow.Adbc.Tests.Drivers.Databricks,
PublicKey=0024000004800000940000000602000000240000525341310004000001000100e504183f6d470d6b67b6d19212be3e1f598f70c246a120194bc38130101d0c1853e4a0f2232cb12e37a7a90e707aabd38511dac4f25fcb0d691b2aa265900bf42de7f70468fc997551a40e1e0679b605aa2088a4a69e07c117e988f5b1738c570ee66997fba02485e7856a49eca5fd0706d09899b8312577cbb9034599fc92d4")]
+[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2,
PublicKey=0024000004800000940000000602000000240000525341310004000001000100c547cac37abd99c8db225ef2f6c8a3602f3b3606cc9891605d02baa56104f4cfc0734aa39b93bf7852f7d9266654753cc297e7d2edfe0bac1cdcf9f717241550e0a7b191195b7667bb4f64bcb8e2121380fd1d9d46ad2d92d2d15605093924cceaf74c4861eff62abf69b9291ed0a340e113be11e6a7d3113e92484cf7045cc7")]
diff --git
a/csharp/src/Drivers/Databricks/CloudFetch/CloudFetchDownloadManager.cs
b/csharp/src/Drivers/Databricks/CloudFetch/CloudFetchDownloadManager.cs
index 99f38fe82..7eec4d768 100644
--- a/csharp/src/Drivers/Databricks/CloudFetch/CloudFetchDownloadManager.cs
+++ b/csharp/src/Drivers/Databricks/CloudFetch/CloudFetchDownloadManager.cs
@@ -37,6 +37,7 @@ namespace
Apache.Arrow.Adbc.Drivers.Apache.Databricks.CloudFetch
private const int DefaultMemoryBufferSizeMB = 200;
private const bool DefaultPrefetchEnabled = true;
private const int DefaultFetchBatchSize = 2000000;
+ private const int DefaultTimeoutMinutes = 5;
private readonly DatabricksStatement _statement;
private readonly Schema _schema;
@@ -50,6 +51,7 @@ namespace
Apache.Arrow.Adbc.Drivers.Apache.Databricks.CloudFetch
private bool _isDisposed;
private bool _isStarted;
private CancellationTokenSource? _cancellationTokenSource;
+ private DatabricksOperationStatusPoller? _operationStatusPoller;
/// <summary>
/// Initializes a new instance of the <see
cref="CloudFetchDownloadManager"/> class.
@@ -137,7 +139,7 @@ namespace
Apache.Arrow.Adbc.Drivers.Apache.Databricks.CloudFetch
}
// Parse timeout minutes
- int timeoutMinutes = 5;
+ int timeoutMinutes = DefaultTimeoutMinutes;
if
(connectionProps.TryGetValue(DatabricksParameters.CloudFetchTimeoutMinutes, out
string? timeoutStr))
{
if (int.TryParse(timeoutStr, out int parsedTimeout) &&
parsedTimeout > 0)
@@ -180,6 +182,9 @@ namespace
Apache.Arrow.Adbc.Drivers.Apache.Databricks.CloudFetch
_isLz4Compressed,
maxRetries,
retryDelayMs);
+
+ // Initialize the operation status poller
+ _operationStatusPoller = new
DatabricksOperationStatusPoller(_statement);
}
/// <summary>
@@ -247,6 +252,9 @@ namespace
Apache.Arrow.Adbc.Drivers.Apache.Databricks.CloudFetch
// Create a new cancellation token source
_cancellationTokenSource = new CancellationTokenSource();
+ // Start the operation status poller
+ _operationStatusPoller?.Start(_cancellationTokenSource.Token);
+
// Start the result fetcher
await
_resultFetcher.StartAsync(_cancellationTokenSource.Token).ConfigureAwait(false);
@@ -267,6 +275,9 @@ namespace
Apache.Arrow.Adbc.Drivers.Apache.Databricks.CloudFetch
// Cancel the token to signal all operations to stop
_cancellationTokenSource?.Cancel();
+ // Stop the operation status poller
+ DisposeOperationStatusPoller();
+
// Stop the downloader
await _downloader.StopAsync().ConfigureAwait(false);
@@ -291,6 +302,9 @@ namespace
Apache.Arrow.Adbc.Drivers.Apache.Databricks.CloudFetch
// Stop the pipeline
StopAsync().GetAwaiter().GetResult();
+ // Dispose the operation status poller
+ DisposeOperationStatusPoller();
+
// Dispose the HTTP client
_httpClient.Dispose();
@@ -326,5 +340,15 @@ namespace
Apache.Arrow.Adbc.Drivers.Apache.Databricks.CloudFetch
throw new
ObjectDisposedException(nameof(CloudFetchDownloadManager));
}
}
+
+
+ private void DisposeOperationStatusPoller()
+ {
+ if (_operationStatusPoller != null)
+ {
+ _operationStatusPoller.Dispose();
+ _operationStatusPoller = null;
+ }
+ }
}
}
diff --git a/csharp/src/Drivers/Databricks/DatabricksOperationStatusPoller.cs
b/csharp/src/Drivers/Databricks/DatabricksOperationStatusPoller.cs
new file mode 100644
index 000000000..edb7cd058
--- /dev/null
+++ b/csharp/src/Drivers/Databricks/DatabricksOperationStatusPoller.cs
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Apache.Arrow.Adbc.Drivers.Apache.Databricks.CloudFetch;
+using Apache.Hive.Service.Rpc.Thrift;
+
+namespace Apache.Arrow.Adbc.Drivers.Databricks
+{
+ /// <summary>
+ /// Service that periodically polls the operation status of a Databricks
warehouse query to keep it alive.
+ /// This is used to maintain the command results and session when reading
results takes a long time.
+ /// </summary>
+ internal class DatabricksOperationStatusPoller : IDisposable
+ {
+ private readonly IHiveServer2Statement _statement;
+ private readonly int _heartbeatIntervalSeconds;
+ // internal cancellation token source - won't affect the external token
+ private CancellationTokenSource? _internalCts;
+ private Task? _operationStatusPollingTask;
+
+ public DatabricksOperationStatusPoller(IHiveServer2Statement
statement, int heartbeatIntervalSeconds =
DatabricksConstants.DefaultOperationStatusPollingIntervalSeconds)
+ {
+ _statement = statement ?? throw new
ArgumentNullException(nameof(statement));
+ _heartbeatIntervalSeconds = heartbeatIntervalSeconds;
+ }
+
+ public bool IsStarted => _operationStatusPollingTask != null;
+
+ /// <summary>
+        /// Starts the operation status poller. Continues polling at the
configured interval until the operation is canceled/errored,
+        /// the token is canceled, or the operation status poller is disposed.
+ /// </summary>
+ /// <param name="externalToken">The external cancellation
token.</param>
+ public void Start(CancellationToken externalToken = default)
+ {
+ if (IsStarted)
+ {
+ throw new InvalidOperationException("Operation status poller
already started");
+ }
+ _internalCts = new CancellationTokenSource();
+ // create a linked token to the external token so that the
external token can cancel the operation status polling task if needed
+ var linkedToken =
CancellationTokenSource.CreateLinkedTokenSource(_internalCts.Token,
externalToken).Token;
+ _operationStatusPollingTask = Task.Run(() =>
PollOperationStatus(linkedToken));
+ }
+
+ private async Task PollOperationStatus(CancellationToken
cancellationToken)
+ {
+ try
+ {
+ while (!cancellationToken.IsCancellationRequested)
+ {
+ var operationHandle = _statement.OperationHandle;
+ if (operationHandle == null) break;
+
+ var request = new TGetOperationStatusReq(operationHandle);
+ var response = await
_statement.Client.GetOperationStatus(request, cancellationToken);
+ await
Task.Delay(TimeSpan.FromSeconds(_heartbeatIntervalSeconds), cancellationToken);
+
+ // end the heartbeat if the command has terminated
+ if (response.OperationState ==
TOperationState.CANCELED_STATE ||
+ response.OperationState == TOperationState.ERROR_STATE)
+ {
+ break;
+ }
+ }
+ }
+ catch (TaskCanceledException)
+ {
+ // ignore
+ }
+ }
+
+ public void Dispose()
+ {
+ if (_internalCts != null)
+ {
+ _internalCts.Cancel();
+ _operationStatusPollingTask?.Wait();
+ _internalCts.Dispose();
+ }
+ }
+ }
+}
diff --git a/csharp/src/Drivers/Databricks/DatabricksParameters.cs
b/csharp/src/Drivers/Databricks/DatabricksParameters.cs
index 8c6122473..c5155680d 100644
--- a/csharp/src/Drivers/Databricks/DatabricksParameters.cs
+++ b/csharp/src/Drivers/Databricks/DatabricksParameters.cs
@@ -155,6 +155,11 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
/// </summary>
public class DatabricksConstants
{
+ /// <summary>
+ /// Default heartbeat interval in seconds for long-running operations
+ /// </summary>
+ public const int DefaultOperationStatusPollingIntervalSeconds = 60;
+
/// <summary>
/// OAuth grant type constants
/// </summary>
diff --git a/csharp/src/Drivers/Databricks/DatabricksReader.cs
b/csharp/src/Drivers/Databricks/DatabricksReader.cs
index cdd131111..d82f11f62 100644
--- a/csharp/src/Drivers/Databricks/DatabricksReader.cs
+++ b/csharp/src/Drivers/Databricks/DatabricksReader.cs
@@ -33,6 +33,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
int index;
IArrowReader? reader;
bool isLz4Compressed;
+ private DatabricksOperationStatusPoller? _operationStatusPoller;
public DatabricksReader(DatabricksStatement statement, Schema schema,
bool isLz4Compressed)
{
@@ -48,14 +49,20 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
if (!statement.DirectResults.ResultSet.HasMoreRows)
{
this.statement = null;
+ return;
}
}
+ _operationStatusPoller = new
DatabricksOperationStatusPoller(statement);
}
public Schema Schema { get { return schema; } }
public async ValueTask<RecordBatch?>
ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
{
+ if (_operationStatusPoller != null &&
!_operationStatusPoller.IsStarted)
+ {
+ _operationStatusPoller.Start(cancellationToken);
+ }
while (true)
{
if (this.reader != null)
@@ -91,6 +98,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
if (!response.HasMoreRows)
{
this.statement = null;
+ DisposeOperationStatusPoller();
}
}
}
@@ -134,6 +142,16 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
public void Dispose()
{
+ DisposeOperationStatusPoller();
+ }
+
+ private void DisposeOperationStatusPoller()
+ {
+ if (_operationStatusPoller != null)
+ {
+ _operationStatusPoller.Dispose();
+ _operationStatusPoller = null;
+ }
}
}
}
diff --git a/csharp/test/Drivers/Apache/AssemblyInfo.cs
b/csharp/test/Drivers/Apache/AssemblyInfo.cs
index 7cf47f159..c2de91efc 100644
--- a/csharp/test/Drivers/Apache/AssemblyInfo.cs
+++ b/csharp/test/Drivers/Apache/AssemblyInfo.cs
@@ -33,3 +33,4 @@ using System.Runtime.InteropServices;
// The following GUID is for the ID of the typelib if this project is exposed
to COM.
[assembly: InternalsVisibleTo("Apache.Arrow.Adbc.Tests.Drivers.Databricks,
PublicKey=0024000004800000940000000602000000240000525341310004000001000100e504183f6d470d6b67b6d19212be3e1f598f70c246a120194bc38130101d0c1853e4a0f2232cb12e37a7a90e707aabd38511dac4f25fcb0d691b2aa265900bf42de7f70468fc997551a40e1e0679b605aa2088a4a69e07c117e988f5b1738c570ee66997fba02485e7856a49eca5fd0706d09899b8312577cbb9034599fc92d4")]
+[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2,
PublicKey=0024000004800000940000000602000000240000525341310004000001000100c547cac37abd99c8db225ef2f6c8a3602f3b3606cc9891605d02baa56104f4cfc0734aa39b93bf7852f7d9266654753cc297e7d2edfe0bac1cdcf9f717241550e0a7b191195b7667bb4f64bcb8e2121380fd1d9d46ad2d92d2d15605093924cceaf74c4861eff62abf69b9291ed0a340e113be11e6a7d3113e92484cf7045cc7")]
diff --git
a/csharp/test/Drivers/Databricks/DatabricksOperationStatusPollerTests.cs
b/csharp/test/Drivers/Databricks/DatabricksOperationStatusPollerTests.cs
new file mode 100644
index 000000000..9d91ed590
--- /dev/null
+++ b/csharp/test/Drivers/Databricks/DatabricksOperationStatusPollerTests.cs
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Apache.Arrow.Adbc.Drivers.Apache.Databricks.CloudFetch;
+using Apache.Arrow.Adbc.Drivers.Databricks;
+using Apache.Hive.Service.Rpc.Thrift;
+using Moq;
+using Xunit;
+using Xunit.Abstractions;
+
+namespace Apache.Arrow.Adbc.Tests.Drivers.Databricks
+{
+ public class DatabricksOperationStatusPollerTests
+ {
+ private readonly ITestOutputHelper _outputHelper;
+ private readonly Mock<IHiveServer2Statement> _mockStatement;
+ private readonly Mock<TCLIService.IAsync> _mockClient;
+ private readonly TOperationHandle _operationHandle;
+
+ private readonly int _heartbeatIntervalSeconds = 1000;
+
+ public DatabricksOperationStatusPollerTests(ITestOutputHelper
outputHelper)
+ {
+ _outputHelper = outputHelper;
+ _mockClient = new Mock<TCLIService.IAsync>();
+ _mockStatement = new Mock<IHiveServer2Statement>();
+ _operationHandle = new TOperationHandle
+ {
+ OperationId = new THandleIdentifier { Guid = new byte[] { 1,
2, 3, 4 } },
+ OperationType = TOperationType.EXECUTE_STATEMENT
+ };
+
+ _mockStatement.Setup(s => s.Client).Returns(_mockClient.Object);
+ _mockStatement.Setup(s =>
s.OperationHandle).Returns(_operationHandle);
+ }
+
+ [Fact]
+ public async Task StartPollsOperationStatusAtInterval()
+ {
+ // Arrange
+ var poller = new
DatabricksOperationStatusPoller(_mockStatement.Object,
_heartbeatIntervalSeconds);
+ var pollCount = 0;
+ _mockClient.Setup(c =>
c.GetOperationStatus(It.IsAny<TGetOperationStatusReq>(),
It.IsAny<CancellationToken>()))
+ .ReturnsAsync(new TGetOperationStatusResp())
+ .Callback(() => pollCount++);
+
+ // Act
+ poller.Start();
+ await Task.Delay(_heartbeatIntervalSeconds * 2); // Wait for 2
seconds to allow multiple polls
+
+ // Assert
+ Assert.True(pollCount > 0, "Should have polled at least once");
+ _mockClient.Verify(c =>
c.GetOperationStatus(It.IsAny<TGetOperationStatusReq>(),
It.IsAny<CancellationToken>()), Times.AtLeastOnce);
+ }
+
+ [Fact]
+ public async Task DisposeStopsPolling()
+ {
+ // Arrange
+ var poller = new
DatabricksOperationStatusPoller(_mockStatement.Object,
_heartbeatIntervalSeconds);
+ var pollCount = 0;
+ _mockClient.Setup(c =>
c.GetOperationStatus(It.IsAny<TGetOperationStatusReq>(),
It.IsAny<CancellationToken>()))
+ .ReturnsAsync(new TGetOperationStatusResp())
+ .Callback(() => pollCount++);
+
+ // Act
+ poller.Start();
+ await Task.Delay(_heartbeatIntervalSeconds * 2); // Let it poll
for a bit
+ poller.Dispose();
+ await Task.Delay(_heartbeatIntervalSeconds * 2); // Wait to see if
it continues polling
+
+ // Assert
+ int finalPollCount = pollCount;
+ await Task.Delay(_heartbeatIntervalSeconds * 2); // Wait another
second
+ Assert.Equal(finalPollCount, pollCount); // Poll count should not
increase after disposal
+ }
+ }
+}
diff --git a/csharp/test/Drivers/Databricks/StatementTests.cs
b/csharp/test/Drivers/Databricks/StatementTests.cs
index 430d46164..c4e750955 100644
--- a/csharp/test/Drivers/Databricks/StatementTests.cs
+++ b/csharp/test/Drivers/Databricks/StatementTests.cs
@@ -433,5 +433,58 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.Databricks
sqlUpdate = $"CREATE TABLE IF NOT EXISTS {fullTableNameParent}
(INDEX INT, NAME STRING, PRIMARY KEY (INDEX, NAME))";
primaryKeys = ["index", "name"];
}
+
+        // NOTE: this is a thirty-minute test. As of writing, Databricks
commands have 20 minutes of idle time (checked every 5 minutes).
+ [SkippableTheory]
+ [InlineData(true, "CloudFetch enabled")]
+ [InlineData(false, "CloudFetch disabled")]
+ public async Task StatusPollerKeepsQueryAlive(bool useCloudFetch,
string configName)
+ {
+ OutputHelper?.WriteLine($"Testing status poller with long delay
between reads ({configName})");
+
+ // Create a connection using the test configuration with a small
batch size
+ var connectionParams = new Dictionary<string, string>
+ {
+ [DatabricksParameters.UseCloudFetch] =
useCloudFetch.ToString().ToLower()
+ };
+ using AdbcConnection connection = NewConnection(TestConfiguration,
connectionParams);
+ using var statement = connection.CreateStatement();
+
+ // Execute a query that should return data - using a larger
dataset to ensure multiple batches
+ statement.SqlQuery = "SELECT id, CAST(id AS STRING) as id_string,
id * 2 as id_doubled FROM RANGE(300000)";
+ QueryResult result = statement.ExecuteQuery();
+
+ Assert.NotNull(result.Stream);
+
+ // Read first batch
+ using var firstBatch = await
result.Stream.ReadNextRecordBatchAsync();
+ Assert.NotNull(firstBatch);
+ int firstBatchRows = firstBatch.Length;
+ OutputHelper?.WriteLine($"First batch: Read {firstBatchRows}
rows");
+
+ // Simulate a long delay (30 minutes)
+ OutputHelper?.WriteLine("Simulating 30 minute delay...");
+ await Task.Delay(TimeSpan.FromMinutes(30));
+
+ // Read remaining batches
+ int totalRows = firstBatchRows;
+ int batchCount = 1;
+
+ while (result.Stream != null)
+ {
+ using var batch = await
result.Stream.ReadNextRecordBatchAsync();
+ if (batch == null)
+ break;
+
+ batchCount++;
+ totalRows += batch.Length;
+ OutputHelper?.WriteLine($"Batch {batchCount}: Read
{batch.Length} rows");
+ }
+
+ // Verify we got all rows
+ Assert.Equal(300000, totalRows);
+ Assert.True(batchCount > 1, "Should have read multiple batches");
+ OutputHelper?.WriteLine($"Successfully read {totalRows} rows in
{batchCount} batches after 30 minute delay with {configName}");
+ }
}
}