This is an automated email from the ASF dual-hosted git repository.

curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 0a843925f feat(csharp/src/Drivers/Databricks): Added support for 
connection param of maxBytesPerFetchRequest (#3474)
0a843925f is described below

commit 0a843925fcb98c13d6f4965c147231ad6193fee4
Author: msrathore-db <[email protected]>
AuthorDate: Mon Oct 6 07:35:52 2025 +0530

    feat(csharp/src/Drivers/Databricks): Added support for connection param of 
maxBytesPerFetchRequest (#3474)
    
    ## Description
    Added support for the connection param of maxBytesPerFetchRequest. The
    parameter is used for direct results as well.
    [PECO-2734](https://databricks.atlassian.net/browse/PECO-2734)
---
 .../src/Drivers/Databricks/DatabricksConnection.cs | 82 ++++++++++++++++++++++
 .../src/Drivers/Databricks/DatabricksParameters.cs | 10 +++
 .../src/Drivers/Databricks/DatabricksStatement.cs  | 25 ++++++-
 .../Reader/CloudFetch/CloudFetchResultFetcher.cs   |  5 ++
 .../Databricks/Reader/DatabricksCompositeReader.cs |  7 ++
 .../Drivers/Databricks/Reader/DatabricksReader.cs  |  7 ++
 csharp/src/Drivers/Databricks/readme.md            |  2 +-
 .../Databricks/E2E/DatabricksConnectionTest.cs     | 59 ++++++++++++++++
 .../Databricks/E2E/DatabricksTestConfiguration.cs  |  3 +
 .../Databricks/E2E/DatabricksTestEnvironment.cs    |  4 ++
 10 files changed, 200 insertions(+), 4 deletions(-)

diff --git a/csharp/src/Drivers/Databricks/DatabricksConnection.cs 
b/csharp/src/Drivers/Databricks/DatabricksConnection.cs
index 5a69bc5a8..e06a6eb95 100644
--- a/csharp/src/Drivers/Databricks/DatabricksConnection.cs
+++ b/csharp/src/Drivers/Databricks/DatabricksConnection.cs
@@ -70,6 +70,8 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
         private bool _useCloudFetch = true;
         private bool _canDecompressLz4 = true;
         private long _maxBytesPerFile = DefaultMaxBytesPerFile;
+        private const long DefaultMaxBytesPerFetchRequest = 400 * 1024 * 1024; 
// 400MB
+        private long _maxBytesPerFetchRequest = DefaultMaxBytesPerFetchRequest;
         private const bool DefaultRetryOnUnavailable = true;
         private const int DefaultTemporarilyUnavailableRetryTimeout = 900;
         private bool _useDescTableExtended = true;
@@ -322,6 +324,26 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
                 _maxBytesPerFile = maxBytesPerFileValue;
             }
 
+            if 
(Properties.TryGetValue(DatabricksParameters.MaxBytesPerFetchRequest, out 
string? maxBytesPerFetchRequestStr))
+            {
+                try
+                {
+                    long maxBytesPerFetchRequestValue = 
ParseBytesWithUnits(maxBytesPerFetchRequestStr);
+                    if (maxBytesPerFetchRequestValue < 0)
+                    {
+                        throw new ArgumentOutOfRangeException(
+                            nameof(Properties),
+                            maxBytesPerFetchRequestValue,
+                            $"Parameter 
'{DatabricksParameters.MaxBytesPerFetchRequest}' value must be a non-negative 
integer. Use 0 for no limit.");
+                    }
+                    _maxBytesPerFetchRequest = maxBytesPerFetchRequestValue;
+                }
+                catch (FormatException)
+                {
+                    throw new ArgumentException($"Parameter 
'{DatabricksParameters.MaxBytesPerFetchRequest}' value 
'{maxBytesPerFetchRequestStr}' could not be parsed. Valid formats: number with 
optional unit suffix (B, KB, MB, GB). Examples: '400MB', '1024KB', 
'1073741824'.");
+                }
+            }
+
             // Parse default namespace
             string? defaultCatalog = null;
             string? defaultSchema = null;
@@ -477,6 +499,11 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
         /// </summary>
         internal long MaxBytesPerFile => _maxBytesPerFile;
 
+        /// <summary>
+        /// Gets the maximum bytes per fetch request.
+        /// The value is taken from the <see cref="DatabricksParameters.MaxBytesPerFetchRequest"/>
+        /// connection property when it is supplied; otherwise it stays at
+        /// <c>DefaultMaxBytesPerFetchRequest</c> (400MB).
+        /// </summary>
+        internal long MaxBytesPerFetchRequest => _maxBytesPerFetchRequest;
+
         /// <summary>
         /// Gets the default namespace to use for SQL queries.
         /// </summary>
@@ -771,6 +798,61 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
             return "`" + value.Replace("`", "``") + "`";
         }
 
+        /// <summary>
+        /// Parses a byte value that may include unit suffixes (B, KB, MB, GB).
+        /// </summary>
+        /// <remarks>
+        /// Suffix matching is case-insensitive and ordinal, and the numeric part is parsed with the
+        /// invariant culture, so the result does not depend on the machine's regional settings.
+        /// Units are binary multiples (1KB = 1024 bytes). Negative numbers parse successfully;
+        /// callers that require a non-negative value must check the result themselves.
+        /// </remarks>
+        /// <param name="value">The value to parse, e.g., "400MB", "1024KB", "1073741824"</param>
+        /// <returns>The value in bytes</returns>
+        /// <exception cref="FormatException">Thrown when the value is null or blank, is not a valid integer, or overflows a 64-bit integer once the unit is applied</exception>
+        internal static long ParseBytesWithUnits(string value)
+        {
+            if (string.IsNullOrWhiteSpace(value))
+            {
+                throw new FormatException("Value cannot be null or empty");
+            }
+
+            value = value.Trim().ToUpperInvariant();
+
+            // Check for unit suffixes. The suffixes are machine-readable tokens, so compare
+            // ordinally instead of relying on the culture-sensitive EndsWith(string) overload.
+            long multiplier = 1;
+            string numberPart = value;
+
+            if (value.EndsWith("GB", StringComparison.Ordinal))
+            {
+                multiplier = 1024L * 1024L * 1024L;
+                numberPart = value.Substring(0, value.Length - 2);
+            }
+            else if (value.EndsWith("MB", StringComparison.Ordinal))
+            {
+                multiplier = 1024L * 1024L;
+                numberPart = value.Substring(0, value.Length - 2);
+            }
+            else if (value.EndsWith("KB", StringComparison.Ordinal))
+            {
+                multiplier = 1024L;
+                numberPart = value.Substring(0, value.Length - 2);
+            }
+            else if (value.EndsWith("B", StringComparison.Ordinal))
+            {
+                multiplier = 1L;
+                numberPart = value.Substring(0, value.Length - 1);
+            }
+
+            // Parse with the invariant culture: under a culture whose negative sign is not the
+            // ASCII '-', an input such as "-100" would otherwise be reported as malformed instead
+            // of reaching the caller's range check.
+            if (!long.TryParse(
+                    numberPart.Trim(),
+                    System.Globalization.NumberStyles.Integer,
+                    System.Globalization.CultureInfo.InvariantCulture,
+                    out long number))
+            {
+                throw new FormatException($"Invalid number format: {numberPart}");
+            }
+
+            try
+            {
+                // checked: a large number combined with a unit must fail loudly, not wrap around.
+                return checked(number * multiplier);
+            }
+            catch (OverflowException)
+            {
+                throw new FormatException($"Value {value} results in overflow when converted to bytes");
+            }
+        }
+
         protected override void ValidateOptions()
         {
             base.ValidateOptions();
diff --git a/csharp/src/Drivers/Databricks/DatabricksParameters.cs 
b/csharp/src/Drivers/Databricks/DatabricksParameters.cs
index f79b0002a..9c01d8a7c 100644
--- a/csharp/src/Drivers/Databricks/DatabricksParameters.cs
+++ b/csharp/src/Drivers/Databricks/DatabricksParameters.cs
@@ -40,6 +40,8 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
 
         /// <summary>
         /// Maximum bytes per file for CloudFetch.
+        /// The value can be specified with unit suffixes: B (bytes), KB 
(kilobytes), MB (megabytes), GB (gigabytes).
+        /// If no unit is specified, the value is treated as bytes.
         /// Default value is 20MB if not specified.
         /// </summary>
         public const string MaxBytesPerFile = 
"adbc.databricks.cloudfetch.max_bytes_per_file";
@@ -131,6 +133,14 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
         /// </summary>
         public const string CloudFetchPrefetchEnabled = 
"adbc.databricks.cloudfetch.prefetch_enabled";
 
+        /// <summary>
+        /// Maximum bytes per fetch request when retrieving query results from the server.
+        /// The value can be specified with unit suffixes: B (bytes), KB (kilobytes), MB (megabytes), GB (gigabytes).
+        /// Suffixes are case-insensitive and use binary multiples (1KB = 1024 bytes).
+        /// If no unit is specified, the value is treated as bytes.
+        /// A negative value supplied as a connection property is rejected with an
+        /// ArgumentOutOfRangeException; use 0 for no limit.
+        /// Default value is 400MB if not specified.
+        /// </summary>
+        public const string MaxBytesPerFetchRequest = 
"adbc.databricks.max_bytes_per_fetch_request";
+
         /// <summary>
         /// The OAuth grant type to use for authentication.
         /// Supported values:
diff --git a/csharp/src/Drivers/Databricks/DatabricksStatement.cs 
b/csharp/src/Drivers/Databricks/DatabricksStatement.cs
index 322b92bef..3610da6fa 100644
--- a/csharp/src/Drivers/Databricks/DatabricksStatement.cs
+++ b/csharp/src/Drivers/Databricks/DatabricksStatement.cs
@@ -39,6 +39,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
         private bool useCloudFetch;
         private bool canDecompressLz4;
         private long maxBytesPerFile;
+        private long maxBytesPerFetchRequest;
         private bool enableMultipleCatalogSupport;
         private bool enablePKFK;
         private bool runAsyncInThrift;
@@ -61,6 +62,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
             useCloudFetch = connection.UseCloudFetch;
             canDecompressLz4 = connection.CanDecompressLz4;
             maxBytesPerFile = connection.MaxBytesPerFile;
+            maxBytesPerFetchRequest = connection.MaxBytesPerFetchRequest;
             enableMultipleCatalogSupport = 
connection.EnableMultipleCatalogSupport;
             enablePKFK = connection.EnablePKFK;
 
@@ -155,13 +157,25 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
                     }
                     break;
                 case DatabricksParameters.MaxBytesPerFile:
-                    if (long.TryParse(value, out long maxBytesPerFileValue))
+                    try
                     {
+                        long maxBytesPerFileValue = 
DatabricksConnection.ParseBytesWithUnits(value);
                         this.maxBytesPerFile = maxBytesPerFileValue;
                     }
-                    else
+                    catch (FormatException)
                     {
-                        throw new ArgumentException($"Invalid value for {key}: 
{value}. Expected a long value.");
+                        throw new ArgumentException($"Invalid value for {key}: 
{value}. Valid formats: number with optional unit suffix (B, KB, MB, GB). 
Examples: '20MB', '1024KB', '1073741824'.");
+                    }
+                    break;
+                case DatabricksParameters.MaxBytesPerFetchRequest:
+                    try
+                    {
+                        long maxBytesPerFetchRequestValue = 
DatabricksConnection.ParseBytesWithUnits(value);
+                        this.maxBytesPerFetchRequest = 
maxBytesPerFetchRequestValue;
+                    }
+                    catch (FormatException)
+                    {
+                        throw new ArgumentException($"Invalid value for {key}: 
{value}. Valid formats: number with optional unit suffix (B, KB, MB, GB). 
Examples: '400MB', '1024KB', '1073741824'.");
                     }
                     break;
                 default:
@@ -194,6 +208,11 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
         /// </summary>
         public bool CanDecompressLz4 => canDecompressLz4;
 
+        /// <summary>
+        /// Gets the maximum bytes per fetch request.
+        /// Initialized from the owning connection's value and replaced when the
+        /// <see cref="DatabricksParameters.MaxBytesPerFetchRequest"/> key is set on this statement.
+        /// </summary>
+        /// <remarks>
+        /// NOTE(review): the statement-level option parses the value with
+        /// DatabricksConnection.ParseBytesWithUnits but, unlike the connection property,
+        /// does not appear to reject negative values - confirm whether that is intended.
+        /// </remarks>
+        public long MaxBytesPerFetchRequest => maxBytesPerFetchRequest;
+
         /// <summary>
         /// Sets whether the client can decompress LZ4 compressed results.
         /// </summary>
diff --git 
a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchResultFetcher.cs 
b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchResultFetcher.cs
index acbd2589b..3bf88ca53 100644
--- a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchResultFetcher.cs
+++ b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchResultFetcher.cs
@@ -280,6 +280,11 @@ namespace 
Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
             // Create fetch request
             TFetchResultsReq request = new 
TFetchResultsReq(_response.OperationHandle!, TFetchOrientation.FETCH_NEXT, 
_batchSize);
 
+            if (_statement is DatabricksStatement databricksStatement)
+            {
+                request.MaxBytes = databricksStatement.MaxBytesPerFetchRequest;
+            }
+
             // Set the start row offset
             long startOffset = offset ?? _startOffset;
             if (startOffset > 0)
diff --git a/csharp/src/Drivers/Databricks/Reader/DatabricksCompositeReader.cs 
b/csharp/src/Drivers/Databricks/Reader/DatabricksCompositeReader.cs
index eb2c07294..b55d5b8bb 100644
--- a/csharp/src/Drivers/Databricks/Reader/DatabricksCompositeReader.cs
+++ b/csharp/src/Drivers/Databricks/Reader/DatabricksCompositeReader.cs
@@ -123,6 +123,13 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader
                 // Make a FetchResults call to get the initial result set
                 // and determine the reader based on the result set
                 TFetchResultsReq request = new 
TFetchResultsReq(_response.OperationHandle!, TFetchOrientation.FETCH_NEXT, 
this._statement.BatchSize);
+
+                // Set MaxBytes from DatabricksStatement
+                if (this._statement is DatabricksStatement databricksStatement)
+                {
+                    request.MaxBytes = 
databricksStatement.MaxBytesPerFetchRequest;
+                }
+
                 TFetchResultsResp response = await 
this._statement.Client!.FetchResults(request, cancellationToken);
                 _activeReader = DetermineReader(response);
             }
diff --git a/csharp/src/Drivers/Databricks/Reader/DatabricksReader.cs 
b/csharp/src/Drivers/Databricks/Reader/DatabricksReader.cs
index b1aea2bda..a0ff9797c 100644
--- a/csharp/src/Drivers/Databricks/Reader/DatabricksReader.cs
+++ b/csharp/src/Drivers/Databricks/Reader/DatabricksReader.cs
@@ -83,6 +83,13 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader
                     }
                     // TODO: use an expiring cancellationtoken
                     TFetchResultsReq request = new 
TFetchResultsReq(this.response.OperationHandle!, TFetchOrientation.FETCH_NEXT, 
this.statement.BatchSize);
+
+                    // Set MaxBytes from DatabricksStatement
+                    if (this.statement is DatabricksStatement 
databricksStatement)
+                    {
+                        request.MaxBytes = 
databricksStatement.MaxBytesPerFetchRequest;
+                    }
+
                     TFetchResultsResp response = await 
this.statement.Connection.Client!.FetchResults(request, cancellationToken);
 
                     // Make sure we get the arrowBatches
diff --git a/csharp/src/Drivers/Databricks/readme.md 
b/csharp/src/Drivers/Databricks/readme.md
index cb8536228..9159a7521 100644
--- a/csharp/src/Drivers/Databricks/readme.md
+++ b/csharp/src/Drivers/Databricks/readme.md
@@ -103,7 +103,7 @@ CloudFetch is Databricks' high-performance result retrieval 
system that download
 | :--- | :--- | :--- |
 | `adbc.databricks.cloudfetch.enabled` | Whether to use CloudFetch for 
retrieving results | `true` |
 | `adbc.databricks.cloudfetch.lz4.enabled` | Whether the client can decompress 
LZ4 compressed results | `true` |
-| `adbc.databricks.cloudfetch.max_bytes_per_file` | Maximum bytes per file for 
CloudFetch (e.g., `20971520` for 20MB) | `20971520` |
+| `adbc.databricks.cloudfetch.max_bytes_per_file` | Maximum bytes per file for 
CloudFetch. Supports unit suffixes (B, KB, MB, GB). Examples: `20MB`, `1024KB`, 
`20971520` | `20MB` |
 | `adbc.databricks.cloudfetch.parallel_downloads` | Maximum number of parallel 
downloads | `3` |
 | `adbc.databricks.cloudfetch.prefetch_count` | Number of files to prefetch | 
`2` |
 | `adbc.databricks.cloudfetch.memory_buffer_size_mb` | Maximum memory buffer 
size in MB for prefetched files | `200` |
diff --git a/csharp/test/Drivers/Databricks/E2E/DatabricksConnectionTest.cs 
b/csharp/test/Drivers/Databricks/E2E/DatabricksConnectionTest.cs
index 0df7c22c7..9488e0d9f 100644
--- a/csharp/test/Drivers/Databricks/E2E/DatabricksConnectionTest.cs
+++ b/csharp/test/Drivers/Databricks/E2E/DatabricksConnectionTest.cs
@@ -56,6 +56,62 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.Databricks
             OutputHelper?.WriteLine(exception.Message);
         }
 
+        /// <summary>
+        /// Validates that MaxBytesPerFetchRequest parameter accepts valid values with unit suffixes.
+        /// </summary>
+        /// <param name="parameterValue">Raw connection-property value, with or without a unit suffix.</param>
+        /// <param name="expectedBytes">Byte count the created statement is expected to expose.</param>
+        [SkippableTheory]
+        [InlineData("300MB", 300L * 1024L * 1024L)]
+        [InlineData("1GB", 1024L * 1024L * 1024L)]
+        [InlineData("512KB", 512L * 1024L)]
+        [InlineData("1024B", 1024L)]
+        [InlineData("1024", 1024L)]
+        [InlineData("0", 0L)]
+        internal void CanParseMaxBytesPerFetchRequestParameter(string parameterValue, long expectedBytes)
+        {
+            var testConfig = (DatabricksTestConfiguration)TestConfiguration.Clone();
+            var parameters = TestEnvironment.GetDriverParameters(testConfig);
+            parameters[DatabricksParameters.MaxBytesPerFetchRequest] = parameterValue;
+
+            AdbcDriver driver = NewDriver;
+            AdbcDatabase database = driver.Open(parameters);
+
+            // This should not throw an exception
+            using var connection = database.Connect(parameters);
+
+            // Verify the parameter was parsed correctly by creating a statement and checking the property.
+            // Assert the statement type explicitly: a conditional type check would let the test pass
+            // without verifying anything if a different statement type were returned.
+            using var statement = connection.CreateStatement();
+            var databricksStatement = Assert.IsAssignableFrom<DatabricksStatement>(statement);
+            Assert.Equal(expectedBytes, databricksStatement.MaxBytesPerFetchRequest);
+        }
+
+        /// <summary>
+        /// Validates that MaxBytesPerFetchRequest parameter can be set via test configuration.
+        /// </summary>
+        /// <param name="configValue">Value assigned to the test configuration's MaxBytesPerFetchRequest setting.</param>
+        /// <param name="expectedBytes">Byte count the created statement is expected to expose.</param>
+        [SkippableTheory]
+        [InlineData("500MB", 500L * 1024L * 1024L)]
+        [InlineData("2GB", 2L * 1024L * 1024L * 1024L)]
+        internal void CanSetMaxBytesPerFetchRequestViaTestConfiguration(string configValue, long expectedBytes)
+        {
+            var testConfig = (DatabricksTestConfiguration)TestConfiguration.Clone();
+            testConfig.MaxBytesPerFetchRequest = configValue;
+            var parameters = TestEnvironment.GetDriverParameters(testConfig);
+
+            AdbcDriver driver = NewDriver;
+            AdbcDatabase database = driver.Open(parameters);
+
+            // This should not throw an exception
+            using var connection = database.Connect(parameters);
+
+            // Verify the parameter was set correctly via test configuration.
+            // Assert the statement type explicitly: a conditional type check would let the test pass
+            // without verifying anything if a different statement type were returned.
+            using var statement = connection.CreateStatement();
+            var databricksStatement = Assert.IsAssignableFrom<DatabricksStatement>(statement);
+            Assert.Equal(expectedBytes, databricksStatement.MaxBytesPerFetchRequest);
+        }
+
         /// <summary>
         /// Tests connection timeout to establish a session with the backend.
         /// </summary>
@@ -304,6 +360,9 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.Databricks
                 Add(new(new() { [SparkParameters.HostName] = 
"valid.server.com", [SparkParameters.Token] = "abcdef", 
[DatabricksParameters.CanDecompressLz4] = "notabool"}, 
typeof(ArgumentException)));
                 Add(new(new() { [SparkParameters.HostName] = 
"valid.server.com", [SparkParameters.Token] = "abcdef", 
[DatabricksParameters.MaxBytesPerFile] = "notanumber" }, 
typeof(ArgumentException)));
                 Add(new(new() { [SparkParameters.HostName] = 
"valid.server.com", [SparkParameters.Token] = "abcdef", 
[DatabricksParameters.MaxBytesPerFile] = "-100" }, 
typeof(ArgumentOutOfRangeException)));
+                Add(new(new() { [SparkParameters.HostName] = 
"valid.server.com", [SparkParameters.Token] = "abcdef", 
[DatabricksParameters.MaxBytesPerFetchRequest] = "notanumber" }, 
typeof(ArgumentException)));
+                Add(new(new() { [SparkParameters.HostName] = 
"valid.server.com", [SparkParameters.Token] = "abcdef", 
[DatabricksParameters.MaxBytesPerFetchRequest] = "-100" }, 
typeof(ArgumentOutOfRangeException)));
+                Add(new(new() { [SparkParameters.HostName] = 
"valid.server.com", [SparkParameters.Token] = "abcdef", 
[DatabricksParameters.MaxBytesPerFetchRequest] = "invalid_unit" }, 
typeof(ArgumentException)));
                 Add(new(new() { [SparkParameters.HostName] = 
"valid.server.com", [SparkParameters.Token] = "abcdef", 
[DatabricksParameters.EnableDirectResults] = "notabool" }, 
typeof(ArgumentException)));
                 Add(new(new() { /*[SparkParameters.Type] = 
SparkServerTypeConstants.Databricks,*/ [SparkParameters.HostName] = 
"valid.server.com", [SparkParameters.Token] = "abcdef", [SparkParameters.Port] 
= "-1" }, typeof(ArgumentOutOfRangeException)));
                 Add(new(new() { /*[SparkParameters.Type] = 
SparkServerTypeConstants.Databricks,*/ [SparkParameters.HostName] = 
"valid.server.com", [SparkParameters.Token] = "abcdef", [SparkParameters.Port] 
= IPEndPoint.MinPort.ToString(CultureInfo.InvariantCulture) }, 
typeof(ArgumentOutOfRangeException)));
diff --git a/csharp/test/Drivers/Databricks/E2E/DatabricksTestConfiguration.cs 
b/csharp/test/Drivers/Databricks/E2E/DatabricksTestConfiguration.cs
index baf323484..e3b6efbb5 100644
--- a/csharp/test/Drivers/Databricks/E2E/DatabricksTestConfiguration.cs
+++ b/csharp/test/Drivers/Databricks/E2E/DatabricksTestConfiguration.cs
@@ -63,5 +63,8 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.Databricks
 
         [JsonPropertyName("enableDirectResults"), JsonIgnore(Condition = 
JsonIgnoreCondition.WhenWritingDefault)]
         public string EnableDirectResults { get; set; } = string.Empty;
+
+        [JsonPropertyName("maxBytesPerFetchRequest"), JsonIgnore(Condition = 
JsonIgnoreCondition.WhenWritingDefault)]
+        public string MaxBytesPerFetchRequest { get; set; } = string.Empty;
     }
 }
diff --git a/csharp/test/Drivers/Databricks/E2E/DatabricksTestEnvironment.cs 
b/csharp/test/Drivers/Databricks/E2E/DatabricksTestEnvironment.cs
index a553856c5..3b2b53efb 100644
--- a/csharp/test/Drivers/Databricks/E2E/DatabricksTestEnvironment.cs
+++ b/csharp/test/Drivers/Databricks/E2E/DatabricksTestEnvironment.cs
@@ -173,6 +173,10 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.Databricks
             {
                 parameters.Add(DatabricksParameters.EnableDirectResults, 
testConfiguration.EnableDirectResults!);
             }
+            if 
(!string.IsNullOrEmpty(testConfiguration.MaxBytesPerFetchRequest))
+            {
+                parameters.Add(DatabricksParameters.MaxBytesPerFetchRequest, 
testConfiguration.MaxBytesPerFetchRequest!);
+            }
             if (testConfiguration.HttpOptions != null)
             {
                 if (testConfiguration.HttpOptions.Tls != null)

Reply via email to