This is an automated email from the ASF dual-hosted git repository.
curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new db253b1ef feat(csharp/src/Drivers/Databricks): Default catalogs edge
cases (#2896)
db253b1ef is described below
commit db253b1ef05a22ba1bb0ef247384b857ee4f06af
Author: Todd Meng <[email protected]>
AuthorDate: Tue Jun 3 09:53:03 2025 -0700
feat(csharp/src/Drivers/Databricks): Default catalogs edge cases (#2896)
This PR adds support for edge cases that the ODBC driver supported. It
also allows users to specify a default schema without a default catalog
(relying on the default catalog set by backend server)
### Default schema fallback
If the Spark protocol version is too low, the initial namespace will not be
respected. This PR sets `USE <schema>` as a backup immediately after
`TOpenSessionResp`. `SET CATALOG <catalog>` is not implemented since it
was introduced at the same time as InitialNamespace.
Additionally, schema can now be provided in the OpenSessionReq without
Catalog.
This is relevant for setting default schemas for DBR < 10.4. It means
that we can now provide a default schema for pre-Unity-Catalog clusters.
#### Testing
To test this add to DATABRICKS_TEST_CONFIG_FILE:
```
// no catalog
"db_schema": "default_schema",
```
And use a cluster running dbr < 10.4, run
`OlderDBRVersion_ShouldSetSchemaViaUseStatement`
### Default catalog legacy compatibility
If a default catalog or default schema is provided in the
`TOpenSessionResp`, we want subsequent getTables calls to use the
default namespace. This enables Power BI to act as if it is pre-UC, since
it will automatically make all getTables requests with the default catalog,
which is often `hive_metastore`.
If no schema is provided in the OpenSessionReq, only the default catalog
will be returned. This means that whether metadata queries have a default
schema depends on whether "db_schema" is provided.
So, in Power BI, when only a default schema is provided but not a default
catalog, it will automatically restrict itself to the default catalog.
#### Testing
Added `MetadataQuery_ShouldUseResponseNamespace`
---
.../Drivers/Apache/Hive2/HiveServer2Connection.cs | 33 +++++----
.../src/Drivers/Databricks/DatabricksConnection.cs | 34 +++++++--
.../src/Drivers/Databricks/DatabricksStatement.cs | 14 ++++
csharp/src/Drivers/Databricks/readme.md | 3 +-
csharp/test/Drivers/Databricks/StatementTests.cs | 81 ++++++++++++++++++++++
5 files changed, 145 insertions(+), 20 deletions(-)
diff --git a/csharp/src/Drivers/Apache/Hive2/HiveServer2Connection.cs
b/csharp/src/Drivers/Apache/Hive2/HiveServer2Connection.cs
index f101c2256..672ea6682 100644
--- a/csharp/src/Drivers/Apache/Hive2/HiveServer2Connection.cs
+++ b/csharp/src/Drivers/Apache/Hive2/HiveServer2Connection.cs
@@ -312,20 +312,7 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2
TOpenSessionReq request = CreateSessionRequest();
TOpenSessionResp? session = await Client.OpenSession(request,
cancellationToken);
-
- // Explicitly check the session status
- if (session == null)
- {
- throw new HiveServer2Exception("Unable to open session.
Unknown error.");
- }
- else if (session.Status.StatusCode !=
TStatusCode.SUCCESS_STATUS)
- {
- throw new HiveServer2Exception(session.Status.ErrorMessage)
- .SetNativeError(session.Status.ErrorCode)
- .SetSqlState(session.Status.SqlState);
- }
-
- SessionHandle = session.SessionHandle;
+ await HandleOpenSessionResponse(session);
}
catch (Exception ex) when
(ExceptionHelper.IsOperationCanceledOrCancellationRequested(ex,
cancellationToken))
{
@@ -338,6 +325,24 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2
}
}
+ protected virtual Task HandleOpenSessionResponse(TOpenSessionResp?
session)
+ {
+ // Explicitly check the session status
+ if (session == null)
+ {
+ throw new HiveServer2Exception("Unable to open session.
Unknown error.");
+ }
+ else if (session.Status.StatusCode != TStatusCode.SUCCESS_STATUS)
+ {
+ throw new HiveServer2Exception(session.Status.ErrorMessage)
+ .SetNativeError(session.Status.ErrorCode)
+ .SetSqlState(session.Status.SqlState);
+ }
+
+ SessionHandle = session.SessionHandle;
+ return Task.CompletedTask;
+ }
+
protected virtual TCLIService.IAsync CreateTCLIServiceClient(TProtocol
protocol)
{
return new TCLIService.Client(protocol);
diff --git a/csharp/src/Drivers/Databricks/DatabricksConnection.cs
b/csharp/src/Drivers/Databricks/DatabricksConnection.cs
index 1282d0c8a..adfc84fbc 100644
--- a/csharp/src/Drivers/Databricks/DatabricksConnection.cs
+++ b/csharp/src/Drivers/Databricks/DatabricksConnection.cs
@@ -168,7 +168,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
Properties.TryGetValue(AdbcOptions.Connection.CurrentCatalog, out
defaultCatalog);
Properties.TryGetValue(AdbcOptions.Connection.CurrentDbSchema, out
defaultSchema);
- if (!string.IsNullOrWhiteSpace(defaultCatalog))
+ if (!string.IsNullOrWhiteSpace(defaultCatalog) ||
!string.IsNullOrWhiteSpace(defaultSchema))
{
_defaultNamespace = new TNamespace
{
@@ -176,10 +176,6 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
SchemaName = defaultSchema
};
}
- else if (!string.IsNullOrEmpty(defaultSchema))
- {
- throw new ArgumentException($"Parameter
'{AdbcOptions.Connection.CurrentCatalog}' is not set but
'{AdbcOptions.Connection.CurrentDbSchema}' is set. Please provide a value for
'{AdbcOptions.Connection.CurrentCatalog}'.");
- }
}
/// <summary>
@@ -374,6 +370,34 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
return req;
}
+ protected override async Task
HandleOpenSessionResponse(TOpenSessionResp? session)
+ {
+ await base.HandleOpenSessionResponse(session);
+ if (session != null)
+ {
+ _enableMultipleCatalogSupport =
session.__isset.canUseMultipleCatalogs ? session.CanUseMultipleCatalogs : false;
+ if (session.__isset.initialNamespace)
+ {
+ _defaultNamespace = session.InitialNamespace;
+ }
+ else if (_defaultNamespace != null &&
!string.IsNullOrEmpty(_defaultNamespace.SchemaName))
+ {
+ // server version is too old. Explicitly set the schema
using queries
+ await SetSchema(_defaultNamespace.SchemaName);
+ }
+ // catalog in namespace is introduced when SET CATALOG is
introduced, so we don't need to fallback
+ }
+ }
+
+ // Since Databricks Namespace was introduced in newer versions, we
fallback to USE SCHEMA to set default schema
+ // in case the server version is too low.
+ private async Task SetSchema(string schemaName)
+ {
+ using var statement = new DatabricksStatement(this);
+ statement.SqlQuery = $"USE {schemaName}";
+ await statement.ExecuteUpdateAsync();
+ }
+
/// <summary>
/// Gets a dictionary of server-side properties extracted from
connection properties.
/// </summary>
diff --git a/csharp/src/Drivers/Databricks/DatabricksStatement.cs
b/csharp/src/Drivers/Databricks/DatabricksStatement.cs
index b1f054e44..1dc0f7a50 100644
--- a/csharp/src/Drivers/Databricks/DatabricksStatement.cs
+++ b/csharp/src/Drivers/Databricks/DatabricksStatement.cs
@@ -41,6 +41,20 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
public DatabricksStatement(DatabricksConnection connection)
: base(connection)
{
+ // set the catalog name for legacy compatibility
+            // TODO: use catalog and schema fields in hiveserver2 connection
instead of DefaultNamespace so we don't need to cast
+ var defaultNamespace =
((DatabricksConnection)Connection).DefaultNamespace;
+ if (defaultNamespace != null)
+ {
+ if (CatalogName == null)
+ {
+ CatalogName = defaultNamespace.CatalogName;
+ }
+ if (SchemaName == null)
+ {
+ SchemaName = defaultNamespace.SchemaName;
+ }
+ }
// Inherit CloudFetch settings from connection
useCloudFetch = connection.UseCloudFetch;
canDecompressLz4 = connection.CanDecompressLz4;
diff --git a/csharp/src/Drivers/Databricks/readme.md
b/csharp/src/Drivers/Databricks/readme.md
index d0d1c2f49..af6ecd901 100644
--- a/csharp/src/Drivers/Databricks/readme.md
+++ b/csharp/src/Drivers/Databricks/readme.md
@@ -38,7 +38,8 @@ The Databricks ADBC driver supports the following
authentication methods:
Basic (username and password) authentication is not supported at this time.
-Optional default catalog and default schema can be set for the session with
`adbc.connection.catalog` and `adbc.connection.db_schema` (catalog must be set
if default schema is provided).
+Optional default catalog and default schema can be set for the session with
`adbc.connection.catalog` and `adbc.connection.db_schema`.
+The default catalog and schema will be used for subsequent metadata calls
unless the user specifies a different catalog/schema to use.
## Data Types
diff --git a/csharp/test/Drivers/Databricks/StatementTests.cs
b/csharp/test/Drivers/Databricks/StatementTests.cs
index a20ca2ae3..cd5254a60 100644
--- a/csharp/test/Drivers/Databricks/StatementTests.cs
+++ b/csharp/test/Drivers/Databricks/StatementTests.cs
@@ -751,5 +751,86 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.Databricks
Assert.True(expectedType.Equals(field.DataType), $"Field type
mismatch: expected {expectedType}, got {field.DataType}");
Assert.True(expectedNullable == field.IsNullable, $"Field
nullability mismatch: expected {expectedNullable}, got {field.IsNullable}");
}
+
+ [SkippableFact]
+ public async Task MetadataQuery_ShouldUseResponseNamespace()
+ {
+ // Test case: GetTables should use the response namespace. You can
run this test with and without a catalog or schema.
+ using var connection = NewConnection();
+ using var statement = connection.CreateStatement();
+
+ // Set only schema name without catalog
+ statement.SetOption(ApacheParameters.IsMetadataCommand, "true");
+ statement.SqlQuery = "GetTables";
+
+ var result = await statement.ExecuteQueryAsync();
+ Assert.NotNull(result.Stream);
+
+ // Verify we get results and they use the response namespace
+ int rowCount = 0;
+ var foundCatalogs = new HashSet<string>();
+ var foundSchemas = new HashSet<string>();
+
+ while (result.Stream != null)
+ {
+ using var batch = await
result.Stream.ReadNextRecordBatchAsync();
+ if (batch == null)
+ break;
+
+ rowCount += batch.Length;
+ var catalogArray = (StringArray)batch.Column(0);
+ var schemaArray = (StringArray)batch.Column(1);
+
+ for (int i = 0; i < batch.Length; i++)
+ {
+ foundCatalogs.Add(catalogArray.GetString(i) ??
string.Empty);
+ foundSchemas.Add(schemaArray.GetString(i) ?? string.Empty);
+ }
+ }
+
+ // Should have results and they should match the schema we
specified
+ Assert.True(rowCount > 0, "Should have results even without
catalog specified");
+ Assert.True(foundSchemas.Count >= 1, "Should have at least one
schema");
+ Assert.True(foundCatalogs.Count == 1, "Should have exactly one
catalog");
+ }
+
+ // run this test with dbr < 10.4
+ [SkippableFact]
+ public async Task OlderDBRVersion_ShouldSetSchemaViaUseStatement()
+ {
+ // Test case: Older DBR version should still set schema via USE
statement.
+ // skip if no schema is provided by user
+ Skip.If(string.IsNullOrEmpty(TestConfiguration.DbSchema), "No
schema provided by user");
+ using var connection = NewConnection();
+ // Verify current schema matches what we expect
+ using var statement = connection.CreateStatement();
+ statement.SetOption(ApacheParameters.IsMetadataCommand, "true");
+ statement.SqlQuery = "GetTables";
+
+ var result = await statement.ExecuteQueryAsync();
+ Assert.NotNull(result.Stream);
+
+ // Verify we get results and they use the response namespace
+ int rowCount = 0;
+ var foundSchemas = new HashSet<string>();
+
+ while (result.Stream != null)
+ {
+ using var batch = await
result.Stream.ReadNextRecordBatchAsync();
+ if (batch == null)
+ break;
+
+ rowCount += batch.Length;
+ var schemaArray = (StringArray)batch.Column(0);
+
+ for (int i = 0; i < batch.Length; i++)
+ {
+ foundSchemas.Add(schemaArray.GetString(i) ?? string.Empty);
+ }
+ }
+
+ Assert.True(rowCount > 0, "Should have results even without
catalog specified");
+ Assert.True(foundSchemas.Count == 1, "Should have exactly one
schema");
+ }
}
}