This is an automated email from the ASF dual-hosted git repository.

curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new aae84d22c feat(csharp/src/Drivers/Databricks): Primary Key and Foreign 
Key Metadata Optimization (#2886)
aae84d22c is described below

commit aae84d22c1f74601ae81cf6b38455bbacfb7e65e
Author: eric-wang-1990 <115501094+eric-wang-1...@users.noreply.github.com>
AuthorDate: Wed May 28 10:27:39 2025 -0700

    feat(csharp/src/Drivers/Databricks): Primary Key and Foreign Key Metadata 
Optimization (#2886)
    
    ## Arrow ADBC: Primary Key and Foreign Key Metadata Optimization
    
    ### Description
    
    This PR adds support for optimizing Primary Key and Foreign Key metadata
    queries in the C# Databricks ADBC driver. It introduces a new connection
    parameter `adbc.databricks.enable_pk_fk` that allows users to control
    whether the driver should make PK/FK metadata calls to the server or
    return empty results for improved performance.
    
    ### Background
    
    Primary Key and Foreign Key metadata queries can be expensive
    operations, particularly in Databricks environments where they may not
    be fully supported in certain catalogs. This implementation provides a
    way to optimize these operations by:
    
    1. Allowing users to disable PK/FK metadata calls entirely via
    configuration
    2. Automatically returning empty results for legacy catalogs (SPARK,
    hive_metastore) where PK/FK metadata is not supported
    3. Ensuring that empty results maintain schema compatibility with real
    metadata responses
    
    ### Proposed Changes
    
    - Add new connection parameter `adbc.databricks.enable_pk_fk` to control
    PK/FK metadata behavior (default: true)
    - Implement special handling for legacy catalogs (SPARK, hive_metastore)
    to return empty results without server calls
    - Modify method visibility in base classes to allow proper overriding in
    derived classes
    - Add comprehensive test coverage for the new functionality
    
    ### How is this tested?
    
    Added unit tests that verify:
    1. The correct behavior of the `ShouldReturnEmptyPkFkResult` method with
    various combinations of settings
    2. Schema compatibility between empty results and real metadata
    responses
    3. Proper handling of different catalog scenarios
    
    These tests ensure that the optimization works correctly while
    maintaining compatibility with client applications that expect
    consistent schema structures.
---
 .../Drivers/Apache/Hive2/HiveServer2Statement.cs   |   4 +-
 .../src/Drivers/Databricks/DatabricksConnection.cs |  18 ++++
 .../src/Drivers/Databricks/DatabricksParameters.cs |   6 ++
 .../src/Drivers/Databricks/DatabricksStatement.cs  | 117 ++++++++++++++++++++-
 csharp/test/Drivers/Databricks/StatementTests.cs   |  98 +++++++++++++++++
 5 files changed, 240 insertions(+), 3 deletions(-)

diff --git a/csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs 
b/csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs
index 4079fce26..c4c7414c2 100644
--- a/csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs
+++ b/csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs
@@ -368,7 +368,7 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2
             };
         }
 
-        private async Task<QueryResult> 
GetCrossReferenceAsync(CancellationToken cancellationToken = default)
+        protected virtual async Task<QueryResult> 
GetCrossReferenceAsync(CancellationToken cancellationToken = default)
         {
             TGetCrossReferenceResp resp = await 
Connection.GetCrossReferenceAsync(
                 CatalogName,
@@ -383,7 +383,7 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2
             return await GetQueryResult(resp.DirectResults, cancellationToken);
         }
 
-        private async Task<QueryResult> GetPrimaryKeysAsync(CancellationToken 
cancellationToken = default)
+        protected virtual async Task<QueryResult> 
GetPrimaryKeysAsync(CancellationToken cancellationToken = default)
         {
             TGetPrimaryKeysResp resp = await Connection.GetPrimaryKeysAsync(
                 CatalogName,
diff --git a/csharp/src/Drivers/Databricks/DatabricksConnection.cs 
b/csharp/src/Drivers/Databricks/DatabricksConnection.cs
index 3cb0e544f..1282d0c8a 100644
--- a/csharp/src/Drivers/Databricks/DatabricksConnection.cs
+++ b/csharp/src/Drivers/Databricks/DatabricksConnection.cs
@@ -40,6 +40,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
         private bool _applySSPWithQueries = false;
         private bool _enableDirectResults = true;
         private bool _enableMultipleCatalogSupport = true;
+        private bool _enablePKFK = true;
 
         internal static TSparkGetDirectResults defaultGetDirectResults = new()
         {
@@ -71,6 +72,18 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
 
         private void ValidateProperties()
         {
+            if (Properties.TryGetValue(DatabricksParameters.EnablePKFK, out 
string? enablePKFKStr))
+            {
+                if (bool.TryParse(enablePKFKStr, out bool enablePKFKValue))
+                {
+                    _enablePKFK = enablePKFKValue;
+                }
+                else
+                {
+                    throw new ArgumentException($"Parameter 
'{DatabricksParameters.EnablePKFK}' value '{enablePKFKStr}' could not be 
parsed. Valid values are 'true', 'false'.");
+                }
+            }
+
             if 
(Properties.TryGetValue(DatabricksParameters.EnableMultipleCatalogSupport, out 
string? enableMultipleCatalogSupportStr))
             {
                 if (bool.TryParse(enableMultipleCatalogSupportStr, out bool 
enableMultipleCatalogSupportValue))
@@ -204,6 +217,11 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
         /// </summary>
         internal bool EnableMultipleCatalogSupport => 
_enableMultipleCatalogSupport;
 
+        /// <summary>
+        /// Gets whether PK/FK metadata call is enabled
+        /// </summary>
+        public bool EnablePKFK => _enablePKFK;
+
         /// <summary>
         /// Gets a value indicating whether to retry requests that receive a 
503 response with a Retry-After header.
         /// </summary>
diff --git a/csharp/src/Drivers/Databricks/DatabricksParameters.cs 
b/csharp/src/Drivers/Databricks/DatabricksParameters.cs
index 85e33f62d..db62c04b2 100644
--- a/csharp/src/Drivers/Databricks/DatabricksParameters.cs
+++ b/csharp/src/Drivers/Databricks/DatabricksParameters.cs
@@ -155,6 +155,12 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
         /// Default value is true if not specified.
         /// </summary>
         public const string EnableMultipleCatalogSupport = 
"adbc.databricks.enable_multiple_catalog_support";
+
+        /// <summary>
+        /// Whether to enable primary key foreign key metadata call.
+        /// Default value is true if not specified.
+        /// </summary>
+        public const string EnablePKFK = "adbc.databricks.enable_pk_fk";
     }
 
     /// <summary>
diff --git a/csharp/src/Drivers/Databricks/DatabricksStatement.cs 
b/csharp/src/Drivers/Databricks/DatabricksStatement.cs
index 1fdeee285..cb92cdd5e 100644
--- a/csharp/src/Drivers/Databricks/DatabricksStatement.cs
+++ b/csharp/src/Drivers/Databricks/DatabricksStatement.cs
@@ -16,7 +16,6 @@
 */
 
 using System;
-using System.Collections.Generic;
 using System.Threading;
 using System.Threading.Tasks;
 using Apache.Arrow.Adbc.Drivers.Apache;
@@ -37,6 +36,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
         private bool canDecompressLz4;
         private long maxBytesPerFile;
         private bool enableMultipleCatalogSupport;
+        private bool enablePKFK;
 
         public DatabricksStatement(DatabricksConnection connection)
             : base(connection)
@@ -46,6 +46,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
             canDecompressLz4 = connection.CanDecompressLz4;
             maxBytesPerFile = connection.MaxBytesPerFile;
             enableMultipleCatalogSupport = 
connection.EnableMultipleCatalogSupport;
+            enablePKFK = connection.EnablePKFK;
         }
 
         protected override void SetStatementProperties(TExecuteStatementReq 
statement)
@@ -386,5 +387,119 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
             // Call the base implementation with the potentially modified 
catalog name
             return await base.GetColumnsAsync(cancellationToken);
         }
+
+        /// <summary>
+        /// Determines whether PK/FK metadata queries 
(GetPrimaryKeys/GetCrossReference) should return an empty result set without 
hitting the server.
+        ///
+        /// Why:
+        /// - For certain catalog names (null, empty, "SPARK", 
"hive_metastore"), Databricks does not support PK/FK metadata,
+        ///   or these are legacy/synthesized catalogs that should gracefully 
return empty results for compatibility.
+        /// - The EnablePKFK flag allows the client to globally disable PK/FK 
metadata queries for performance or compatibility reasons.
+        ///
+        /// What it does:
+        /// - Returns true if PK/FK queries should return an empty result (and 
not hit the server), based on:
+        ///   - The EnablePKFK flag (if false, always return empty)
+        ///   - The catalog name (SPARK, hive_metastore, null, or empty string)
+        /// - Returns false if the query should proceed to the server (for 
valid, supported catalogs).
+        /// </summary>
+        internal bool ShouldReturnEmptyPkFkResult()
+        {
+            if (!enablePKFK)
+                return true;
+
+            // Handle special catalog cases
+            if (string.IsNullOrEmpty(CatalogName) ||
+                string.Equals(CatalogName, "SPARK", 
StringComparison.OrdinalIgnoreCase) ||
+                string.Equals(CatalogName, "hive_metastore", 
StringComparison.OrdinalIgnoreCase))
+            {
+                return true;
+            }
+
+            return false;
+        }
+
+        protected override async Task<QueryResult> 
GetPrimaryKeysAsync(CancellationToken cancellationToken = default)
+        {
+            if (ShouldReturnEmptyPkFkResult())
+                return EmptyPrimaryKeysResult();
+
+            return await base.GetPrimaryKeysAsync(cancellationToken);
+        }
+
+        private QueryResult EmptyPrimaryKeysResult()
+        {
+            var fields = new[]
+            {
+                new Field("TABLE_CAT", StringType.Default, true),
+                new Field("TABLE_SCHEM", StringType.Default, true),
+                new Field("TABLE_NAME", StringType.Default, true),
+                new Field("COLUMN_NAME", StringType.Default, true),
+                new Field("KEY_SEQ", Int32Type.Default, true),
+                new Field("PK_NAME", StringType.Default, true)
+            };
+            var schema = new Schema(fields, null);
+
+            var arrays = new IArrowArray[]
+            {
+                new StringArray.Builder().Build(), // TABLE_CAT
+                new StringArray.Builder().Build(), // TABLE_SCHEM
+                new StringArray.Builder().Build(), // TABLE_NAME
+                new StringArray.Builder().Build(), // COLUMN_NAME
+                new Int32Array.Builder().Build(),  // KEY_SEQ — must match the Int32Type KEY_SEQ field declared in the schema
+                new StringArray.Builder().Build()  // PK_NAME
+            };
+
+            return new QueryResult(0, new 
HiveServer2Connection.HiveInfoArrowStream(schema, arrays));
+        }
+
+        protected override async Task<QueryResult> 
GetCrossReferenceAsync(CancellationToken cancellationToken = default)
+        {
+            if (ShouldReturnEmptyPkFkResult())
+                return EmptyCrossReferenceResult();
+
+            return await base.GetCrossReferenceAsync(cancellationToken);
+        }
+
+        private QueryResult EmptyCrossReferenceResult()
+        {
+            var fields = new[]
+            {
+                new Field("PKTABLE_CAT", StringType.Default, true),
+                new Field("PKTABLE_SCHEM", StringType.Default, true),
+                new Field("PKTABLE_NAME", StringType.Default, true),
+                new Field("PKCOLUMN_NAME", StringType.Default, true),
+                new Field("FKTABLE_CAT", StringType.Default, true),
+                new Field("FKTABLE_SCHEM", StringType.Default, true),
+                new Field("FKTABLE_NAME", StringType.Default, true),
+                new Field("FKCOLUMN_NAME", StringType.Default, true),
+                new Field("KEY_SEQ", Int16Type.Default, true),
+                new Field("UPDATE_RULE", Int16Type.Default, true),
+                new Field("DELETE_RULE", Int16Type.Default, true),
+                new Field("FK_NAME", StringType.Default, true),
+                new Field("PK_NAME", StringType.Default, true),
+                new Field("DEFERRABILITY", Int16Type.Default, true)
+            };
+            var schema = new Schema(fields, null);
+
+            var arrays = new IArrowArray[]
+            {
+                new StringArray.Builder().Build(), // PKTABLE_CAT
+                new StringArray.Builder().Build(), // PKTABLE_SCHEM
+                new StringArray.Builder().Build(), // PKTABLE_NAME
+                new StringArray.Builder().Build(), // PKCOLUMN_NAME
+                new StringArray.Builder().Build(), // FKTABLE_CAT
+                new StringArray.Builder().Build(), // FKTABLE_SCHEM
+                new StringArray.Builder().Build(), // FKTABLE_NAME
+                new StringArray.Builder().Build(), // FKCOLUMN_NAME
+                new Int16Array.Builder().Build(),  // KEY_SEQ
+                new Int16Array.Builder().Build(),  // UPDATE_RULE
+                new Int16Array.Builder().Build(),  // DELETE_RULE
+                new StringArray.Builder().Build(), // FK_NAME
+                new StringArray.Builder().Build(), // PK_NAME
+                new Int16Array.Builder().Build()   // DEFERRABILITY
+            };
+
+            return new QueryResult(0, new 
HiveServer2Connection.HiveInfoArrowStream(schema, arrays));
+        }
     }
 }
diff --git a/csharp/test/Drivers/Databricks/StatementTests.cs 
b/csharp/test/Drivers/Databricks/StatementTests.cs
index ef1a84b00..368a5e0d7 100644
--- a/csharp/test/Drivers/Databricks/StatementTests.cs
+++ b/csharp/test/Drivers/Databricks/StatementTests.cs
@@ -652,5 +652,103 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.Databricks
             Assert.True(expectedType.Equals(field.DataType), $"Field {index} 
type mismatch");
             Assert.True(expectedNullable == field.IsNullable, $"Field {index} 
nullability mismatch");
         }
+
+        [Theory]
+        [InlineData(false, "main", true)]
+        [InlineData(true, null, true)]
+        [InlineData(true, "", true)]
+        [InlineData(true, "SPARK", true)]
+        [InlineData(true, "hive_metastore", true)]
+        [InlineData(true, "main", false)]
+        public void ShouldReturnEmptyPkFkResult_WorksAsExpected(bool 
enablePKFK, string? catalogName, bool expected)
+        {
+            // Arrange: create test configuration and connection
+            var testConfig = 
(DatabricksTestConfiguration)TestConfiguration.Clone();
+            var connectionParams = new Dictionary<string, string>
+            {
+                [DatabricksParameters.EnablePKFK] = 
enablePKFK.ToString().ToLowerInvariant()
+            };
+            using var connection = NewConnection(testConfig, connectionParams);
+            var statement = connection.CreateStatement();
+
+            // Set CatalogName using SetOption
+            if(catalogName != null)
+            {
+                statement.SetOption(ApacheParameters.CatalogName, catalogName);
+            }
+
+            // Act
+            var result = 
((DatabricksStatement)statement).ShouldReturnEmptyPkFkResult();
+
+            // Assert
+            Assert.Equal(expected, result);
+        }
+
+        [SkippableFact]
+        public async Task PKFK_EmptyResult_SchemaMatches_RealMetadataResponse()
+        {
+            // Arrange: create test configuration and connection
+            var testConfig = 
(DatabricksTestConfiguration)TestConfiguration.Clone();
+            var connectionParams = new Dictionary<string, string>
+            {
+                [DatabricksParameters.EnablePKFK] = "true"
+            };
+            using var connection = NewConnection(testConfig, connectionParams);
+            var statement = connection.CreateStatement();
+
+            // Get real PK metadata schema
+            statement.SetOption(ApacheParameters.IsMetadataCommand, "true");
+            statement.SetOption(ApacheParameters.CatalogName, "powerbi");
+            statement.SetOption(ApacheParameters.SchemaName, 
TestConfiguration.Metadata.Schema);
+            statement.SetOption(ApacheParameters.TableName, 
TestConfiguration.Metadata.Table);
+            statement.SqlQuery = "GetPrimaryKeys";
+            var realPkResult = await statement.ExecuteQueryAsync();
+            Assert.NotNull(realPkResult.Stream);
+            var realPkSchema = realPkResult.Stream.Schema;
+
+            // Get empty PK result schema (using SPARK catalog which should 
return empty)
+            statement.SetOption(ApacheParameters.CatalogName, "SPARK");
+            var emptyPkResult = await statement.ExecuteQueryAsync();
+            Assert.NotNull(emptyPkResult.Stream);
+            var emptyPkSchema = emptyPkResult.Stream.Schema;
+
+            // Verify PK schemas match
+            Assert.Equal(realPkSchema.FieldsList.Count, 
emptyPkSchema.FieldsList.Count);
+            for (int i = 0; i < realPkSchema.FieldsList.Count; i++)
+            {
+                var realField = realPkSchema.FieldsList[i];
+                var emptyField = emptyPkSchema.FieldsList[i];
+                AssertField(emptyField, realField.Name, realField.DataType, 
realField.IsNullable);
+            }
+
+            // Get real FK metadata schema
+            statement.SetOption(ApacheParameters.CatalogName, 
TestConfiguration.Metadata.Catalog);
+            statement.SqlQuery = "GetCrossReference";
+            var realFkResult = await statement.ExecuteQueryAsync();
+            Assert.NotNull(realFkResult.Stream);
+            var realFkSchema = realFkResult.Stream.Schema;
+
+            // Get empty FK result schema
+            statement.SetOption(ApacheParameters.CatalogName, "SPARK");
+            var emptyFkResult = await statement.ExecuteQueryAsync();
+            Assert.NotNull(emptyFkResult.Stream);
+            var emptyFkSchema = emptyFkResult.Stream.Schema;
+
+            // Verify FK schemas match
+            Assert.Equal(realFkSchema.FieldsList.Count, 
emptyFkSchema.FieldsList.Count);
+            for (int i = 0; i < realFkSchema.FieldsList.Count; i++)
+            {
+                var realField = realFkSchema.FieldsList[i];
+                var emptyField = emptyFkSchema.FieldsList[i];
+                AssertField(emptyField, realField.Name, realField.DataType, 
realField.IsNullable);
+            }
+        }
+
+        private void AssertField(Field field, string expectedName, IArrowType 
expectedType, bool expectedNullable)
+        {
+            Assert.True(expectedName.Equals(field.Name), $"Field name 
mismatch: expected {expectedName}, got {field.Name}");
+            Assert.True(expectedType.Equals(field.DataType), $"Field type 
mismatch: expected {expectedType}, got {field.DataType}");
+            Assert.True(expectedNullable == field.IsNullable, $"Field 
nullability mismatch: expected {expectedNullable}, got {field.IsNullable}");
+        }
     }
 }

Reply via email to