This is an automated email from the ASF dual-hosted git repository.
curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new 0a843925f feat(csharp/src/Drivers/Databricks): Added support for connection param of maxBytesPerFetchRequest (#3474)
0a843925f is described below
commit 0a843925fcb98c13d6f4965c147231ad6193fee4
Author: msrathore-db <[email protected]>
AuthorDate: Mon Oct 6 07:35:52 2025 +0530
feat(csharp/src/Drivers/Databricks): Added support for connection param of maxBytesPerFetchRequest (#3474)
## Description
Added support for the `maxBytesPerFetchRequest` connection parameter, which caps the number of bytes the server returns per fetch request. The parameter is applied to direct results as well.
[PECO-2734](https://databricks.atlassian.net/browse/PECO-2734)
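For illustration, the new cap can be supplied as a connection property when opening the database. This is a minimal sketch, not code from this PR: the host and token are placeholders, and the `DatabricksDriver` entry point and `using` namespaces are assumed.

```csharp
using System.Collections.Generic;
using Apache.Arrow.Adbc;
using Apache.Arrow.Adbc.Drivers.Apache.Spark; // assumed namespace for SparkParameters
using Apache.Arrow.Adbc.Drivers.Databricks;

// Placeholder host/token; DatabricksDriver is assumed to be the driver entry point.
var parameters = new Dictionary<string, string>
{
    [SparkParameters.HostName] = "example.cloud.databricks.com",
    [SparkParameters.Token] = "<personal-access-token>",
    // Caps the bytes the server returns per FetchResults call.
    // Accepts a plain byte count or a B/KB/MB/GB suffix; "0" means no limit.
    [DatabricksParameters.MaxBytesPerFetchRequest] = "400MB",
};

AdbcDriver driver = new DatabricksDriver();
AdbcDatabase database = driver.Open(parameters);
using AdbcConnection connection = database.Connect(parameters);
```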
---
.../src/Drivers/Databricks/DatabricksConnection.cs | 82 ++++++++++++++++++++++
.../src/Drivers/Databricks/DatabricksParameters.cs | 10 +++
.../src/Drivers/Databricks/DatabricksStatement.cs | 25 ++++++-
.../Reader/CloudFetch/CloudFetchResultFetcher.cs | 5 ++
.../Databricks/Reader/DatabricksCompositeReader.cs | 7 ++
.../Drivers/Databricks/Reader/DatabricksReader.cs | 7 ++
csharp/src/Drivers/Databricks/readme.md | 2 +-
.../Databricks/E2E/DatabricksConnectionTest.cs | 59 ++++++++++++++++
.../Databricks/E2E/DatabricksTestConfiguration.cs | 3 +
.../Databricks/E2E/DatabricksTestEnvironment.cs | 4 ++
10 files changed, 200 insertions(+), 4 deletions(-)
diff --git a/csharp/src/Drivers/Databricks/DatabricksConnection.cs b/csharp/src/Drivers/Databricks/DatabricksConnection.cs
index 5a69bc5a8..e06a6eb95 100644
--- a/csharp/src/Drivers/Databricks/DatabricksConnection.cs
+++ b/csharp/src/Drivers/Databricks/DatabricksConnection.cs
@@ -70,6 +70,8 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
private bool _useCloudFetch = true;
private bool _canDecompressLz4 = true;
private long _maxBytesPerFile = DefaultMaxBytesPerFile;
+ private const long DefaultMaxBytesPerFetchRequest = 400 * 1024 * 1024; // 400MB
+ private long _maxBytesPerFetchRequest = DefaultMaxBytesPerFetchRequest;
private const bool DefaultRetryOnUnavailable = true;
private const int DefaultTemporarilyUnavailableRetryTimeout = 900;
private bool _useDescTableExtended = true;
@@ -322,6 +324,26 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
_maxBytesPerFile = maxBytesPerFileValue;
}
+ if (Properties.TryGetValue(DatabricksParameters.MaxBytesPerFetchRequest, out string? maxBytesPerFetchRequestStr))
+ {
+ try
+ {
+ long maxBytesPerFetchRequestValue = ParseBytesWithUnits(maxBytesPerFetchRequestStr);
+ if (maxBytesPerFetchRequestValue < 0)
+ {
+ throw new ArgumentOutOfRangeException(
+ nameof(Properties),
+ maxBytesPerFetchRequestValue,
+ $"Parameter
'{DatabricksParameters.MaxBytesPerFetchRequest}' value must be a non-negative
integer. Use 0 for no limit.");
+ }
+ _maxBytesPerFetchRequest = maxBytesPerFetchRequestValue;
+ }
+ catch (FormatException)
+ {
+ throw new ArgumentException($"Parameter
'{DatabricksParameters.MaxBytesPerFetchRequest}' value
'{maxBytesPerFetchRequestStr}' could not be parsed. Valid formats: number with
optional unit suffix (B, KB, MB, GB). Examples: '400MB', '1024KB',
'1073741824'.");
+ }
+ }
+
// Parse default namespace
string? defaultCatalog = null;
string? defaultSchema = null;
@@ -477,6 +499,11 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
/// </summary>
internal long MaxBytesPerFile => _maxBytesPerFile;
+ /// <summary>
+ /// Gets the maximum bytes per fetch request.
+ /// </summary>
+ internal long MaxBytesPerFetchRequest => _maxBytesPerFetchRequest;
+
/// <summary>
/// Gets the default namespace to use for SQL queries.
/// </summary>
@@ -771,6 +798,61 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
return "`" + value.Replace("`", "``") + "`";
}
+ /// <summary>
+ /// Parses a byte value that may include unit suffixes (B, KB, MB, GB).
+ /// </summary>
+ /// <param name="value">The value to parse, e.g., "400MB", "1024KB",
"1073741824"</param>
+ /// <returns>The value in bytes</returns>
+ /// <exception cref="FormatException">Thrown when the value cannot be
parsed</exception>
+ internal static long ParseBytesWithUnits(string value)
+ {
+ if (string.IsNullOrWhiteSpace(value))
+ {
+ throw new FormatException("Value cannot be null or empty");
+ }
+
+ value = value.Trim().ToUpperInvariant();
+
+ // Check for unit suffixes
+ long multiplier = 1;
+ string numberPart = value;
+
+ if (value.EndsWith("GB"))
+ {
+ multiplier = 1024L * 1024L * 1024L;
+ numberPart = value.Substring(0, value.Length - 2);
+ }
+ else if (value.EndsWith("MB"))
+ {
+ multiplier = 1024L * 1024L;
+ numberPart = value.Substring(0, value.Length - 2);
+ }
+ else if (value.EndsWith("KB"))
+ {
+ multiplier = 1024L;
+ numberPart = value.Substring(0, value.Length - 2);
+ }
+ else if (value.EndsWith("B"))
+ {
+ multiplier = 1L;
+ numberPart = value.Substring(0, value.Length - 1);
+ }
+
+ if (!long.TryParse(numberPart.Trim(), out long number))
+ {
+ throw new FormatException($"Invalid number format:
{numberPart}");
+ }
+
+ try
+ {
+ return checked(number * multiplier);
+ }
+ catch (OverflowException)
+ {
+ throw new FormatException($"Value {value} results in overflow
when converted to bytes");
+ }
+ }
+
protected override void ValidateOptions()
{
base.ValidateOptions();
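For reference, the conversions `ParseBytesWithUnits` is expected to produce, per the implementation above (a sketch; the method is `internal`, so these calls assume code inside the driver assembly or a test with `InternalsVisibleTo`):

```csharp
// Suffixes are binary (1024-based); input is trimmed and case-insensitive.
long a = DatabricksConnection.ParseBytesWithUnits("400MB");      // 419430400 (400 * 1024 * 1024)
long b = DatabricksConnection.ParseBytesWithUnits("1gb");        // 1073741824 (lowercase accepted)
long c = DatabricksConnection.ParseBytesWithUnits("512KB");      // 524288
long d = DatabricksConnection.ParseBytesWithUnits("1073741824"); // 1073741824 (no suffix => bytes)
// Invalid digits, or a checked multiplication overflow, surface as FormatException:
// DatabricksConnection.ParseBytesWithUnits("notanumber");   // throws FormatException
// DatabricksConnection.ParseBytesWithUnits("9999999999GB"); // throws FormatException (overflow)
```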
diff --git a/csharp/src/Drivers/Databricks/DatabricksParameters.cs b/csharp/src/Drivers/Databricks/DatabricksParameters.cs
index f79b0002a..9c01d8a7c 100644
--- a/csharp/src/Drivers/Databricks/DatabricksParameters.cs
+++ b/csharp/src/Drivers/Databricks/DatabricksParameters.cs
@@ -40,6 +40,8 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
/// <summary>
/// Maximum bytes per file for CloudFetch.
+ /// The value can be specified with unit suffixes: B (bytes), KB (kilobytes), MB (megabytes), GB (gigabytes).
+ /// If no unit is specified, the value is treated as bytes.
/// Default value is 20MB if not specified.
/// </summary>
public const string MaxBytesPerFile = "adbc.databricks.cloudfetch.max_bytes_per_file";
@@ -131,6 +133,14 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
/// </summary>
public const string CloudFetchPrefetchEnabled = "adbc.databricks.cloudfetch.prefetch_enabled";
+ /// <summary>
+ /// Maximum bytes per fetch request when retrieving query results from servers.
+ /// The value can be specified with unit suffixes: B (bytes), KB (kilobytes), MB (megabytes), GB (gigabytes).
+ /// If no unit is specified, the value is treated as bytes.
+ /// Default value is 400MB if not specified.
+ /// </summary>
+ public const string MaxBytesPerFetchRequest = "adbc.databricks.max_bytes_per_fetch_request";
+
/// <summary>
/// The OAuth grant type to use for authentication.
/// Supported values:
diff --git a/csharp/src/Drivers/Databricks/DatabricksStatement.cs b/csharp/src/Drivers/Databricks/DatabricksStatement.cs
index 322b92bef..3610da6fa 100644
--- a/csharp/src/Drivers/Databricks/DatabricksStatement.cs
+++ b/csharp/src/Drivers/Databricks/DatabricksStatement.cs
@@ -39,6 +39,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
private bool useCloudFetch;
private bool canDecompressLz4;
private long maxBytesPerFile;
+ private long maxBytesPerFetchRequest;
private bool enableMultipleCatalogSupport;
private bool enablePKFK;
private bool runAsyncInThrift;
@@ -61,6 +62,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
useCloudFetch = connection.UseCloudFetch;
canDecompressLz4 = connection.CanDecompressLz4;
maxBytesPerFile = connection.MaxBytesPerFile;
+ maxBytesPerFetchRequest = connection.MaxBytesPerFetchRequest;
enableMultipleCatalogSupport = connection.EnableMultipleCatalogSupport;
enablePKFK = connection.EnablePKFK;
@@ -155,13 +157,25 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
}
break;
case DatabricksParameters.MaxBytesPerFile:
- if (long.TryParse(value, out long maxBytesPerFileValue))
+ try
{
+ long maxBytesPerFileValue = DatabricksConnection.ParseBytesWithUnits(value);
this.maxBytesPerFile = maxBytesPerFileValue;
}
- else
+ catch (FormatException)
{
- throw new ArgumentException($"Invalid value for {key}:
{value}. Expected a long value.");
+ throw new ArgumentException($"Invalid value for {key}:
{value}. Valid formats: number with optional unit suffix (B, KB, MB, GB).
Examples: '20MB', '1024KB', '1073741824'.");
+ }
+ break;
+ case DatabricksParameters.MaxBytesPerFetchRequest:
+ try
+ {
+ long maxBytesPerFetchRequestValue = DatabricksConnection.ParseBytesWithUnits(value);
+ this.maxBytesPerFetchRequest = maxBytesPerFetchRequestValue;
+ }
+ catch (FormatException)
+ {
+ throw new ArgumentException($"Invalid value for {key}:
{value}. Valid formats: number with optional unit suffix (B, KB, MB, GB).
Examples: '400MB', '1024KB', '1073741824'.");
}
break;
default:
@@ -194,6 +208,11 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
/// </summary>
public bool CanDecompressLz4 => canDecompressLz4;
+ /// <summary>
+ /// Gets the maximum bytes per fetch request.
+ /// </summary>
+ public long MaxBytesPerFetchRequest => maxBytesPerFetchRequest;
+
/// <summary>
/// Sets whether the client can decompress LZ4 compressed results.
/// </summary>
diff --git a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchResultFetcher.cs b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchResultFetcher.cs
index acbd2589b..3bf88ca53 100644
--- a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchResultFetcher.cs
+++ b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchResultFetcher.cs
@@ -280,6 +280,11 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
// Create fetch request
TFetchResultsReq request = new TFetchResultsReq(_response.OperationHandle!, TFetchOrientation.FETCH_NEXT, _batchSize);
+ if (_statement is DatabricksStatement databricksStatement)
+ {
+ request.MaxBytes = databricksStatement.MaxBytesPerFetchRequest;
+ }
+
// Set the start row offset
long startOffset = offset ?? _startOffset;
if (startOffset > 0)
diff --git a/csharp/src/Drivers/Databricks/Reader/DatabricksCompositeReader.cs b/csharp/src/Drivers/Databricks/Reader/DatabricksCompositeReader.cs
index eb2c07294..b55d5b8bb 100644
--- a/csharp/src/Drivers/Databricks/Reader/DatabricksCompositeReader.cs
+++ b/csharp/src/Drivers/Databricks/Reader/DatabricksCompositeReader.cs
@@ -123,6 +123,13 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader
// Make a FetchResults call to get the initial result set
// and determine the reader based on the result set
TFetchResultsReq request = new TFetchResultsReq(_response.OperationHandle!, TFetchOrientation.FETCH_NEXT, this._statement.BatchSize);
+
+ // Set MaxBytes from DatabricksStatement
+ if (this._statement is DatabricksStatement databricksStatement)
+ {
+ request.MaxBytes = databricksStatement.MaxBytesPerFetchRequest;
+ }
+
TFetchResultsResp response = await this._statement.Client!.FetchResults(request, cancellationToken);
_activeReader = DetermineReader(response);
}
diff --git a/csharp/src/Drivers/Databricks/Reader/DatabricksReader.cs b/csharp/src/Drivers/Databricks/Reader/DatabricksReader.cs
index b1aea2bda..a0ff9797c 100644
--- a/csharp/src/Drivers/Databricks/Reader/DatabricksReader.cs
+++ b/csharp/src/Drivers/Databricks/Reader/DatabricksReader.cs
@@ -83,6 +83,13 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader
}
// TODO: use an expiring cancellationtoken
TFetchResultsReq request = new TFetchResultsReq(this.response.OperationHandle!, TFetchOrientation.FETCH_NEXT, this.statement.BatchSize);
+
+ // Set MaxBytes from DatabricksStatement
+ if (this.statement is DatabricksStatement databricksStatement)
+ {
+ request.MaxBytes = databricksStatement.MaxBytesPerFetchRequest;
+ }
+
TFetchResultsResp response = await this.statement.Connection.Client!.FetchResults(request, cancellationToken);
// Make sure we get the arrowBatches
diff --git a/csharp/src/Drivers/Databricks/readme.md b/csharp/src/Drivers/Databricks/readme.md
index cb8536228..9159a7521 100644
--- a/csharp/src/Drivers/Databricks/readme.md
+++ b/csharp/src/Drivers/Databricks/readme.md
@@ -103,7 +103,7 @@ CloudFetch is Databricks' high-performance result retrieval system that download
| :--- | :--- | :--- |
| `adbc.databricks.cloudfetch.enabled` | Whether to use CloudFetch for retrieving results | `true` |
| `adbc.databricks.cloudfetch.lz4.enabled` | Whether the client can decompress LZ4 compressed results | `true` |
-| `adbc.databricks.cloudfetch.max_bytes_per_file` | Maximum bytes per file for CloudFetch (e.g., `20971520` for 20MB) | `20971520` |
+| `adbc.databricks.cloudfetch.max_bytes_per_file` | Maximum bytes per file for CloudFetch. Supports unit suffixes (B, KB, MB, GB). Examples: `20MB`, `1024KB`, `20971520` | `20MB` |
| `adbc.databricks.cloudfetch.parallel_downloads` | Maximum number of parallel downloads | `3` |
| `adbc.databricks.cloudfetch.prefetch_count` | Number of files to prefetch | `2` |
| `adbc.databricks.cloudfetch.memory_buffer_size_mb` | Maximum memory buffer size in MB for prefetched files | `200` |
diff --git a/csharp/test/Drivers/Databricks/E2E/DatabricksConnectionTest.cs b/csharp/test/Drivers/Databricks/E2E/DatabricksConnectionTest.cs
index 0df7c22c7..9488e0d9f 100644
--- a/csharp/test/Drivers/Databricks/E2E/DatabricksConnectionTest.cs
+++ b/csharp/test/Drivers/Databricks/E2E/DatabricksConnectionTest.cs
@@ -56,6 +56,62 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.Databricks
OutputHelper?.WriteLine(exception.Message);
}
+ /// <summary>
+ /// Validates that MaxBytesPerFetchRequest parameter accepts valid values with unit suffixes.
+ /// </summary>
+ [SkippableTheory]
+ [InlineData("300MB", 300L * 1024L * 1024L)]
+ [InlineData("1GB", 1024L * 1024L * 1024L)]
+ [InlineData("512KB", 512L * 1024L)]
+ [InlineData("1024B", 1024L)]
+ [InlineData("1024", 1024L)]
+ [InlineData("0", 0L)]
+ internal void CanParseMaxBytesPerFetchRequestParameter(string
parameterValue, long expectedBytes)
+ {
+ var testConfig = (DatabricksTestConfiguration)TestConfiguration.Clone();
+ var parameters = TestEnvironment.GetDriverParameters(testConfig);
+ parameters[DatabricksParameters.MaxBytesPerFetchRequest] = parameterValue;
+
+ AdbcDriver driver = NewDriver;
+ AdbcDatabase database = driver.Open(parameters);
+
+ // This should not throw an exception
+ using var connection = database.Connect(parameters);
+
+ // Verify the parameter was parsed correctly by creating a statement and checking the property
+ using var statement = connection.CreateStatement();
+ if (statement is DatabricksStatement databricksStatement)
+ {
+ Assert.Equal(expectedBytes, databricksStatement.MaxBytesPerFetchRequest);
+ }
+ }
+
+ /// <summary>
+ /// Validates that MaxBytesPerFetchRequest parameter can be set via test configuration.
+ /// </summary>
+ [SkippableTheory]
+ [InlineData("500MB", 500L * 1024L * 1024L)]
+ [InlineData("2GB", 2L * 1024L * 1024L * 1024L)]
+ internal void CanSetMaxBytesPerFetchRequestViaTestConfiguration(string
configValue, long expectedBytes)
+ {
+ var testConfig = (DatabricksTestConfiguration)TestConfiguration.Clone();
+ testConfig.MaxBytesPerFetchRequest = configValue;
+ var parameters = TestEnvironment.GetDriverParameters(testConfig);
+
+ AdbcDriver driver = NewDriver;
+ AdbcDatabase database = driver.Open(parameters);
+
+ // This should not throw an exception
+ using var connection = database.Connect(parameters);
+
+ // Verify the parameter was set correctly via test configuration
+ using var statement = connection.CreateStatement();
+ if (statement is DatabricksStatement databricksStatement)
+ {
+ Assert.Equal(expectedBytes, databricksStatement.MaxBytesPerFetchRequest);
+ }
+ }
+
/// <summary>
/// Tests connection timeout to establish a session with the backend.
/// </summary>
@@ -304,6 +360,9 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.Databricks
Add(new(new() { [SparkParameters.HostName] = "valid.server.com", [SparkParameters.Token] = "abcdef", [DatabricksParameters.CanDecompressLz4] = "notabool"}, typeof(ArgumentException)));
Add(new(new() { [SparkParameters.HostName] = "valid.server.com", [SparkParameters.Token] = "abcdef", [DatabricksParameters.MaxBytesPerFile] = "notanumber" }, typeof(ArgumentException)));
Add(new(new() { [SparkParameters.HostName] = "valid.server.com", [SparkParameters.Token] = "abcdef", [DatabricksParameters.MaxBytesPerFile] = "-100" }, typeof(ArgumentOutOfRangeException)));
+ Add(new(new() { [SparkParameters.HostName] = "valid.server.com", [SparkParameters.Token] = "abcdef", [DatabricksParameters.MaxBytesPerFetchRequest] = "notanumber" }, typeof(ArgumentException)));
+ Add(new(new() { [SparkParameters.HostName] = "valid.server.com", [SparkParameters.Token] = "abcdef", [DatabricksParameters.MaxBytesPerFetchRequest] = "-100" }, typeof(ArgumentOutOfRangeException)));
+ Add(new(new() { [SparkParameters.HostName] = "valid.server.com", [SparkParameters.Token] = "abcdef", [DatabricksParameters.MaxBytesPerFetchRequest] = "invalid_unit" }, typeof(ArgumentException)));
Add(new(new() { [SparkParameters.HostName] = "valid.server.com", [SparkParameters.Token] = "abcdef", [DatabricksParameters.EnableDirectResults] = "notabool" }, typeof(ArgumentException)));
Add(new(new() { /*[SparkParameters.Type] = SparkServerTypeConstants.Databricks,*/ [SparkParameters.HostName] = "valid.server.com", [SparkParameters.Token] = "abcdef", [SparkParameters.Port] = "-1" }, typeof(ArgumentOutOfRangeException)));
Add(new(new() { /*[SparkParameters.Type] = SparkServerTypeConstants.Databricks,*/ [SparkParameters.HostName] = "valid.server.com", [SparkParameters.Token] = "abcdef", [SparkParameters.Port] = IPEndPoint.MinPort.ToString(CultureInfo.InvariantCulture) }, typeof(ArgumentOutOfRangeException)));
diff --git a/csharp/test/Drivers/Databricks/E2E/DatabricksTestConfiguration.cs b/csharp/test/Drivers/Databricks/E2E/DatabricksTestConfiguration.cs
index baf323484..e3b6efbb5 100644
--- a/csharp/test/Drivers/Databricks/E2E/DatabricksTestConfiguration.cs
+++ b/csharp/test/Drivers/Databricks/E2E/DatabricksTestConfiguration.cs
@@ -63,5 +63,8 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.Databricks
[JsonPropertyName("enableDirectResults"), JsonIgnore(Condition =
JsonIgnoreCondition.WhenWritingDefault)]
public string EnableDirectResults { get; set; } = string.Empty;
+
+ [JsonPropertyName("maxBytesPerFetchRequest"), JsonIgnore(Condition =
JsonIgnoreCondition.WhenWritingDefault)]
+ public string MaxBytesPerFetchRequest { get; set; } = string.Empty;
}
}
diff --git a/csharp/test/Drivers/Databricks/E2E/DatabricksTestEnvironment.cs b/csharp/test/Drivers/Databricks/E2E/DatabricksTestEnvironment.cs
index a553856c5..3b2b53efb 100644
--- a/csharp/test/Drivers/Databricks/E2E/DatabricksTestEnvironment.cs
+++ b/csharp/test/Drivers/Databricks/E2E/DatabricksTestEnvironment.cs
@@ -173,6 +173,10 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.Databricks
{
parameters.Add(DatabricksParameters.EnableDirectResults, testConfiguration.EnableDirectResults!);
}
+ if (!string.IsNullOrEmpty(testConfiguration.MaxBytesPerFetchRequest))
+ {
+ parameters.Add(DatabricksParameters.MaxBytesPerFetchRequest, testConfiguration.MaxBytesPerFetchRequest!);
+ }
if (testConfiguration.HttpOptions != null)
{
if (testConfiguration.HttpOptions.Tls != null)