This is an automated email from the ASF dual-hosted git repository.

curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new a3a39d80e feat(csharp/src/Drivers/Databricks): Make Cloud Fetch 
options configurable at the connection level (#2691)
a3a39d80e is described below

commit a3a39d80e62fc22d68b24964a3c8d5a036edba02
Author: Alex Guo <[email protected]>
AuthorDate: Mon Apr 21 09:30:22 2025 -0700

    feat(csharp/src/Drivers/Databricks): Make Cloud Fetch options configurable 
at the connection level (#2691)
    
    - Currently, cloud fetch enablement is only configurable at the
    statement level, and this is not exposed to clients outside of
    arrow-adbc repo
    - Make these options configurable at the driver/connection level
    
    Tested E2E by adding a log for the result set format and whether lz4
    compression is enabled
    
    - `{ "adbc.databricks.cloudfetch.enabled", "false" }`
    - `DatabricksConnection.NewReader: resultFormat=ARROW_BASED_SET,
    isLz4Compressed=True`
    
    - `{ "adbc.databricks.cloudfetch.enabled", "false" },
            { "adbc.databricks.cloudfetch.lz4.enabled", "false" },`
    - `DatabricksConnection.NewReader: resultFormat=ARROW_BASED_SET,
    isLz4Compressed=False`
    - empty
    - `DatabricksConnection.NewReader: resultFormat=URL_BASED_SET,
    isLz4Compressed=True`
    - `{ "adbc.databricks.cloudfetch.lz4.enabled", "false" },`
    - `DatabricksConnection.NewReader: resultFormat=URL_BASED_SET,
    isLz4Compressed=False`
    
    Also tested with
    `dotnet test --filter "FullyQualifiedName~CloudFetchE2ETest"`
---
 .../src/Drivers/Databricks/DatabricksConnection.cs | 81 +++++++++++++++++++++-
 .../src/Drivers/Databricks/DatabricksParameters.cs | 18 +++++
 .../src/Drivers/Databricks/DatabricksStatement.cs  | 32 +++------
 .../test/Drivers/Databricks/CloudFetchE2ETest.cs   | 20 ++++--
 .../Drivers/Databricks/DatabricksConnectionTest.cs |  5 ++
 csharp/test/Drivers/Databricks/StatementTests.cs   |  2 +-
 6 files changed, 128 insertions(+), 30 deletions(-)

diff --git a/csharp/src/Drivers/Databricks/DatabricksConnection.cs 
b/csharp/src/Drivers/Databricks/DatabricksConnection.cs
index a12f9bde9..45a496b64 100644
--- a/csharp/src/Drivers/Databricks/DatabricksConnection.cs
+++ b/csharp/src/Drivers/Databricks/DatabricksConnection.cs
@@ -33,12 +33,72 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
     {
         private bool _applySSPWithQueries = false;
 
+        // CloudFetch configuration
+        private const long DefaultMaxBytesPerFile = 20 * 1024 * 1024; // 20MB
+
+        private bool _useCloudFetch = true;
+        private bool _canDecompressLz4 = true;
+        private long _maxBytesPerFile = DefaultMaxBytesPerFile;
+
         public DatabricksConnection(IReadOnlyDictionary<string, string> 
properties) : base(properties)
         {
-            if 
(Properties.TryGetValue(DatabricksParameters.ApplySSPWithQueries, out string? 
applySSPWithQueriesStr) &&
-                bool.TryParse(applySSPWithQueriesStr, out bool 
applySSPWithQueriesValue))
+            ValidateProperties();
+        }
+
+        private void ValidateProperties()
+        {
+            if 
(Properties.TryGetValue(DatabricksParameters.ApplySSPWithQueries, out string? 
applySSPWithQueriesStr))
+            {
+                if (bool.TryParse(applySSPWithQueriesStr, out bool 
applySSPWithQueriesValue))
+                {
+                    _applySSPWithQueries = applySSPWithQueriesValue;
+                }
+                else
+                {
+                    throw new ArgumentException($"Parameter 
'{DatabricksParameters.ApplySSPWithQueries}' value '{applySSPWithQueriesStr}' 
could not be parsed. Valid values are 'true' and 'false'.");
+                }
+            }
+
+            // Parse CloudFetch options from connection properties
+            if (Properties.TryGetValue(DatabricksParameters.UseCloudFetch, out 
string? useCloudFetchStr))
+            {
+                if (bool.TryParse(useCloudFetchStr, out bool 
useCloudFetchValue))
+                {
+                    _useCloudFetch = useCloudFetchValue;
+                }
+                else
+                {
+                    throw new ArgumentException($"Parameter 
'{DatabricksParameters.UseCloudFetch}' value '{useCloudFetchStr}' could not be 
parsed. Valid values are 'true' and 'false'.");
+                }
+            }
+
+            if (Properties.TryGetValue(DatabricksParameters.CanDecompressLz4, 
out string? canDecompressLz4Str))
+            {
+                if (bool.TryParse(canDecompressLz4Str, out bool 
canDecompressLz4Value))
+                {
+                    _canDecompressLz4 = canDecompressLz4Value;
+                }
+                else
+                {
+                    throw new ArgumentException($"Parameter 
'{DatabricksParameters.CanDecompressLz4}' value '{canDecompressLz4Str}' could 
not be parsed. Valid values are 'true' and 'false'.");
+                }
+            }
+
+            if (Properties.TryGetValue(DatabricksParameters.MaxBytesPerFile, 
out string? maxBytesPerFileStr))
             {
-                _applySSPWithQueries = applySSPWithQueriesValue;
+                if (!long.TryParse(maxBytesPerFileStr, out long 
maxBytesPerFileValue))
+                {
+                    throw new ArgumentException($"Parameter 
'{DatabricksParameters.MaxBytesPerFile}' value '{maxBytesPerFileStr}' could not 
be parsed. Valid values are positive integers.");
+                }
+
+                if (maxBytesPerFileValue <= 0)
+                {
+                    throw new ArgumentOutOfRangeException(
+                        nameof(Properties),
+                        maxBytesPerFileValue,
+                        $"Parameter '{DatabricksParameters.MaxBytesPerFile}' 
value must be a positive integer.");
+                }
+                _maxBytesPerFile = maxBytesPerFileValue;
             }
         }
 
@@ -47,6 +107,21 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
         /// </summary>
         internal bool ApplySSPWithQueries => _applySSPWithQueries;
 
+        /// <summary>
+        /// Gets whether CloudFetch is enabled.
+        /// </summary>
+        internal bool UseCloudFetch => _useCloudFetch;
+
+        /// <summary>
+        /// Gets whether LZ4 decompression is enabled.
+        /// </summary>
+        internal bool CanDecompressLz4 => _canDecompressLz4;
+
+        /// <summary>
+        /// Gets the maximum bytes per file for CloudFetch.
+        /// </summary>
+        internal long MaxBytesPerFile => _maxBytesPerFile;
+
         internal override IArrowArrayStream NewReader<T>(T statement, Schema 
schema, TGetResultSetMetadataResp? metadataResp = null)
         {
             // Get result format from metadata response if available
diff --git a/csharp/src/Drivers/Databricks/DatabricksParameters.cs 
b/csharp/src/Drivers/Databricks/DatabricksParameters.cs
index 9d06a9479..1c963b4d7 100644
--- a/csharp/src/Drivers/Databricks/DatabricksParameters.cs
+++ b/csharp/src/Drivers/Databricks/DatabricksParameters.cs
@@ -25,6 +25,24 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
     public class DatabricksParameters : SparkParameters
     {
         // CloudFetch configuration parameters
+        /// <summary>
+        /// Whether to use CloudFetch for retrieving results.
+        /// Default value is true if not specified.
+        /// </summary>
+        public const string UseCloudFetch = 
"adbc.databricks.cloudfetch.enabled";
+
+        /// <summary>
+        /// Whether the client can decompress LZ4 compressed results.
+        /// Default value is true if not specified.
+        /// </summary>
+        public const string CanDecompressLz4 = 
"adbc.databricks.cloudfetch.lz4.enabled";
+
+        /// <summary>
+        /// Maximum bytes per file for CloudFetch.
+        /// Default value is 20MB if not specified.
+        /// </summary>
+        public const string MaxBytesPerFile = 
"adbc.databricks.cloudfetch.max_bytes_per_file";
+
         /// <summary>
         /// Maximum number of retry attempts for CloudFetch downloads.
         /// Default value is 3 if not specified.
diff --git a/csharp/src/Drivers/Databricks/DatabricksStatement.cs 
b/csharp/src/Drivers/Databricks/DatabricksStatement.cs
index 62f576eb9..f2c9e643a 100644
--- a/csharp/src/Drivers/Databricks/DatabricksStatement.cs
+++ b/csharp/src/Drivers/Databricks/DatabricksStatement.cs
@@ -27,18 +27,17 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
     /// </summary>
     internal class DatabricksStatement : SparkStatement
     {
-        // Default maximum bytes per file for CloudFetch
-        private const long DefaultMaxBytesPerFile = 20 * 1024 * 1024; // 20MB
-
-        // CloudFetch configuration
-        private bool useCloudFetch = true;
-        private bool canDecompressLz4 = true;
-        private long maxBytesPerFile = DefaultMaxBytesPerFile;
+        private bool useCloudFetch;
+        private bool canDecompressLz4;
+        private long maxBytesPerFile;
 
         public DatabricksStatement(DatabricksConnection connection)
             : base(connection)
         {
-
+            // Inherit CloudFetch settings from connection
+            useCloudFetch = connection.UseCloudFetch;
+            canDecompressLz4 = connection.CanDecompressLz4;
+            maxBytesPerFile = connection.MaxBytesPerFile;
         }
 
         protected override void SetStatementProperties(TExecuteStatementReq 
statement)
@@ -55,7 +54,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
         {
             switch (key)
             {
-                case Options.UseCloudFetch:
+                case DatabricksParameters.UseCloudFetch:
                     if (bool.TryParse(value, out bool useCloudFetchValue))
                     {
                         this.useCloudFetch = useCloudFetchValue;
@@ -65,7 +64,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
                         throw new ArgumentException($"Invalid value for {key}: 
{value}. Expected a boolean value.");
                     }
                     break;
-                case Options.CanDecompressLz4:
+                case DatabricksParameters.CanDecompressLz4:
                     if (bool.TryParse(value, out bool canDecompressLz4Value))
                     {
                         this.canDecompressLz4 = canDecompressLz4Value;
@@ -75,7 +74,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
                         throw new ArgumentException($"Invalid value for {key}: 
{value}. Expected a boolean value.");
                     }
                     break;
-                case Options.MaxBytesPerFile:
+                case DatabricksParameters.MaxBytesPerFile:
                     if (long.TryParse(value, out long maxBytesPerFileValue))
                     {
                         this.maxBytesPerFile = maxBytesPerFileValue;
@@ -132,16 +131,5 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
         {
             this.maxBytesPerFile = maxBytesPerFile;
         }
-
-        /// <summary>
-        /// Provides the constant string key values to the <see 
cref="AdbcStatement.SetOption(string, string)" /> method.
-        /// </summary>
-        public sealed class Options : ApacheParameters
-        {
-            // CloudFetch options
-            public const string UseCloudFetch = 
"adbc.databricks.cloudfetch.enabled";
-            public const string CanDecompressLz4 = 
"adbc.databricks.cloudfetch.lz4.enabled";
-            public const string MaxBytesPerFile = 
"adbc.databricks.cloudfetch.max_bytes_per_file";
-        }
     }
 }
diff --git a/csharp/test/Drivers/Databricks/CloudFetchE2ETest.cs 
b/csharp/test/Drivers/Databricks/CloudFetchE2ETest.cs
index e57a55ec2..0d9bbfa90 100644
--- a/csharp/test/Drivers/Databricks/CloudFetchE2ETest.cs
+++ b/csharp/test/Drivers/Databricks/CloudFetchE2ETest.cs
@@ -50,13 +50,25 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.Databricks
             await TestRealDatabricksCloudFetchLargeQuery("SELECT * FROM 
main.tpcds_sf10_delta.catalog_sales LIMIT 1000000", 1000000);
         }
 
-        private async Task TestRealDatabricksCloudFetchLargeQuery(string 
query, int rowCount)
+        [Fact]
+        public async Task TestRealDatabricksNoCloudFetchSmallResultSet()
+        {
+            await TestRealDatabricksCloudFetchLargeQuery("SELECT * FROM 
range(1000)", 1000, false);
+        }
+
+        [Fact]
+        public async Task TestRealDatabricksNoCloudFetchLargeResultSet()
+        {
+            await TestRealDatabricksCloudFetchLargeQuery("SELECT * FROM 
main.tpcds_sf10_delta.catalog_sales LIMIT 1000000", 1000000, false);
+        }
+
+        private async Task TestRealDatabricksCloudFetchLargeQuery(string 
query, int rowCount, bool useCloudFetch = true)
         {
             // Create a statement with CloudFetch enabled
             var statement = Connection.CreateStatement();
-            statement.SetOption(DatabricksStatement.Options.UseCloudFetch, 
"true");
-            statement.SetOption(DatabricksStatement.Options.CanDecompressLz4, 
"true");
-            statement.SetOption(DatabricksStatement.Options.MaxBytesPerFile, 
"10485760"); // 10MB
+            statement.SetOption(DatabricksParameters.UseCloudFetch, 
useCloudFetch.ToString());
+            statement.SetOption(DatabricksParameters.CanDecompressLz4, "true");
+            statement.SetOption(DatabricksParameters.MaxBytesPerFile, 
"10485760"); // 10MB
 
             // Execute a query that generates a large result set using range 
function
             statement.SqlQuery = query;
diff --git a/csharp/test/Drivers/Databricks/DatabricksConnectionTest.cs 
b/csharp/test/Drivers/Databricks/DatabricksConnectionTest.cs
index a5667de53..3462f1505 100644
--- a/csharp/test/Drivers/Databricks/DatabricksConnectionTest.cs
+++ b/csharp/test/Drivers/Databricks/DatabricksConnectionTest.cs
@@ -22,6 +22,7 @@ using System.Net;
 using Apache.Arrow.Adbc.Drivers.Apache;
 using Apache.Arrow.Adbc.Drivers.Apache.Hive2;
 using Apache.Arrow.Adbc.Drivers.Apache.Spark;
+using Apache.Arrow.Adbc.Drivers.Databricks;
 using Thrift.Transport;
 using Xunit;
 using Xunit.Abstractions;
@@ -295,6 +296,10 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.Databricks
                 Add(new([], typeof(ArgumentException)));
                 Add(new(new() { [SparkParameters.Type] = " " }, 
typeof(ArgumentException)));
                 Add(new(new() { [SparkParameters.Type] = "xxx" }, 
typeof(ArgumentException)));
+                Add(new(new() { [SparkParameters.HostName] = 
"valid.server.com", [SparkParameters.Token] = "abcdef", 
[DatabricksParameters.UseCloudFetch] = "notabool" }, 
typeof(ArgumentException)));
+                Add(new(new() { [SparkParameters.HostName] = 
"valid.server.com", [SparkParameters.Token] = "abcdef", 
[DatabricksParameters.CanDecompressLz4] = "notabool"}, 
typeof(ArgumentException)));
+                Add(new(new() { [SparkParameters.HostName] = 
"valid.server.com", [SparkParameters.Token] = "abcdef", 
[DatabricksParameters.MaxBytesPerFile] = "notanumber" }, 
typeof(ArgumentException)));
+                Add(new(new() { [SparkParameters.HostName] = 
"valid.server.com", [SparkParameters.Token] = "abcdef", 
[DatabricksParameters.MaxBytesPerFile] = "-100" }, 
typeof(ArgumentOutOfRangeException)));
                 Add(new(new() { /*[SparkParameters.Type] = 
SparkServerTypeConstants.Databricks,*/ [SparkParameters.HostName] = 
"valid.server.com", [SparkParameters.Token] = "abcdef", [SparkParameters.Port] 
= "-1" }, typeof(ArgumentOutOfRangeException)));
                 Add(new(new() { /*[SparkParameters.Type] = 
SparkServerTypeConstants.Databricks,*/ [SparkParameters.HostName] = 
"valid.server.com", [SparkParameters.Token] = "abcdef", [SparkParameters.Port] 
= IPEndPoint.MinPort.ToString(CultureInfo.InvariantCulture) }, 
typeof(ArgumentOutOfRangeException)));
                 Add(new(new() { /*[SparkParameters.Type] = 
SparkServerTypeConstants.Databricks,*/ [SparkParameters.HostName] = 
"valid.server.com", [SparkParameters.Token] = "abcdef", [SparkParameters.Port] 
= (IPEndPoint.MaxPort + 1).ToString(CultureInfo.InvariantCulture) }, 
typeof(ArgumentOutOfRangeException)));
diff --git a/csharp/test/Drivers/Databricks/StatementTests.cs 
b/csharp/test/Drivers/Databricks/StatementTests.cs
index 8f06c63c8..16bc67bd4 100644
--- a/csharp/test/Drivers/Databricks/StatementTests.cs
+++ b/csharp/test/Drivers/Databricks/StatementTests.cs
@@ -44,7 +44,7 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.Databricks
             using var statement = connection.CreateStatement();
 
             // Set options for LZ4 decompression (enabled by default) and 
CloudFetch as specified
-            statement.SetOption(DatabricksStatement.Options.UseCloudFetch, 
useCloudFetch.ToString().ToLower());
+            statement.SetOption(DatabricksParameters.UseCloudFetch, 
useCloudFetch.ToString().ToLower());
             OutputHelper?.WriteLine($"CloudFetch is {(useCloudFetch ? 
"enabled" : "disabled")}");
             OutputHelper?.WriteLine("LZ4 decompression capability is enabled 
by default");
 

Reply via email to