CurtHagenlocher commented on code in PR #3217:
URL: https://github.com/apache/arrow-adbc/pull/3217#discussion_r2263760599
##########
csharp/src/Drivers/Databricks/DatabricksCompositeReader.cs:
##########
@@ -66,10 +63,15 @@ internal DatabricksCompositeReader(DatabricksStatement
statement, Schema schema,
_proxyConfigurator = proxyConfigurator;
// use direct results if available
- if (_statement.HasDirectResults && _statement.DirectResults !=
null && _statement.DirectResults.__isset.resultSet)
+ if (_statement.HasDirectResults && _statement.DirectResults !=
null && _statement.DirectResults.__isset.resultSet &&
statement.DirectResults?.ResultSet != null)
{
_activeReader =
DetermineReader(_statement.DirectResults.ResultSet);
}
+ if (_statement.DirectResults?.ResultSet.HasMoreRows ?? true)
+ {
+ operationStatusPoller = new
DatabricksOperationStatusPoller(statement);
+ operationStatusPoller.Start();
Review Comment:
There are some trailing spaces on this line that the linter doesn't like.
##########
csharp/test/Drivers/Databricks/Unit/DatabricksOperationStatusPollerTests.cs:
##########
@@ -82,14 +84,118 @@ public async Task DisposeStopsPolling()
// Act
poller.Start();
- await Task.Delay(_heartbeatIntervalSeconds * 2); // Let it poll
for a bit
+ await Task.Delay(TimeSpan.FromSeconds(_heartbeatIntervalSeconds *
3)); // Let it poll for a bit
poller.Dispose();
- await Task.Delay(_heartbeatIntervalSeconds * 2); // Wait to see if
it continues polling
+ await Task.Delay(TimeSpan.FromSeconds(_heartbeatIntervalSeconds *
3)); // Wait to see if it continues polling
// Assert
int finalPollCount = pollCount;
- await Task.Delay(_heartbeatIntervalSeconds * 2); // Wait another
second
+ await Task.Delay(TimeSpan.FromSeconds(_heartbeatIntervalSeconds *
3)); // Wait another second
Assert.Equal(finalPollCount, pollCount); // Poll count should not
increase after disposal
}
+
+ [Fact]
+ public async Task StopStopsPolling()
+ {
+ // Arrange
+ var poller = new
DatabricksOperationStatusPoller(_mockStatement.Object,
_heartbeatIntervalSeconds);
+ var pollCount = 0;
+ _mockClient.Setup(c =>
c.GetOperationStatus(It.IsAny<TGetOperationStatusReq>(),
It.IsAny<CancellationToken>()))
+ .ReturnsAsync(new TGetOperationStatusResp())
+ .Callback(() => pollCount++);
+
+ // Act
+ poller.Start();
+ await Task.Delay(TimeSpan.FromSeconds(_heartbeatIntervalSeconds *
3)); // Let it poll for a bit
+ poller.Stop();
+ await Task.Delay(TimeSpan.FromSeconds(_heartbeatIntervalSeconds *
3)); // Wait to see if it continues polling
+
+ // Assert
+ int finalPollCount = pollCount;
+ await Task.Delay(TimeSpan.FromSeconds(_heartbeatIntervalSeconds *
3)); // Wait another second
+ Assert.Equal(finalPollCount, pollCount); // Poll count should not
increase after stopping
+
+ // Clean up
+ poller.Dispose();
+ }
+
+ [Fact]
+ public async Task StopsPollingOnAllTerminalOperationStates()
+ {
+ var terminalStates = new[]
+ {
+ TOperationState.CANCELED_STATE,
+ TOperationState.ERROR_STATE,
+ TOperationState.CLOSED_STATE,
+ TOperationState.TIMEDOUT_STATE,
+ TOperationState.UKNOWN_STATE
+ };
+
+ foreach (var terminalState in terminalStates)
+ {
+ // Arrange
+ var poller = new
DatabricksOperationStatusPoller(_mockStatement.Object,
_heartbeatIntervalSeconds);
+ var pollCount = 0;
+ _mockClient.Setup(c =>
c.GetOperationStatus(It.IsAny<TGetOperationStatusReq>(),
It.IsAny<CancellationToken>()))
+ .ReturnsAsync(new TGetOperationStatusResp { OperationState
= terminalState })
+ .Callback(() => pollCount++);
+
+ // Act
+ poller.Start();
+ await
Task.Delay(TimeSpan.FromSeconds(_heartbeatIntervalSeconds * 3)); // Wait longer
than heartbeat interval
+
+ // Assert
+ Assert.Equal(1, pollCount);
+
+ poller.Dispose();
+ }
+ }
+
+ [Fact]
+ public async Task ContinuesPollingOnFinishedState()
+ {
+ // Arrange
+ var poller = new
DatabricksOperationStatusPoller(_mockStatement.Object,
_heartbeatIntervalSeconds);
+ var pollCount = 0;
+ _mockClient.Setup(c =>
c.GetOperationStatus(It.IsAny<TGetOperationStatusReq>(),
It.IsAny<CancellationToken>()))
+ .ReturnsAsync(new TGetOperationStatusResp { OperationState =
TOperationState.FINISHED_STATE })
+ .Callback(() => pollCount++);
+
+ // Act
+ poller.Start();
+ await Task.Delay(TimeSpan.FromSeconds(_heartbeatIntervalSeconds *
3)); // Wait longer than heartbeat interval
+
+ // Assert
+ // Should continue polling when in running state
+ Assert.True(pollCount > 1, $"Expected multiple polls but got
{pollCount}");
+ poller.Dispose();
+ }
+
+ [Fact]
+ public async Task StopsPollingOnException()
+ {
+ // Arrange
+ var poller = new
DatabricksOperationStatusPoller(_mockStatement.Object,
_heartbeatIntervalSeconds);
+ var pollCount = 0;
+ _mockClient.Setup(c =>
c.GetOperationStatus(It.IsAny<TGetOperationStatusReq>(),
It.IsAny<CancellationToken>()))
+ .ThrowsAsync(new Exception("Test exception"))
+ .Callback(() => pollCount++);
+
+ // Act
+ poller.Start();
+ await Task.Delay(TimeSpan.FromSeconds(_heartbeatIntervalSeconds *
3)); // Wait longer than heartbeat interval
+
+ // Assert
+ // Should stop polling after the exception
+ Assert.Equal(1, pollCount);
+ try
+ {
+ poller.Dispose();
+ }
+ catch (Exception)
+ {
+
Review Comment:
nit: remove blank line
##########
csharp/test/Drivers/Databricks/Unit/DatabricksOperationStatusPollerTests.cs:
##########
@@ -22,8 +22,10 @@
using Apache.Arrow.Adbc.Drivers.Databricks;
using Apache.Hive.Service.Rpc.Thrift;
using Moq;
+using System.Collections.Generic;
using Xunit;
using Xunit.Abstractions;
+using Apache.Arrow.Adbc.Drivers.Apache.Hive2;
Review Comment:
nit: sort the `using` directives
##########
csharp/src/Drivers/Databricks/DatabricksOperationStatusPoller.cs:
##########
@@ -31,14 +32,16 @@ internal class DatabricksOperationStatusPoller : IDisposable
{
private readonly IHiveServer2Statement _statement;
private readonly int _heartbeatIntervalSeconds;
+ private readonly int _requestTimeoutSeconds;
// internal cancellation token source - won't affect the external token
private CancellationTokenSource? _internalCts;
private Task? _operationStatusPollingTask;
- public DatabricksOperationStatusPoller(IHiveServer2Statement
statement, int heartbeatIntervalSeconds =
DatabricksConstants.DefaultOperationStatusPollingIntervalSeconds)
+ public DatabricksOperationStatusPoller(IHiveServer2Statement
statement, int heartbeatIntervalSeconds =
DatabricksConstants.DefaultOperationStatusPollingIntervalSeconds, int
requestTimeoutSeconds =
DatabricksConstants.DefaultOperationStatusRequestTimeoutSeconds)
Review Comment:
```suggestion
public DatabricksOperationStatusPoller(
IHiveServer2Statement statement,
int heartbeatIntervalSeconds =
DatabricksConstants.DefaultOperationStatusPollingIntervalSeconds,
int requestTimeoutSeconds =
DatabricksConstants.DefaultOperationStatusRequestTimeoutSeconds)
```
i.e. split across multiple lines
##########
csharp/test/Drivers/Databricks/Unit/DatabricksOperationStatusPollerTests.cs:
##########
@@ -82,14 +84,118 @@ public async Task DisposeStopsPolling()
// Act
poller.Start();
- await Task.Delay(_heartbeatIntervalSeconds * 2); // Let it poll
for a bit
+ await Task.Delay(TimeSpan.FromSeconds(_heartbeatIntervalSeconds *
3)); // Let it poll for a bit
poller.Dispose();
- await Task.Delay(_heartbeatIntervalSeconds * 2); // Wait to see if
it continues polling
+ await Task.Delay(TimeSpan.FromSeconds(_heartbeatIntervalSeconds *
3)); // Wait to see if it continues polling
// Assert
int finalPollCount = pollCount;
- await Task.Delay(_heartbeatIntervalSeconds * 2); // Wait another
second
+ await Task.Delay(TimeSpan.FromSeconds(_heartbeatIntervalSeconds *
3)); // Wait another second
Assert.Equal(finalPollCount, pollCount); // Poll count should not
increase after disposal
}
+
+ [Fact]
+ public async Task StopStopsPolling()
+ {
+ // Arrange
+ var poller = new
DatabricksOperationStatusPoller(_mockStatement.Object,
_heartbeatIntervalSeconds);
Review Comment:
Consider a `using` block for `poller` instead of an explicit `Dispose`. This
will ensure that the `Dispose` happens even if an exception is thrown inside
the `using` block. Applies to many of the tests in this file, and probably not
super-important unless the failure to `Dispose` in one test could cause another
test to fail.
##########
csharp/src/Drivers/Databricks/DatabricksCompositeReader.cs:
##########
@@ -108,5 +110,42 @@ private BaseDatabricksReader
DetermineReader(TFetchResultsResp initialResults)
return await
_activeReader.ReadNextRecordBatchAsync(cancellationToken);
}
+
+ public override async ValueTask<RecordBatch?>
ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
+ {
+ var result = await
ReadNextRecordBatchInternalAsync(cancellationToken);
+ // Stop the poller when we've reached the end of results
+ if (result == null)
+ {
+ StopOperationStatusPoller();
+ }
+ return result;
+ }
+
+ protected override void Dispose(bool disposing)
+ {
+ if (disposing)
+ {
+ _activeReader?.Dispose();
+ DisposeOperationStatusPoller();
+ }
+ _activeReader = null;
+ base.Dispose(disposing);
+ }
+
+ private void DisposeOperationStatusPoller()
+ {
+ if (operationStatusPoller != null)
+ {
+ StopOperationStatusPoller();
+ operationStatusPoller.Dispose();
+ operationStatusPoller = null;
+ }
+ }
+
+ private void StopOperationStatusPoller()
+ {
+ operationStatusPoller?.Stop();
Review Comment:
Consider setting `operationStatusPoller` to `null` here instead of in the
`DisposeOperationStatusPoller` method to avoid duplicate calls.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]