This is an automated email from the ASF dual-hosted git repository.

curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new aa1053aac feat(csharp/src/Drivers/Apache): Add support for native 
metadata queries using statement options (#2665)
aa1053aac is described below

commit aa1053aac3b3e8b07d82f2dd1bd61221cbdf0bf2
Author: Bruce Irschick <[email protected]>
AuthorDate: Thu Apr 3 10:18:26 2025 -0700

    feat(csharp/src/Drivers/Apache): Add support for native metadata queries 
using statement options (#2665)
    
    This feature adds support for making native metadata queries to data
    source.
    
    The caller sets statement options to determine the metadata command and
    parameters.
    
    ### Statement Options
    
    | Option | Description | Default |
    | :--- | :--- | :--- |
    | `adbc.apache.statement.is_metadata_command` | Indicate that the value
    of `AdbcStatement.SqlQuery` contains the name of a native metadata
    command. If set to `True`, it indicates a metadata command query whereas
    a value of `False` indicates a SQL command query. <br><br>Supported
    metadata commands include: `GetPrimaryKeys`, `GetCrossReference`,
    `GetCatalogs`, `GetSchemas`, `GetTables`, and `GetColumns`. | `False` |
    | `adbc.apache.catalog_name` | The catalog name (or pattern) when used
    with a metadata command query. <br><br>Supported metadata commands
    include: `GetPrimaryKeys`, `GetCrossReference`, `GetSchemas`,
    `GetTables`, and `GetColumns`. | |
    | `adbc.apache.schema_name` | The schema name (or pattern) when used
    with a metadata command query. <br><br>Supported metadata commands
    include: `GetPrimaryKeys`, `GetCrossReference`, `GetSchemas`,
    `GetTables`, and `GetColumns`. | |
    | `adbc.apache.table_name` | The table name (or pattern) when used with
    a metadata command query. <br><br>Supported metadata commands include:
    `GetPrimaryKeys`, `GetCrossReference`, `GetSchemas`, `GetTables`, and
    `GetColumns`. | |
    | `adbc.apache.table_types` | The comma-separated list of table types
    when used with a metadata command query. <br><br>Supported metadata
    commands include: `GetTables`. | |
    | `adbc.apache.column_name` | The column name (or pattern) when used
    with a metadata command query. <br><br>Supported metadata commands
    include: `GetColumns`. | |
    | `adbc.apache.foreign_catalog_name` | The foreign (i.e., child) catalog
    name (or pattern) when used with a metadata command query.
    <br><br>Supported metadata commands include: `GetCrossReference`. | |
    | `adbc.apache.foreign_schema_name` | The foreign (i.e., child) schema
    name (or pattern) when used with a metadata command query.
    <br><br>Supported metadata commands include: `GetCrossReference`. | |
    | `adbc.apache.foreign_table_name` | The foreign (i.e., child) table
    name (or pattern) when used with a metadata command query.
    <br><br>Supported metadata commands include: `GetCrossReference`. | |
---
 csharp/src/Drivers/Apache/ApacheParameters.cs      |  50 +++
 csharp/src/Drivers/Apache/ApacheUtility.cs         |  12 +
 .../Drivers/Apache/Hive2/HiveServer2Connection.cs  | 255 ++++++++++++++-
 .../Apache/Hive2/HiveServer2HttpConnection.cs      |   4 +
 .../src/Drivers/Apache/Hive2/HiveServer2Reader.cs  |  31 +-
 .../Drivers/Apache/Hive2/HiveServer2Statement.cs   | 180 ++++++++++-
 csharp/src/Drivers/Apache/Hive2/README.md          |   9 +
 .../src/Drivers/Apache/Impala/ImpalaConnection.cs  |   4 +
 .../src/Drivers/Apache/{Hive2 => Impala}/README.md |  37 +--
 csharp/src/Drivers/Apache/Spark/README.md          |   9 +
 csharp/src/Drivers/Apache/Spark/SparkConnection.cs |   6 +-
 .../Apache/Spark/SparkDatabricksConnection.cs      |   5 +-
 .../Drivers/Apache/Spark/SparkHttpConnection.cs    |   4 +
 csharp/test/Apache.Arrow.Adbc.Tests/TestBase.cs    |  13 +-
 .../test/Drivers/Apache/Common/StatementTests.cs   | 342 +++++++++++++++++++++
 csharp/test/Drivers/Apache/Hive2/StatementTests.cs |  21 +-
 .../test/Drivers/Apache/Impala/StatementTests.cs   |  38 ++-
 csharp/test/Drivers/Apache/Spark/StatementTests.cs |  31 ++
 18 files changed, 1011 insertions(+), 40 deletions(-)

diff --git a/csharp/src/Drivers/Apache/ApacheParameters.cs 
b/csharp/src/Drivers/Apache/ApacheParameters.cs
index 17c94be32..4629df98f 100644
--- a/csharp/src/Drivers/Apache/ApacheParameters.cs
+++ b/csharp/src/Drivers/Apache/ApacheParameters.cs
@@ -25,5 +25,55 @@ namespace Apache.Arrow.Adbc.Drivers.Apache
         public const string PollTimeMilliseconds = 
"adbc.apache.statement.polltime_ms";
         public const string BatchSize = "adbc.apache.statement.batch_size";
         public const string QueryTimeoutSeconds = 
"adbc.apache.statement.query_timeout_s";
+
+        /// <summary>
+        /// The indicator of whether the 
<c>AdbcStatement.ExecuteQuery[Async]</c> should execute a metadata command 
query.
+        /// In the case this indicator is set to <c>True</c>, the method will 
execute a metadata command using the native API where
+        /// the name of the command is given in the 
<c>AdbcStatement.SqlQuery</c> property value.
+        /// <para>
+        /// Use the <c>adbc.get_metadata.*</c> options to set the input 
parameters for the native metadata command query.
+        /// </para>
+        /// </summary>
+        public const string IsMetadataCommand = 
"adbc.apache.statement.is_metadata_command";
+
+        /// <summary>
+        /// The catalog name (or pattern) of the table for GetSchemas, Get* 
metadata command queries.
+        /// </summary>
+        public const string CatalogName = "adbc.get_metadata.target_catalog";
+
+        /// <summary>
+        /// The schema name (or pattern) of the table for GetSchemas, 
GetTables, ... metadata command queries.
+        /// </summary>
+        public const string SchemaName = "adbc.get_metadata.target_db_schema";
+
+        /// <summary>
+        /// The table name (or pattern) of the table for GetSchemas, 
GetTables, ... metadata command queries.
+        /// </summary>
+        public const string TableName = "adbc.get_metadata.target_table";
+
+        /// <summary>
+        /// The comma-separted list of the table types for GetTables metadata 
command query.
+        /// </summary>
+        public const string TableTypes = 
"adbc.get_metadata.target_table_types";
+
+        /// <summary>
+        /// The column name (or pattern) in the table for GetColumns metadata 
command query.
+        /// </summary>
+        public const string ColumnName = "adbc.get_metadata.target_column";
+
+        /// <summary>
+        /// The catalog name (or pattern) of the foreign (child) table for 
GetCrossReference metadata command query.
+        /// </summary>
+        public const string ForeignCatalogName = 
"adbc.get_metadata.foreign_target_catalog";
+
+        /// <summary>
+        /// The schema name (or pattern) of the foreign (child) table for 
GetCrossReference metadata command query.
+        /// </summary>
+        public const string ForeignSchemaName = 
"adbc.get_metadata.foreign_target_db_schema";
+
+        /// <summary>
+        /// The table name (or pattern) of the foreign (child) table for 
GetCrossReference metadata command query.
+        /// </summary>
+        public const string ForeignTableName = 
"adbc.get_metadata.foreign_target_table";
     }
 }
diff --git a/csharp/src/Drivers/Apache/ApacheUtility.cs 
b/csharp/src/Drivers/Apache/ApacheUtility.cs
index f1cb07e07..f9b72af99 100644
--- a/csharp/src/Drivers/Apache/ApacheUtility.cs
+++ b/csharp/src/Drivers/Apache/ApacheUtility.cs
@@ -74,6 +74,18 @@ namespace Apache.Arrow.Adbc.Drivers.Apache
             }
         }
 
+        public static bool BooleanIsValid(string key, string value, out bool 
booleanValue)
+        {
+            if (bool.TryParse(value, out booleanValue))
+            {
+                return true;
+            }
+            else
+            {
+                throw new ArgumentOutOfRangeException(key, nameof(value), 
$"Invalid value for {key}: {value}. Expected a boolean value.");
+            }
+        }
+
         public static bool ContainsException<T>(Exception exception, out T? 
containedException) where T : Exception
         {
             if (exception is AggregateException aggregateException)
diff --git a/csharp/src/Drivers/Apache/Hive2/HiveServer2Connection.cs 
b/csharp/src/Drivers/Apache/Hive2/HiveServer2Connection.cs
index 990cb4774..215964284 100644
--- a/csharp/src/Drivers/Apache/Hive2/HiveServer2Connection.cs
+++ b/csharp/src/Drivers/Apache/Hive2/HiveServer2Connection.cs
@@ -83,7 +83,7 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2
             public string TableName { get; internal set; }
             public string ColumnName { get; internal set; }
             public string DataType { get; internal set; }
-            public string TypeName {  get; internal set; }
+            public string TypeName { get; internal set; }
             public string Nullable { get; internal set; }
             public string ColumnDef { get; internal set; }
             public string OrdinalPosition { get; internal set; }
@@ -779,12 +779,14 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2
         protected abstract Task<TRowSet> GetRowSetAsync(TGetTablesResp 
response, CancellationToken cancellationToken = default);
         protected abstract Task<TRowSet> GetRowSetAsync(TGetCatalogsResp 
getCatalogsResp, CancellationToken cancellationToken = default);
         protected abstract Task<TRowSet> GetRowSetAsync(TGetSchemasResp 
getSchemasResp, CancellationToken cancellationToken = default);
+        protected internal abstract Task<TRowSet> 
GetRowSetAsync(TGetPrimaryKeysResp response, CancellationToken 
cancellationToken = default);
         protected abstract Task<TGetResultSetMetadataResp> 
GetResultSetMetadataAsync(TGetSchemasResp response, CancellationToken 
cancellationToken = default);
         protected abstract Task<TGetResultSetMetadataResp> 
GetResultSetMetadataAsync(TGetCatalogsResp response, CancellationToken 
cancellationToken = default);
         protected abstract Task<TGetResultSetMetadataResp> 
GetResultSetMetadataAsync(TGetColumnsResp response, CancellationToken 
cancellationToken = default);
         protected abstract Task<TGetResultSetMetadataResp> 
GetResultSetMetadataAsync(TGetTablesResp response, CancellationToken 
cancellationToken = default);
+        protected internal abstract Task<TGetResultSetMetadataResp> 
GetResultSetMetadataAsync(TGetPrimaryKeysResp response, CancellationToken 
cancellationToken = default);
 
-        protected virtual bool AreResultsAvailableDirectly() => false;
+        protected internal virtual bool AreResultsAvailableDirectly() => false;
 
         protected virtual void SetDirectResults(TGetColumnsReq request) => 
throw new System.NotImplementedException();
 
@@ -796,6 +798,10 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2
 
         protected virtual void SetDirectResults(TGetTableTypesReq request) => 
throw new System.NotImplementedException();
 
+        protected virtual void SetDirectResults(TGetPrimaryKeysReq request) => 
throw new System.NotImplementedException();
+
+        protected virtual void SetDirectResults(TGetCrossReferenceReq request) 
=> throw new System.NotImplementedException();
+
         protected internal abstract int PositionRequiredOffset { get; }
 
         protected abstract string InfoDriverName { get; }
@@ -804,7 +810,7 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2
 
         protected abstract string ProductVersion { get; }
 
-        protected abstract bool GetObjectsPatternsRequireLowerCase {  get; }
+        protected abstract bool GetObjectsPatternsRequireLowerCase { get; }
 
         protected abstract bool IsColumnSizeValidForDecimal { get; }
 
@@ -909,6 +915,249 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2
                 nullBitmapBuffer.Build());
         }
 
+        internal async Task<TGetCatalogsResp> 
GetCatalogsAsync(CancellationToken cancellationToken)
+        {
+            if (SessionHandle == null)
+            {
+                throw new InvalidOperationException("Invalid session");
+            }
+
+            TGetCatalogsReq req = new TGetCatalogsReq(SessionHandle);
+            if (AreResultsAvailableDirectly())
+            {
+                SetDirectResults(req);
+            }
+
+            TGetCatalogsResp resp = await Client.GetCatalogs(req, 
cancellationToken);
+            if (resp.Status.StatusCode != TStatusCode.SUCCESS_STATUS)
+            {
+                throw new HiveServer2Exception(resp.Status.ErrorMessage)
+                    .SetNativeError(resp.Status.ErrorCode)
+                    .SetSqlState(resp.Status.SqlState);
+            }
+
+            return resp;
+        }
+
+        internal async Task<TGetSchemasResp> GetSchemasAsync(
+            string? catalogName,
+            string? schemaName,
+            CancellationToken cancellationToken)
+        {
+            if (SessionHandle == null)
+            {
+                throw new InvalidOperationException("Invalid session");
+            }
+
+            TGetSchemasReq req = new(SessionHandle);
+            if (AreResultsAvailableDirectly())
+            {
+                SetDirectResults(req);
+            }
+            if (catalogName != null)
+            {
+                req.CatalogName = catalogName;
+            }
+            if (schemaName != null)
+            {
+                req.SchemaName = schemaName;
+            }
+
+            TGetSchemasResp resp = await Client.GetSchemas(req, 
cancellationToken);
+            if (resp.Status.StatusCode != TStatusCode.SUCCESS_STATUS)
+            {
+                throw new HiveServer2Exception(resp.Status.ErrorMessage)
+                    .SetNativeError(resp.Status.ErrorCode)
+                    .SetSqlState(resp.Status.SqlState);
+            }
+
+            return resp;
+        }
+
+        internal async Task<TGetTablesResp> GetTablesAsync(
+            string? catalogName,
+            string? schemaName,
+            string? tableName,
+            List<string>? tableTypes,
+            CancellationToken cancellationToken)
+        {
+            if (SessionHandle == null)
+            {
+                throw new InvalidOperationException("Invalid session");
+            }
+
+            TGetTablesReq req = new(SessionHandle);
+            if (AreResultsAvailableDirectly())
+            {
+                SetDirectResults(req);
+            }
+            if (catalogName != null)
+            {
+                req.CatalogName = catalogName;
+            }
+            if (schemaName != null)
+            {
+                req.SchemaName = schemaName;
+            }
+            if (tableName != null)
+            {
+                req.TableName = tableName;
+            }
+            if (tableTypes != null && tableTypes.Count > 0)
+            {
+                req.TableTypes = tableTypes;
+            }
+
+            TGetTablesResp resp = await Client.GetTables(req, 
cancellationToken);
+            if (resp.Status.StatusCode != TStatusCode.SUCCESS_STATUS)
+            {
+                throw new HiveServer2Exception(resp.Status.ErrorMessage)
+                    .SetNativeError(resp.Status.ErrorCode)
+                    .SetSqlState(resp.Status.SqlState);
+            }
+
+            return resp;
+        }
+
+        internal async Task<TGetColumnsResp> GetColumnsAsync(
+            string? catalogName,
+            string? schemaName,
+            string? tableName,
+            string? columnName,
+            CancellationToken cancellationToken)
+        {
+            if (SessionHandle == null)
+            {
+                throw new InvalidOperationException("Invalid session");
+            }
+
+            TGetColumnsReq req = new(SessionHandle);
+            if (AreResultsAvailableDirectly())
+            {
+                SetDirectResults(req);
+            }
+            if (catalogName != null)
+            {
+                req.CatalogName = catalogName;
+            }
+            if (schemaName != null)
+            {
+                req.SchemaName = schemaName;
+            }
+            if (tableName != null)
+            {
+                req.TableName = tableName;
+            }
+            if (columnName != null)
+            {
+                req.ColumnName = columnName;
+            }
+
+            TGetColumnsResp resp = await Client.GetColumns(req, 
cancellationToken);
+            if (resp.Status.StatusCode != TStatusCode.SUCCESS_STATUS)
+            {
+                throw new HiveServer2Exception(resp.Status.ErrorMessage)
+                    .SetNativeError(resp.Status.ErrorCode)
+                    .SetSqlState(resp.Status.SqlState);
+            }
+
+            return resp;
+        }
+
+        internal async Task<TGetPrimaryKeysResp> GetPrimaryKeysAsync(
+            string? catalogName,
+            string? schemaName,
+            string? tableName,
+            CancellationToken cancellationToken = default)
+        {
+            if (SessionHandle == null)
+            {
+                throw new InvalidOperationException("Invalid session");
+            }
+
+            TGetPrimaryKeysReq req = new(SessionHandle);
+            if (AreResultsAvailableDirectly())
+            {
+                SetDirectResults(req);
+            }
+            if (catalogName != null)
+            {
+                req.CatalogName = catalogName!;
+            }
+            if (schemaName != null)
+            {
+                req.SchemaName = schemaName!;
+            }
+            if (tableName != null)
+            {
+                req.TableName = tableName!;
+            }
+
+            TGetPrimaryKeysResp resp = await Client.GetPrimaryKeys(req, 
cancellationToken);
+            if (resp.Status.StatusCode != TStatusCode.SUCCESS_STATUS)
+            {
+                throw new HiveServer2Exception(resp.Status.ErrorMessage)
+                    .SetNativeError(resp.Status.ErrorCode)
+                    .SetSqlState(resp.Status.SqlState);
+            }
+
+            return resp;
+        }
+
+        internal async Task<TGetCrossReferenceResp> GetCrossReferenceAsync(
+            string? catalogName,
+            string? schemaName,
+            string? tableName,
+            string? foreignCatalogName,
+            string? foreignSchemaName,
+            string? foreignTableName,
+            CancellationToken cancellationToken = default)
+        {
+            if (SessionHandle == null)
+            {
+                throw new InvalidOperationException("Invalid session");
+            }
+
+            TGetCrossReferenceReq req = new(SessionHandle);
+            if (AreResultsAvailableDirectly())
+            {
+                SetDirectResults(req);
+            }
+            if (catalogName != null)
+            {
+                req.ParentCatalogName = catalogName!;
+            }
+            if (schemaName != null)
+            {
+                req.ParentSchemaName = schemaName!;
+            }
+            if (tableName != null)
+            {
+                req.ParentTableName = tableName!;
+            }
+            if (foreignCatalogName != null)
+            {
+                req.ForeignCatalogName = foreignCatalogName!;
+            }
+            if (schemaName != null)
+            {
+                req.ForeignSchemaName = foreignSchemaName!;
+            }
+            if (tableName != null)
+            {
+                req.ForeignTableName = foreignTableName!;
+            }
+
+            TGetCrossReferenceResp resp = await Client.GetCrossReference(req, 
cancellationToken);
+            if (resp.Status.StatusCode != TStatusCode.SUCCESS_STATUS)
+            {
+                throw new HiveServer2Exception(resp.Status.ErrorMessage)
+                    .SetNativeError(resp.Status.ErrorCode)
+                    .SetSqlState(resp.Status.SqlState);
+            }
+            return resp;
+        }
+
         private static StructArray GetColumnSchema(TableInfo tableInfo)
         {
             StringArray.Builder columnNameBuilder = new StringArray.Builder();
diff --git a/csharp/src/Drivers/Apache/Hive2/HiveServer2HttpConnection.cs 
b/csharp/src/Drivers/Apache/Hive2/HiveServer2HttpConnection.cs
index 6ebcb9267..1c124d325 100644
--- a/csharp/src/Drivers/Apache/Hive2/HiveServer2HttpConnection.cs
+++ b/csharp/src/Drivers/Apache/Hive2/HiveServer2HttpConnection.cs
@@ -301,6 +301,8 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2
             GetResultSetMetadataAsync(response.OperationHandle, Client, 
cancellationToken);
         protected override Task<TGetResultSetMetadataResp> 
GetResultSetMetadataAsync(TGetTablesResp response, CancellationToken 
cancellationToken = default) =>
             GetResultSetMetadataAsync(response.OperationHandle, Client, 
cancellationToken);
+        protected internal override Task<TGetResultSetMetadataResp> 
GetResultSetMetadataAsync(TGetPrimaryKeysResp response, CancellationToken 
cancellationToken = default) =>
+            GetResultSetMetadataAsync(response.OperationHandle, Client, 
cancellationToken);
         protected override Task<TRowSet> GetRowSetAsync(TGetTableTypesResp 
response, CancellationToken cancellationToken = default) =>
             FetchResultsAsync(response.OperationHandle, cancellationToken: 
cancellationToken);
         protected override Task<TRowSet> GetRowSetAsync(TGetColumnsResp 
response, CancellationToken cancellationToken = default) =>
@@ -311,6 +313,8 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2
             FetchResultsAsync(response.OperationHandle, cancellationToken: 
cancellationToken);
         protected override Task<TRowSet> GetRowSetAsync(TGetSchemasResp 
response, CancellationToken cancellationToken = default) =>
             FetchResultsAsync(response.OperationHandle, cancellationToken: 
cancellationToken);
+        protected internal override Task<TRowSet> 
GetRowSetAsync(TGetPrimaryKeysResp response, CancellationToken 
cancellationToken = default) =>
+            FetchResultsAsync(response.OperationHandle, cancellationToken: 
cancellationToken);
 
         protected internal override int PositionRequiredOffset => 0;
 
diff --git a/csharp/src/Drivers/Apache/Hive2/HiveServer2Reader.cs 
b/csharp/src/Drivers/Apache/Hive2/HiveServer2Reader.cs
index c571947b2..c746658bc 100644
--- a/csharp/src/Drivers/Apache/Hive2/HiveServer2Reader.cs
+++ b/csharp/src/Drivers/Apache/Hive2/HiveServer2Reader.cs
@@ -99,8 +99,8 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2
                 // Await the fetch response
                 TFetchResultsResp response = await FetchNext(_statement, 
cancellationToken);
 
-                int columnCount = GetColumnCount(response);
-                int rowCount = GetRowCount(response, columnCount);
+                int columnCount = GetColumnCount(response.Results);
+                int rowCount = GetRowCount(response.Results, columnCount);
                 if ((_enableBatchSizeStopCondition && _statement.BatchSize > 0 
&& rowCount < _statement.BatchSize) || rowCount == 0)
                 {
                     // This is the last batch
@@ -124,27 +124,34 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2
 
         private RecordBatch CreateBatch(TFetchResultsResp response, int 
columnCount, int rowCount)
         {
-            IList<IArrowArray> columnData = [];
-            bool shouldConvertScalar = 
_dataTypeConversion.HasFlag(DataTypeConversion.Scalar);
+            IReadOnlyList<IArrowArray> columnData = 
GetArrowArrayData(response.Results, columnCount, Schema, _dataTypeConversion);
+
+            return new RecordBatch(Schema, columnData, rowCount);
+        }
+
+        internal static IReadOnlyList<IArrowArray> GetArrowArrayData(TRowSet 
response, int columnCount, Schema schema, DataTypeConversion dataTypeConversion)
+        {
+            List<IArrowArray> columnData = [];
+            bool shouldConvertScalar = 
dataTypeConversion.HasFlag(DataTypeConversion.Scalar);
             for (int i = 0; i < columnCount; i++)
             {
-                IArrowType? expectedType = shouldConvertScalar ? 
Schema.FieldsList[i].DataType : null;
-                IArrowArray columnArray = 
GetArray(response.Results.Columns[i], expectedType);
+                IArrowType? expectedType = shouldConvertScalar ? 
schema.FieldsList[i].DataType : null;
+                IArrowArray columnArray = GetArray(response.Columns[i], 
expectedType);
                 columnData.Add(columnArray);
             }
 
-            return new RecordBatch(Schema, columnData, rowCount);
+            return columnData;
         }
 
-        private static int GetColumnCount(TFetchResultsResp response) =>
-            response.Results.Columns.Count;
+        internal static int GetColumnCount(TRowSet response) =>
+            response.Columns.Count;
 
-        private static int GetRowCount(TFetchResultsResp response, int 
columnCount) =>
-            columnCount > 0 ? GetArray(response.Results.Columns[0]).Length : 0;
+        internal static int GetRowCount(TRowSet response, int columnCount) =>
+            columnCount > 0 ? GetArray(response.Columns[0]).Length : 0;
 
         private static async Task<TFetchResultsResp> 
FetchNext(HiveServer2Statement statement, CancellationToken cancellationToken = 
default)
         {
-            var request = new TFetchResultsReq(statement.OperationHandle, 
TFetchOrientation.FETCH_NEXT, statement.BatchSize);
+            var request = new TFetchResultsReq(statement.OperationHandle!, 
TFetchOrientation.FETCH_NEXT, statement.BatchSize);
             return await statement.Connection.Client.FetchResults(request, 
cancellationToken);
         }
 
diff --git a/csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs 
b/csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs
index 9042b4205..f43bad07a 100644
--- a/csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs
+++ b/csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs
@@ -17,6 +17,7 @@
 
 using System;
 using System.Collections.Generic;
+using System.Linq;
 using System.Threading;
 using System.Threading.Tasks;
 using Apache.Arrow.Ipc;
@@ -27,6 +28,20 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2
 {
     internal class HiveServer2Statement : AdbcStatement
     {
+        private const string GetPrimaryKeysCommandName = "getprimarykeys";
+        private const string GetCrossReferenceCommandName = 
"getcrossreference";
+        private const string GetCatalogsCommandName = "getcatalogs";
+        private const string GetSchemasCommandName = "getschemas";
+        private const string GetTablesCommandName = "gettables";
+        private const string GetColumnsCommandName = "getcolumns";
+        private const string SupportedMetadataCommands =
+            GetCatalogsCommandName + "," +
+            GetSchemasCommandName + "," +
+            GetTablesCommandName + "," +
+            GetColumnsCommandName + "," +
+            GetPrimaryKeysCommandName + "," +
+            GetCrossReferenceCommandName;
+
         internal HiveServer2Statement(HiveServer2Connection connection)
         {
             Connection = connection;
@@ -78,6 +93,11 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2
 
         private async Task<QueryResult> 
ExecuteQueryAsyncInternal(CancellationToken cancellationToken = default)
         {
+            if (IsMetadataCommand)
+            {
+                return await ExecuteMetadataCommandQuery(cancellationToken);
+            }
+
             // this could either:
             // take QueryTimeoutSeconds * 3
             // OR
@@ -184,6 +204,36 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2
                         QueryTimeoutSeconds = queryTimeoutSeconds;
                     }
                     break;
+                case ApacheParameters.IsMetadataCommand:
+                    if (ApacheUtility.BooleanIsValid(key, value, out bool 
isMetadataCommand))
+                    {
+                        IsMetadataCommand = isMetadataCommand;
+                    }
+                    break;
+                case ApacheParameters.CatalogName:
+                    this.CatalogName = value;
+                    break;
+                case ApacheParameters.SchemaName:
+                    this.SchemaName = value;
+                    break;
+                case ApacheParameters.TableName:
+                    this.TableName = value;
+                    break;
+                case ApacheParameters.TableTypes:
+                    this.TableTypes = value;
+                    break;
+                case ApacheParameters.ColumnName:
+                    this.ColumnName = value;
+                    break;
+                case ApacheParameters.ForeignCatalogName:
+                    this.ForeignCatalogName = value;
+                    break;
+                case ApacheParameters.ForeignSchemaName:
+                    this.ForeignSchemaName = value;
+                    break;
+                case ApacheParameters.ForeignTableName:
+                    this.ForeignTableName = value;
+                    break;
                 default:
                     throw AdbcException.NotImplemented($"Option '{key}' is not 
implemented.");
             }
@@ -191,7 +241,12 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2
 
         protected async Task ExecuteStatementAsync(CancellationToken 
cancellationToken = default)
         {
-            TExecuteStatementReq executeRequest = new 
TExecuteStatementReq(Connection.SessionHandle!, SqlQuery!);
+            if (Connection.SessionHandle == null)
+            {
+                throw new InvalidOperationException("Invalid session");
+            }
+
+            TExecuteStatementReq executeRequest = new 
TExecuteStatementReq(Connection.SessionHandle, SqlQuery!);
             SetStatementProperties(executeRequest);
             TExecuteStatementResp executeResponse = await 
Connection.Client.ExecuteStatement(executeRequest, cancellationToken);
             if (executeResponse.Status.StatusCode == TStatusCode.ERROR_STATUS)
@@ -214,6 +269,16 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2
             set => Connection.QueryTimeoutSeconds = value;
         }
 
+        protected internal bool IsMetadataCommand { get; set; } = false;
+        protected internal string? CatalogName { get; set; }
+        protected internal string? SchemaName { get; set; }
+        protected internal string? TableName { get; set; }
+        protected internal string? TableTypes { get; set; }
+        protected internal string? ColumnName { get; set; }
+        protected internal string? ForeignCatalogName { get; set; }
+        protected internal string? ForeignSchemaName { get; set; }
+        protected internal string? ForeignTableName { get; set; }
+
         public HiveServer2Connection Connection { get; private set; }
 
         public TOperationHandle? OperationHandle { get; private set; }
@@ -255,5 +320,118 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2
                 }
             }
         }
+
+        private async Task<QueryResult> 
ExecuteMetadataCommandQuery(CancellationToken cancellationToken)
+        {
+            return SqlQuery?.ToLowerInvariant() switch
+            {
+                GetCatalogsCommandName => await 
GetCatalogsAsync(cancellationToken),
+                GetSchemasCommandName => await 
GetSchemasAsync(cancellationToken),
+                GetTablesCommandName => await 
GetTablesAsync(cancellationToken),
+                GetColumnsCommandName => await 
GetColumnsAsync(cancellationToken),
+                GetPrimaryKeysCommandName => await 
GetPrimaryKeysAsync(cancellationToken),
+                GetCrossReferenceCommandName => await 
GetCrossReferenceAsync(cancellationToken),
+                null or "" => throw new 
ArgumentNullException(nameof(SqlQuery), $"Metadata command for property 
'SqlQuery' must not be empty or null. Supported metadata commands: 
{SupportedMetadataCommands}"),
+                _ => throw new NotSupportedException($"Metadata command 
'{SqlQuery}' is not supported. Supported metadata commands: 
{SupportedMetadataCommands}"),
+            };
+        }
+
+        private async Task<QueryResult> 
GetCrossReferenceAsync(CancellationToken cancellationToken = default)
+        {
+            TGetCrossReferenceResp resp = await 
Connection.GetCrossReferenceAsync(
+                CatalogName,
+                SchemaName,
+                TableName,
+                ForeignCatalogName,
+                ForeignSchemaName,
+                ForeignTableName,
+                cancellationToken);
+            OperationHandle = resp.OperationHandle;
+
+            return await GetQueryResult(resp.DirectResults, cancellationToken);
+        }
+
+        private async Task<QueryResult> GetPrimaryKeysAsync(CancellationToken 
cancellationToken = default)
+        {
+            TGetPrimaryKeysResp resp = await Connection.GetPrimaryKeysAsync(
+                CatalogName,
+                SchemaName,
+                TableName,
+                cancellationToken);
+            OperationHandle = resp.OperationHandle;
+
+            return await GetQueryResult(resp.DirectResults, cancellationToken);
+        }
+
+        private async Task<QueryResult> GetCatalogsAsync(CancellationToken 
cancellationToken = default)
+        {
+            TGetCatalogsResp resp = await 
Connection.GetCatalogsAsync(cancellationToken);
+            OperationHandle = resp.OperationHandle;
+
+            return await GetQueryResult(resp.DirectResults, cancellationToken);
+        }
+
+        private async Task<QueryResult> GetSchemasAsync(CancellationToken 
cancellationToken = default)
+        {
+            TGetSchemasResp resp = await Connection.GetSchemasAsync(
+                CatalogName,
+                SchemaName,
+                cancellationToken);
+            OperationHandle = resp.OperationHandle;
+
+            return await GetQueryResult(resp.DirectResults, cancellationToken);
+        }
+
+        private async Task<QueryResult> GetTablesAsync(CancellationToken 
cancellationToken = default)
+        {
+            List<string>? tableTypesList = 
this.TableTypes?.Split(',').ToList();
+            TGetTablesResp resp = await Connection.GetTablesAsync(
+                CatalogName,
+                SchemaName,
+                TableName,
+                tableTypesList,
+                cancellationToken);
+            OperationHandle = resp.OperationHandle;
+
+            return await GetQueryResult(resp.DirectResults, cancellationToken);
+        }
+
+        private async Task<QueryResult> GetColumnsAsync(CancellationToken 
cancellationToken = default)
+        {
+            TGetColumnsResp resp = await Connection.GetColumnsAsync(
+                CatalogName,
+                SchemaName,
+                TableName,
+                ColumnName,
+                cancellationToken);
+            OperationHandle = resp.OperationHandle;
+
+            return await GetQueryResult(resp.DirectResults, cancellationToken);
+        }
+
+        private async Task<Schema> GetResultSetSchemaAsync(TOperationHandle 
operationHandle, TCLIService.IAsync client, CancellationToken cancellationToken 
= default)
+        {
+            TGetResultSetMetadataResp response = await 
HiveServer2Connection.GetResultSetMetadataAsync(operationHandle, client, 
cancellationToken);
+            return Connection.SchemaParser.GetArrowSchema(response.Schema, 
Connection.DataTypeConversion);
+        }
+
+        private async Task<QueryResult> GetQueryResult(TSparkDirectResults? 
directResults, CancellationToken cancellationToken)
+        {
+            Schema schema;
+            if (Connection.AreResultsAvailableDirectly() && 
directResults?.ResultSet?.Results != null)
+            {
+                TGetResultSetMetadataResp resultSetMetadata = 
directResults.ResultSetMetadata;
+                schema = 
Connection.SchemaParser.GetArrowSchema(resultSetMetadata.Schema, 
Connection.DataTypeConversion);
+                TRowSet rowSet = directResults.ResultSet.Results;
+                int columnCount = HiveServer2Reader.GetColumnCount(rowSet);
+                int rowCount = HiveServer2Reader.GetRowCount(rowSet, 
columnCount);
+                IReadOnlyList<IArrowArray> data = 
HiveServer2Reader.GetArrowArrayData(rowSet, columnCount, schema, 
Connection.DataTypeConversion);
+                return new QueryResult(rowCount, new 
HiveServer2Connection.HiveInfoArrowStream(schema, data));
+            }
+
+            await HiveServer2Connection.PollForResponseAsync(OperationHandle!, 
Connection.Client, PollTimeMilliseconds, cancellationToken);
+            schema = await GetResultSetSchemaAsync(OperationHandle!, 
Connection.Client, cancellationToken);
+            return new QueryResult(-1, Connection.NewReader(this, schema));
+        }
     }
 }
diff --git a/csharp/src/Drivers/Apache/Hive2/README.md 
b/csharp/src/Drivers/Apache/Hive2/README.md
index b78b81277..76040d217 100644
--- a/csharp/src/Drivers/Apache/Hive2/README.md
+++ b/csharp/src/Drivers/Apache/Hive2/README.md
@@ -39,6 +39,15 @@ but can also be passed in the call to `AdbcDatabase.Connect`.
 | `adbc.apache.statement.batch_size` | Sets the maximum number of rows to 
retrieve in a single batch request. | `50000` |
 | `adbc.apache.statement.polltime_ms` | If polling is necessary to get a 
result, this option sets the length of time (in milliseconds) to wait between 
polls. | `500` |
 | `adbc.apache.statement.query_timeout_s` | Sets the maximum time (in seconds) 
for a query to complete. Values can be 0 (infinite) or greater than zero. | 
`60` |
+| `adbc.apache.statement.is_metadata_command` | Indicate that the value of 
`AdbcStatement.SqlQuery` contains the name of a native metadata command. If set 
to `True`, it indicates a metadata command query whereas a value of `False` 
indicates a SQL command query. <br><br>Supported metadata commands include: 
`GetPrimaryKeys`, `GetCrossReference`, `GetCatalogs`, `GetSchemas`, 
`GetTables`, and `GetColumns`. | `False` |
+| `adbc.get_metadata.target_catalog` | The catalog name (or pattern) when used 
with a metadata command query. <br><br>Supported metadata commands include: 
`GetPrimaryKeys`, `GetCrossReference`, `GetSchemas`, `GetTables`, and 
`GetColumns`. | |
+| `adbc.get_metadata.target_db_schema` | The schema name (or pattern) when 
used with a metadata command query. <br><br>Supported metadata commands 
include: `GetPrimaryKeys`, `GetCrossReference`, `GetSchemas`, `GetTables`, and 
`GetColumns`. | |
+| `adbc.get_metadata.target_table` | The table name (or pattern) when used 
with a metadata command query. <br><br>Supported metadata commands include: 
`GetPrimaryKeys`, `GetCrossReference`, `GetSchemas`, `GetTables`, and 
`GetColumns`. | |
+| `adbc.get_metadata.target_table_types` | The comma-separated list of table 
types when used with a metadata command query. <br><br>Supported metadata 
commands include: `GetTables`. | |
+| `adbc.get_metadata.target_column` | The column name (or pattern) when used 
with a metadata command query. <br><br>Supported metadata commands include: 
`GetColumns`.  | |
+| `adbc.get_metadata.foreign_target_catalog` | The foreign (i.e., child) 
catalog name (or pattern) when used with a metadata command query. 
<br><br>Supported metadata commands include: `GetCrossReference`. | |
+| `adbc.get_metadata.foreign_target_db_schema` | The foreign (i.e., child) 
schema name (or pattern) when used with a metadata command query. 
<br><br>Supported metadata commands include: `GetCrossReference`. | |
+| `adbc.get_metadata.foreign_target_table` | The foreign (i.e., child) table 
name (or pattern) when used with a metadata command query. <br><br>Supported 
metadata commands include: `GetCrossReference`. | |
 | `adbc.http_options.tls.enabled` | If tls needs to enabled or not. One of 
`True`, `False` | `True` |
 | `adbc.http_options.tls.disable_server_certificate_validation` | If tls/ssl 
server certificate validation needs to enabled or not. One of `True`, `False`. 
If set to True, all certificate validation errors are ignored | `False` |
 | `adbc.http_options.tls.allow_self_signed` | If self signed tls/ssl 
certificate needs to be allowed or not. One of `True`, `False` | `False` |
diff --git a/csharp/src/Drivers/Apache/Impala/ImpalaConnection.cs 
b/csharp/src/Drivers/Apache/Impala/ImpalaConnection.cs
index 238302435..501fcfe66 100644
--- a/csharp/src/Drivers/Apache/Impala/ImpalaConnection.cs
+++ b/csharp/src/Drivers/Apache/Impala/ImpalaConnection.cs
@@ -73,6 +73,8 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Impala
             GetResultSetMetadataAsync(response.OperationHandle, Client, 
cancellationToken);
         protected override Task<TGetResultSetMetadataResp> 
GetResultSetMetadataAsync(TGetTablesResp response, CancellationToken 
cancellationToken = default) =>
             GetResultSetMetadataAsync(response.OperationHandle, Client, 
cancellationToken);
+        protected internal override Task<TGetResultSetMetadataResp> 
GetResultSetMetadataAsync(TGetPrimaryKeysResp response, CancellationToken 
cancellationToken = default) =>
+            GetResultSetMetadataAsync(response.OperationHandle, Client, 
cancellationToken);
         protected override Task<TRowSet> GetRowSetAsync(TGetTableTypesResp 
response, CancellationToken cancellationToken = default) =>
             FetchResultsAsync(response.OperationHandle, cancellationToken: 
cancellationToken);
         protected override Task<TRowSet> GetRowSetAsync(TGetColumnsResp 
response, CancellationToken cancellationToken = default) =>
@@ -83,6 +85,8 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Impala
             FetchResultsAsync(response.OperationHandle, cancellationToken: 
cancellationToken);
         protected override Task<TRowSet> GetRowSetAsync(TGetSchemasResp 
response, CancellationToken cancellationToken = default) =>
             FetchResultsAsync(response.OperationHandle, cancellationToken: 
cancellationToken);
+        protected internal override Task<TRowSet> 
GetRowSetAsync(TGetPrimaryKeysResp response, CancellationToken 
cancellationToken = default) =>
+            FetchResultsAsync(response.OperationHandle, cancellationToken: 
cancellationToken);
 
         protected override void SetPrecisionScaleAndTypeName(
             short colType,
diff --git a/csharp/src/Drivers/Apache/Hive2/README.md 
b/csharp/src/Drivers/Apache/Impala/README.md
similarity index 68%
copy from csharp/src/Drivers/Apache/Hive2/README.md
copy to csharp/src/Drivers/Apache/Impala/README.md
index b78b81277..0b287f7fe 100644
--- a/csharp/src/Drivers/Apache/Hive2/README.md
+++ b/csharp/src/Drivers/Apache/Impala/README.md
@@ -17,16 +17,16 @@
 
 -->
 
-# Hive Driver
+# Impala Driver
 
 ## Database and Connection Properties
 
-Properties should be passed in the call to `HiveServer2Driver.Open`,
-but can also be passed in the call to `AdbcDatabase.Connect`.
+Properties should be passed in the call to `ImpalaDriver.Open`,
+but can also be passed in the call to `AdbcDatabase.Connect`. Options 
beginning with `adbc.apache.*` can also be set using the 
`AdbcStatement.SetOption` method.
 
 | Property               | Description | Default |
 | :---                   | :---        | :---    |
-| `adbc.hive.transport_type`      | (Required) Indicates the Hive transport 
type. `http` | |
+| `adbc.hive.transport_type` | (Required) Indicates the Hive transport type. 
`http` or `standard` | |
 | `adbc.hive.auth_type` | An indicator of the intended type of authentication. 
Allowed values: `none`, `username_only` and `basic`. This property is optional. 
The authentication type can be inferred from `username`, and `password`. | |
 | `adbc.hive.host`      | Host name for the data source. Do not include scheme 
or port number. Example: `hiveserver.region.cloudapp.azure.com` |  |
 | `adbc.hive.port`      | The port number the data source listens on for a new 
connections. | `443` |
@@ -39,7 +39,16 @@ but can also be passed in the call to `AdbcDatabase.Connect`.
 | `adbc.apache.statement.batch_size` | Sets the maximum number of rows to 
retrieve in a single batch request. | `50000` |
 | `adbc.apache.statement.polltime_ms` | If polling is necessary to get a 
result, this option sets the length of time (in milliseconds) to wait between 
polls. | `500` |
 | `adbc.apache.statement.query_timeout_s` | Sets the maximum time (in seconds) 
for a query to complete. Values can be 0 (infinite) or greater than zero. | 
`60` |
-| `adbc.http_options.tls.enabled` | If tls needs to enabled or not. One of 
`True`, `False` | `True` |
+| `adbc.apache.statement.is_metadata_command` | Indicate that the value of 
`AdbcStatement.SqlQuery` contains the name of a native metadata command. If set 
to `True`, it indicates a metadata command query whereas a value of `False` 
indicates a SQL command query. <br><br>Supported metadata commands include: 
`GetPrimaryKeys`, `GetCrossReference`, `GetCatalogs`, `GetSchemas`, 
`GetTables`, and `GetColumns`. | `False` |
+| `adbc.get_metadata.target_catalog` | The catalog name (or pattern) when used 
with a metadata command query. <br><br>Supported metadata commands include: 
`GetPrimaryKeys`, `GetCrossReference`, `GetSchemas`, `GetTables`, and 
`GetColumns`. | |
+| `adbc.get_metadata.target_db_schema` | The schema name (or pattern) when 
used with a metadata command query. <br><br>Supported metadata commands 
include: `GetPrimaryKeys`, `GetCrossReference`, `GetSchemas`, `GetTables`, and 
`GetColumns`. | |
+| `adbc.get_metadata.target_table` | The table name (or pattern) when used 
with a metadata command query. <br><br>Supported metadata commands include: 
`GetPrimaryKeys`, `GetCrossReference`, `GetSchemas`, `GetTables`, and 
`GetColumns`. | |
+| `adbc.get_metadata.target_table_types` | The comma-separated list of table 
types when used with a metadata command query. <br><br>Supported metadata 
commands include: `GetTables`. | |
+| `adbc.get_metadata.target_column` | The column name (or pattern) when used 
with a metadata command query. <br><br>Supported metadata commands include: 
`GetColumns`.  | |
+| `adbc.get_metadata.foreign_target_catalog` | The foreign (i.e., child) 
catalog name (or pattern) when used with a metadata command query. 
<br><br>Supported metadata commands include: `GetCrossReference`. | |
+| `adbc.get_metadata.foreign_target_db_schema` | The foreign (i.e., child) 
schema name (or pattern) when used with a metadata command query. 
<br><br>Supported metadata commands include: `GetCrossReference`. | |
+| `adbc.get_metadata.foreign_target_table` | The foreign (i.e., child) table 
name (or pattern) when used with a metadata command query. <br><br>Supported 
metadata commands include: `GetCrossReference`. | |
+| `adbc.http_options.tls.enabled` | If tls needs to enabled or not. One of 
`True`, `False` | `False` |
 | `adbc.http_options.tls.disable_server_certificate_validation` | If tls/ssl 
server certificate validation needs to enabled or not. One of `True`, `False`. 
If set to True, all certificate validation errors are ignored | `False` |
 | `adbc.http_options.tls.allow_self_signed` | If self signed tls/ssl 
certificate needs to be allowed or not. One of `True`, `False` | `False` |
 | `adbc.http_options.tls.allow_hostname_mismatch` | If hostname mismatch is 
allowed for ssl. One of `True`, `False` | `False` |
@@ -53,7 +62,7 @@ The `adbc.apache.statement.query_timeout_s` is analogous to a 
CommandTimeout for
 
 The `adbc.apache.statement.polltime_ms` specifies the time between polls to 
the service, up to the limit specifed by 
`adbc.apache.statement.query_timeout_s`.
 
-## Hive Data Types
+## Impala Data Types
 
 The following table depicts how the Hive ADBC driver converts a Hive type to 
an Arrow type and a .NET type:
 
@@ -71,8 +80,6 @@ The following table depicts how the Hive ADBC driver converts 
a Hive type to an
 | DOUBLE               | Double     | double | | |
 | FLOAT                | *Double*   | *double* | Float | float |
 | INT                  | Int32      | int | | |
-| INTERVAL_DAY_TIME+   | String     | string | | |
-| INTERVAL_YEAR_MONTH+ | String     | string | | |
 | MAP*                 | String     | string | | |
 | NULL                 | String     | string | | |
 | SMALLINT             | Int16      | short | | |
@@ -87,16 +94,10 @@ The following table depicts how the Hive ADBC driver 
converts a Hive type to an
 
 ## Supported Variants
 
-### Apache Hive over HTTP
+### Impala over HTTP
 
-Support for Hive over HTTP is the most mature.
+Support for Impala HTTP is the most mature.
 
-### Azure Hive HDInsight
+### Impala over TCP Socket (standar)
 
-To read data from Azure HDInsight Hive Cluster, use the following parameters:
-adbc.hive.type = "http"
-adbc.hive.port = "443"
-adbc.hive.path = "/hive2"
-adbc.hive.host = $"{clusterHostName}"
-username = $"{clusterUserName}"
-password = $"{clusterPassword}"
+Support for Impala over TCP socket is also supported.
diff --git a/csharp/src/Drivers/Apache/Spark/README.md 
b/csharp/src/Drivers/Apache/Spark/README.md
index 5486c60e1..94712ee53 100644
--- a/csharp/src/Drivers/Apache/Spark/README.md
+++ b/csharp/src/Drivers/Apache/Spark/README.md
@@ -40,6 +40,15 @@ but can also be passed in the call to `AdbcDatabase.Connect`.
 | `adbc.apache.statement.batch_size` | Sets the maximum number of rows to 
retrieve in a single batch request. | `50000` |
 | `adbc.apache.statement.polltime_ms` | If polling is necessary to get a 
result, this option sets the length of time (in milliseconds) to wait between 
polls. | `500` |
 | `adbc.apache.statement.query_timeout_s` | Sets the maximum time (in seconds) 
for a query to complete. Values can be 0 (infinite) or greater than zero. | 
`60` |
+| `adbc.apache.statement.is_metadata_command` | Indicate that the value of 
`AdbcStatement.SqlQuery` contains the name of a native metadata command. If set 
to `True`, it indicates a metadata command query whereas a value of `False` 
indicates a SQL command query. <br><br>Supported metadata commands include: 
`GetPrimaryKeys`, `GetCrossReference`, `GetCatalogs`, `GetSchemas`, 
`GetTables`, and `GetColumns`. | `False` |
+| `adbc.get_metadata.target_catalog` | The catalog name (or pattern) when used 
with a metadata command query. <br><br>Supported metadata commands include: 
`GetPrimaryKeys`, `GetCrossReference`, `GetSchemas`, `GetTables`, and 
`GetColumns`. | |
+| `adbc.get_metadata.target_db_schema` | The schema name (or pattern) when 
used with a metadata command query. <br><br>Supported metadata commands 
include: `GetPrimaryKeys`, `GetCrossReference`, `GetSchemas`, `GetTables`, and 
`GetColumns`. | |
+| `adbc.get_metadata.target_table` | The table name (or pattern) when used 
with a metadata command query. <br><br>Supported metadata commands include: 
`GetPrimaryKeys`, `GetCrossReference`, `GetSchemas`, `GetTables`, and 
`GetColumns`. | |
+| `adbc.get_metadata.target_table_types` | The comma-separated list of table 
types when used with a metadata command query. <br><br>Supported metadata 
commands include: `GetTables`. | |
+| `adbc.get_metadata.target_column` | The column name (or pattern) when used 
with a metadata command query. <br><br>Supported metadata commands include: 
`GetColumns`.  | |
+| `adbc.get_metadata.foreign_target_catalog` | The foreign (i.e., child) 
catalog name (or pattern) when used with a metadata command query. 
<br><br>Supported metadata commands include: `GetCrossReference`. | |
+| `adbc.get_metadata.foreign_target_db_schema` | The foreign (i.e., child) 
schema name (or pattern) when used with a metadata command query. 
<br><br>Supported metadata commands include: `GetCrossReference`. | |
+| `adbc.get_metadata.foreign_target_table` | The foreign (i.e., child) table 
name (or pattern) when used with a metadata command query. <br><br>Supported 
metadata commands include: `GetCrossReference`. | |
 | `adbc.http_options.tls.enabled` | If tls needs to enabled or not. One of 
`True`, `False` | `True` |
 | `adbc.http_options.tls.disable_server_certificate_validation` | If tls/ssl 
server certificate validation needs to enabled or not. One of `True`, `False`. 
If set to True, all certificate validation errors are ignored | `False` |
 | `adbc.http_options.tls.allow_self_signed` | If self signed tls/ssl 
certificate needs to be allowed or not. One of `True`, `False` | `False` |
diff --git a/csharp/src/Drivers/Apache/Spark/SparkConnection.cs 
b/csharp/src/Drivers/Apache/Spark/SparkConnection.cs
index 2eb11e941..d9cbbb0d9 100644
--- a/csharp/src/Drivers/Apache/Spark/SparkConnection.cs
+++ b/csharp/src/Drivers/Apache/Spark/SparkConnection.cs
@@ -123,7 +123,7 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
 
         protected override bool IsColumnSizeValidForDecimal => false;
 
-        protected override bool AreResultsAvailableDirectly() => true;
+        protected internal override bool AreResultsAvailableDirectly() => true;
 
         protected override void SetDirectResults(TGetColumnsReq request) => 
request.GetDirectResults = sparkGetDirectResults;
 
@@ -135,6 +135,10 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
 
         protected override void SetDirectResults(TGetTableTypesReq request) => 
request.GetDirectResults = sparkGetDirectResults;
 
+        protected override void SetDirectResults(TGetPrimaryKeysReq request) 
=> request.GetDirectResults = sparkGetDirectResults;
+
+        protected override void SetDirectResults(TGetCrossReferenceReq 
request) => request.GetDirectResults = sparkGetDirectResults;
+
         protected abstract void ValidateConnection();
         protected abstract void ValidateAuthentication();
         protected abstract void ValidateOptions();
diff --git a/csharp/src/Drivers/Apache/Spark/SparkDatabricksConnection.cs 
b/csharp/src/Drivers/Apache/Spark/SparkDatabricksConnection.cs
index a2413635b..75705189b 100644
--- a/csharp/src/Drivers/Apache/Spark/SparkDatabricksConnection.cs
+++ b/csharp/src/Drivers/Apache/Spark/SparkDatabricksConnection.cs
@@ -18,7 +18,6 @@
 using System.Collections.Generic;
 using System.Threading;
 using System.Threading.Tasks;
-using Apache.Arrow.Adbc.Drivers.Apache.Hive2;
 using Apache.Arrow.Adbc.Drivers.Apache.Spark.CloudFetch;
 using Apache.Arrow.Ipc;
 using Apache.Hive.Service.Rpc.Thrift;
@@ -84,6 +83,8 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
             Task.FromResult(response.DirectResults.ResultSetMetadata);
         protected override Task<TGetResultSetMetadataResp> 
GetResultSetMetadataAsync(TGetTablesResp response, CancellationToken 
cancellationToken = default) =>
             Task.FromResult(response.DirectResults.ResultSetMetadata);
+        protected internal override Task<TGetResultSetMetadataResp> 
GetResultSetMetadataAsync(TGetPrimaryKeysResp response, CancellationToken 
cancellationToken = default) =>
+            Task.FromResult(response.DirectResults.ResultSetMetadata);
 
         protected override Task<TRowSet> GetRowSetAsync(TGetTableTypesResp 
response, CancellationToken cancellationToken = default) =>
             Task.FromResult(response.DirectResults.ResultSet.Results);
@@ -95,5 +96,7 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
             Task.FromResult(response.DirectResults.ResultSet.Results);
         protected override Task<TRowSet> GetRowSetAsync(TGetSchemasResp 
response, CancellationToken cancellationToken = default) =>
             Task.FromResult(response.DirectResults.ResultSet.Results);
+        protected internal override Task<TRowSet> 
GetRowSetAsync(TGetPrimaryKeysResp response, CancellationToken 
cancellationToken = default) =>
+            Task.FromResult(response.DirectResults.ResultSet.Results);
     }
 }
diff --git a/csharp/src/Drivers/Apache/Spark/SparkHttpConnection.cs 
b/csharp/src/Drivers/Apache/Spark/SparkHttpConnection.cs
index fd7f18097..a1fafc367 100644
--- a/csharp/src/Drivers/Apache/Spark/SparkHttpConnection.cs
+++ b/csharp/src/Drivers/Apache/Spark/SparkHttpConnection.cs
@@ -233,6 +233,8 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
             GetResultSetMetadataAsync(response.OperationHandle, Client, 
cancellationToken);
         protected override Task<TGetResultSetMetadataResp> 
GetResultSetMetadataAsync(TGetTablesResp response, CancellationToken 
cancellationToken = default) =>
             GetResultSetMetadataAsync(response.OperationHandle, Client, 
cancellationToken);
+        protected internal override Task<TGetResultSetMetadataResp> 
GetResultSetMetadataAsync(TGetPrimaryKeysResp response, CancellationToken 
cancellationToken = default) =>
+            GetResultSetMetadataAsync(response.OperationHandle, Client, 
cancellationToken);
         protected override Task<TRowSet> GetRowSetAsync(TGetTableTypesResp 
response, CancellationToken cancellationToken = default) =>
             FetchResultsAsync(response.OperationHandle, cancellationToken: 
cancellationToken);
         protected override Task<TRowSet> GetRowSetAsync(TGetColumnsResp 
response, CancellationToken cancellationToken = default) =>
@@ -243,6 +245,8 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
             FetchResultsAsync(response.OperationHandle, cancellationToken: 
cancellationToken);
         protected override Task<TRowSet> GetRowSetAsync(TGetSchemasResp 
response, CancellationToken cancellationToken = default) =>
             FetchResultsAsync(response.OperationHandle, cancellationToken: 
cancellationToken);
+        protected internal override Task<TRowSet> 
GetRowSetAsync(TGetPrimaryKeysResp response, CancellationToken 
cancellationToken = default) =>
+            FetchResultsAsync(response.OperationHandle, cancellationToken: 
cancellationToken);
 
         internal override SchemaParser SchemaParser => new 
HiveServer2SchemaParser();
 
diff --git a/csharp/test/Apache.Arrow.Adbc.Tests/TestBase.cs 
b/csharp/test/Apache.Arrow.Adbc.Tests/TestBase.cs
index b7f9b76bb..3782ad09e 100644
--- a/csharp/test/Apache.Arrow.Adbc.Tests/TestBase.cs
+++ b/csharp/test/Apache.Arrow.Adbc.Tests/TestBase.cs
@@ -96,7 +96,7 @@ namespace Apache.Arrow.Adbc.Tests
                         "{0}{1}{2}",
                         
string.IsNullOrEmpty(TestConfiguration.Metadata.Catalog) ? string.Empty : 
DelimitIdentifier(TestConfiguration.Metadata.Catalog) + ".",
                         
string.IsNullOrEmpty(TestConfiguration.Metadata.Schema) ? string.Empty : 
DelimitIdentifier(TestConfiguration.Metadata.Schema) + ".",
-                        
DelimitIdentifier(Guid.NewGuid().ToString().Replace("-", ""))
+                        DelimitIdentifier(Guid.NewGuid().ToString("N"))
                     );
 
         /// <summary>
@@ -658,6 +658,15 @@ namespace Apache.Arrow.Adbc.Tests
             return name.Substring(1);
         }
 
+        protected void CreateNewTableName(out string tableName, out string 
fullTableName)
+        {
+            string catalogName = TestConfiguration.Metadata.Catalog;
+            string schemaName = TestConfiguration.Metadata.Schema;
+            tableName = Guid.NewGuid().ToString("N");
+            string catalogFormatted = string.IsNullOrEmpty(catalogName) ? 
string.Empty : DelimitIdentifier(catalogName) + ".";
+            fullTableName = 
$"{catalogFormatted}{DelimitIdentifier(schemaName)}.{DelimitIdentifier(tableName)}";
+        }
+
         /// <summary>
         /// Represents a temporary table that can create and drop the table 
automatically.
         /// </summary>
@@ -730,7 +739,7 @@ namespace Apache.Arrow.Adbc.Tests
             private TemporarySchema(string catalogName, AdbcStatement 
statement)
             {
                 CatalogName = catalogName;
-                SchemaName = Guid.NewGuid().ToString().Replace("-", "");
+                SchemaName = Guid.NewGuid().ToString("N");
                 _statement = statement;
             }
 
diff --git a/csharp/test/Drivers/Apache/Common/StatementTests.cs 
b/csharp/test/Drivers/Apache/Common/StatementTests.cs
index 0af9e3f39..2234d8080 100644
--- a/csharp/test/Drivers/Apache/Common/StatementTests.cs
+++ b/csharp/test/Drivers/Apache/Common/StatementTests.cs
@@ -20,6 +20,7 @@ using System.Collections.Generic;
 using System.Threading.Tasks;
 using Apache.Arrow.Adbc.Drivers.Apache;
 using Apache.Arrow.Adbc.Tests.Xunit;
+using Apache.Arrow.Types;
 using Xunit;
 using Xunit.Abstractions;
 
@@ -182,6 +183,347 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.Apache.Common
             using TemporaryTable temporaryTable = await 
NewTemporaryTableAsync(Statement, $"{columnName} INT");
             await 
ValidateInsertSelectDeleteSingleValueAsync(temporaryTable.TableName, 
columnName, 1);
         }
+
+        [SkippableFact]
+        public async Task CanGetCatalogs()
+        {
+            var statement = Connection.CreateStatement();
+            statement.SetOption(ApacheParameters.IsMetadataCommand, "true");
+            statement.SqlQuery = "GetCatalogs";
+
+            QueryResult queryResult = await statement.ExecuteQueryAsync();
+            Assert.NotNull(queryResult.Stream);
+
+            Assert.Single(queryResult.Stream.Schema.FieldsList);
+            int actualBatchLength = 0;
+
+            while (queryResult.Stream != null)
+            {
+                RecordBatch? batch = await 
queryResult.Stream.ReadNextRecordBatchAsync();
+                if (batch == null)
+                {
+                    break;
+                }
+                actualBatchLength += batch.Length;
+            }
+            if (TestEnvironment.SupportCatalogName)
+            {
+                Assert.True(actualBatchLength > 0);
+            }
+            else
+            {
+                Assert.True(actualBatchLength == 0);
+            }
+        }
+
+        [SkippableFact]
+        public async Task CanGetSchemas()
+        {
+            var statement = Connection.CreateStatement();
+            statement.SetOption(ApacheParameters.IsMetadataCommand, "true");
+            statement.SetOption(ApacheParameters.CatalogName, 
TestConfiguration.Metadata.Catalog);
+            statement.SetOption(ApacheParameters.SchemaName, 
TestConfiguration.Metadata.Schema);
+            statement.SqlQuery = "GetSchemas";
+
+            QueryResult queryResult = await statement.ExecuteQueryAsync();
+            Assert.NotNull(queryResult.Stream);
+
+            Assert.Equal(2, queryResult.Stream.Schema.FieldsList.Count);
+            int actualBatchLength = 0;
+
+            while (queryResult.Stream != null)
+            {
+                RecordBatch? batch = await 
queryResult.Stream.ReadNextRecordBatchAsync();
+                if (batch == null)
+                {
+                    break;
+                }
+                actualBatchLength += batch.Length;
+            }
+            Assert.Equal(1, actualBatchLength);
+        }
+
+        [SkippableFact]
+        public async Task CanGetTables()
+        {
+            var statement = Connection.CreateStatement();
+            statement.SetOption(ApacheParameters.IsMetadataCommand, "true");
+            statement.SetOption(ApacheParameters.CatalogName, 
TestConfiguration.Metadata.Catalog);
+            statement.SetOption(ApacheParameters.SchemaName, 
TestConfiguration.Metadata.Schema);
+            statement.SetOption(ApacheParameters.TableName, 
TestConfiguration.Metadata.Table);
+            statement.SqlQuery = "GetTables";
+
+            QueryResult queryResult = await statement.ExecuteQueryAsync();
+            Assert.NotNull(queryResult.Stream);
+
+            Assert.True(queryResult.Stream.Schema.FieldsList.Count >= 5);
+            int actualBatchLength = 0;
+
+            while (queryResult.Stream != null)
+            {
+                RecordBatch? batch = await 
queryResult.Stream.ReadNextRecordBatchAsync();
+                if (batch == null)
+                {
+                    break;
+                }
+                actualBatchLength += batch.Length;
+            }
+            Assert.Equal(1, actualBatchLength);
+        }
+
+        [SkippableFact]
+        public async Task CanGetColumns()
+        {
+            var statement = Connection.CreateStatement();
+            statement.SetOption(ApacheParameters.IsMetadataCommand, "true");
+            statement.SetOption(ApacheParameters.CatalogName, 
TestConfiguration.Metadata.Catalog);
+            statement.SetOption(ApacheParameters.SchemaName, 
TestConfiguration.Metadata.Schema);
+            statement.SetOption(ApacheParameters.TableName, 
TestConfiguration.Metadata.Table);
+            statement.SqlQuery = "GetColumns";
+
+            QueryResult queryResult = await statement.ExecuteQueryAsync();
+            Assert.NotNull(queryResult.Stream);
+
+            Assert.Equal(23, queryResult.Stream.Schema.FieldsList.Count);
+            int actualBatchLength = 0;
+
+            while (queryResult.Stream != null)
+            {
+                RecordBatch? batch = await 
queryResult.Stream.ReadNextRecordBatchAsync();
+                if (batch == null)
+                {
+                    break;
+                }
+                actualBatchLength += batch.Length;
+            }
+            Assert.Equal(TestConfiguration.Metadata.ExpectedColumnCount, 
actualBatchLength);
+        }
+
+        /// <summary>
+        /// Validates if the driver can execute GetPrimaryKeys metadata 
command.
+        /// </summary>
+        protected async Task CanGetPrimaryKeys(string? catalogName, string? 
schemaName)
+        {
+            PrepareCreateTableWithPrimaryKeys(
+                out string sqlUpdate,
+                out string tableName,
+                out string fullTableName,
+                out IReadOnlyList<string> primaryKeys);
+            using TemporaryTable temporaryTable = await 
TemporaryTable.NewTemporaryTableAsync(
+                    Statement,
+                    fullTableName,
+                    sqlUpdate,
+                    OutputHelper);
+
+            // Note: create a new statement to do metadata calls so it does 
not reuse the existing 'this.Statement'
+            AdbcStatement statement = Connection.CreateStatement();
+            statement.SetOption(ApacheParameters.CatalogName, catalogName ?? 
string.Empty);
+            statement.SetOption(ApacheParameters.SchemaName, schemaName ?? 
string.Empty);
+            statement.SetOption(ApacheParameters.TableName, tableName);
+            statement.SetOption(ApacheParameters.IsMetadataCommand, "true");
+            statement.SqlQuery = "GetPrimaryKeys";
+
+            await ValidateGetPrimaryKeys(catalogName, schemaName, tableName, 
primaryKeys, statement);
+        }
+
+        /// <summary>
+        /// Validates if the driver can execute GetCrossReference metadata 
command referencing the child table.
+        /// </summary>
+        protected async Task CanGetCrossReferenceFromChildTable(string? 
catalogName, string? schemaName)
+        {
+            PrepareCreateTableWithPrimaryKeys(
+                out string sqlUpdate,
+                out string tableNameParent,
+                out string fullTableNameParent,
+                out IReadOnlyList<string> primaryKeys);
+            using TemporaryTable temporaryTableParent = await 
TemporaryTable.NewTemporaryTableAsync(
+                Statement,
+                fullTableNameParent,
+                sqlUpdate,
+                OutputHelper);
+
+            PrepareCreateTableWithForeignKeys(
+                fullTableNameParent,
+                out sqlUpdate,
+                out string tableNameChild,
+                out string fullTableNameChild,
+                out IReadOnlyList<string> foreignKeys);
+            using TemporaryTable temporaryTableChild = await 
TemporaryTable.NewTemporaryTableAsync(
+                Statement,
+                fullTableNameChild,
+                sqlUpdate,
+                OutputHelper);
+
+            // Note: create a new statement to do metadata calls so it does 
not reuse the existing 'this.Statement'
+            AdbcStatement statement = Connection.CreateStatement();
+            // Either or both the parent and child namespace can be provided
+            statement.SetOption(ApacheParameters.ForeignCatalogName, 
catalogName ?? string.Empty);
+            statement.SetOption(ApacheParameters.ForeignSchemaName, schemaName 
?? string.Empty);
+            statement.SetOption(ApacheParameters.ForeignTableName, 
tableNameChild);
+            statement.SetOption(ApacheParameters.IsMetadataCommand, "true");
+            statement.SqlQuery = "GetCrossReference";
+
+            await ValidateGetCrossReference(catalogName, schemaName, 
tableNameParent, primaryKeys, foreignKeys, statement);
+        }
+
+        /// <summary>
+        /// Validates if the driver can execute GetCrossReference metadata 
command referencing the parent table.
+        /// </summary>
+        protected async Task CanGetCrossReferenceFromParentTable(string? 
catalogName, string? schemaName)
+        {
+            PrepareCreateTableWithPrimaryKeys(
+                out string sqlUpdate,
+                out string tableNameParent,
+                out string fullTableNameParent,
+                out IReadOnlyList<string> primaryKeys);
+            using TemporaryTable temporaryTableParent = await 
TemporaryTable.NewTemporaryTableAsync(
+                Statement,
+                fullTableNameParent,
+                sqlUpdate,
+                OutputHelper);
+
+            PrepareCreateTableWithForeignKeys(
+                fullTableNameParent,
+                out sqlUpdate,
+                out string tableNameChild,
+                out string fullTableNameChild,
+                out IReadOnlyList<string> foreignKeys);
+            using TemporaryTable temporaryTableChild = await 
TemporaryTable.NewTemporaryTableAsync(
+                Statement,
+                fullTableNameChild,
+                sqlUpdate,
+                OutputHelper);
+
+            // Note: create a new statement to do metadata calls so it does 
not reuse the existing 'this.Statement'
+            AdbcStatement statement = Connection.CreateStatement();
+            // Either or both the parent and child namespace can be provided
+            statement.SetOption(ApacheParameters.CatalogName, catalogName ?? 
string.Empty);
+            statement.SetOption(ApacheParameters.SchemaName, schemaName ?? 
string.Empty);
+            statement.SetOption(ApacheParameters.TableName, tableNameParent);
+            statement.SetOption(ApacheParameters.IsMetadataCommand, "true");
+            statement.SqlQuery = "GetCrossReference";
+
+            await ValidateGetCrossReference(catalogName, schemaName, 
tableNameParent, primaryKeys, foreignKeys, statement);
+        }
+
+        protected virtual void PrepareCreateTableWithForeignKeys(string 
fullTableNameParent, out string sqlUpdate, out string tableNameChild, out 
string fullTableNameChild, out IReadOnlyList<string> foreignKeys)
+        {
+            CreateNewTableName(out tableNameChild, out fullTableNameChild);
+            sqlUpdate = $"CREATE TABLE IF NOT EXISTS {fullTableNameChild} \n"
+                + "  (INDEX INT, USERINDEX INT, USERNAME STRING, ADDRESS 
STRING, \n"
+                + "  PRIMARY KEY (INDEX) disable novalidate, \n"
+                + $"  FOREIGN KEY (USERINDEX, USERNAME) REFERENCES 
{fullTableNameParent} (INDEX, NAME) disable novalidate)";
+            foreignKeys = ["userindex", "username"];
+        }
+
+        protected virtual void PrepareCreateTableWithPrimaryKeys(out string 
sqlUpdate, out string tableNameParent, out string fullTableNameParent, out 
IReadOnlyList<string> primaryKeys)
+        {
+            CreateNewTableName(out tableNameParent, out fullTableNameParent);
+            sqlUpdate = $"CREATE TABLE IF NOT EXISTS {fullTableNameParent} 
(INDEX INT, NAME STRING, PRIMARY KEY (INDEX, NAME) disable novalidate)";
+            primaryKeys = ["index", "name"];
+        }
+
+        private static async Task ValidateGetPrimaryKeys(string? catalogName, 
string? schemaName, string tableName, IReadOnlyList<string> primaryKeys, 
AdbcStatement statement)
+        {
+            int expectedBatchLength = primaryKeys.Count;
+            int actualBatchLength = 0;
+
+            QueryResult queryResult = statement.ExecuteQuery();
+            Assert.NotNull(queryResult.Stream);
+
+            while (queryResult.Stream != null)
+            {
+                RecordBatch? batch = await 
queryResult.Stream.ReadNextRecordBatchAsync();
+                if (batch == null)
+                {
+                    break;
+                }
+
+                Assert.Equal(6, batch.ColumnCount);
+                Assert.Equal(6, queryResult.Stream.Schema.FieldsList.Count);
+                Assert.Equal(StringType.Default, 
queryResult.Stream.Schema.FieldsList[0].DataType);
+                Assert.Equal(StringType.Default, 
queryResult.Stream.Schema.FieldsList[1].DataType);
+                Assert.Equal(StringType.Default, 
queryResult.Stream.Schema.FieldsList[2].DataType);
+                Assert.Equal(StringType.Default, 
queryResult.Stream.Schema.FieldsList[3].DataType);
+                Assert.Equal(Int32Type.Default, 
queryResult.Stream.Schema.FieldsList[4].DataType);
+                Assert.Equal(StringType.Default, 
queryResult.Stream.Schema.FieldsList[5].DataType);
+                Assert.Equal(expectedBatchLength, batch.Length);
+                actualBatchLength += batch.Length;
+                for (int i = 0; i < batch.Length; i++)
+                {
+                    string? catalogNameActual = 
((StringArray)batch.Column(0)).GetString(i);
+                    Assert.True(string.Equals(catalogName, catalogNameActual)
+                        || string.IsNullOrEmpty(catalogName) && 
string.IsNullOrEmpty(catalogNameActual));
+                    string schemaNameActual = 
((StringArray)batch.Column(1)).GetString(i);
+                    Assert.Equal(schemaName, schemaNameActual);
+                    string tableNameActual = 
((StringArray)batch.Column(2)).GetString(i);
+                    Assert.Equal(tableName, tableNameActual);
+
+                    string? columnName = 
((StringArray)batch.Column(3)).GetString(i)?.ToLowerInvariant();
+                    int? keyIndex = ((Int32Array)batch.Column(4)).GetValue(i);
+                    Assert.True(keyIndex <= primaryKeys.Count);
+                    Assert.True(keyIndex.HasValue);
+                    Assert.Equal(primaryKeys[keyIndex.Value - 1], columnName);
+                }
+            }
+
+            Assert.Equal(expectedBatchLength, actualBatchLength);
+        }
+
+        private static async Task ValidateGetCrossReference(string? 
catalogName, string? schemaName, string tableNameParent, IReadOnlyList<string> 
primaryKeys, IReadOnlyList<string> foreignKeys, AdbcStatement statement)
+        {
+            int expectedBatchLength = primaryKeys.Count;
+            int actualBatchLength = 0;
+
+            QueryResult queryResult = statement.ExecuteQuery();
+            Assert.NotNull(queryResult.Stream);
+
+            while (queryResult.Stream != null)
+            {
+                RecordBatch? batch = await 
queryResult.Stream.ReadNextRecordBatchAsync();
+                if (batch == null)
+                {
+                    break;
+                }
+
+                int expectedColumnCount = 14;
+                Assert.Equal(expectedColumnCount, batch.ColumnCount);
+                Assert.Equal(expectedColumnCount, 
queryResult.Stream.Schema.FieldsList.Count);
+                Assert.Equal(StringType.Default, 
queryResult.Stream.Schema.FieldsList[0].DataType); // PK_CATALOG_NAME
+                Assert.Equal(StringType.Default, 
queryResult.Stream.Schema.FieldsList[1].DataType); // PK_SCHEMA_NAME
+                Assert.Equal(StringType.Default, 
queryResult.Stream.Schema.FieldsList[2].DataType); // PK_TABLE_NAME
+                Assert.Equal(StringType.Default, 
queryResult.Stream.Schema.FieldsList[3].DataType); // PK_COLUMN_NAME
+                Assert.Equal(StringType.Default, 
queryResult.Stream.Schema.FieldsList[4].DataType); // FK_CATALOG_NAME
+                Assert.Equal(StringType.Default, 
queryResult.Stream.Schema.FieldsList[5].DataType); // FK_SCHEMA_NAME
+                Assert.Equal(StringType.Default, 
queryResult.Stream.Schema.FieldsList[6].DataType); // FK_TABLE_NAME
+                Assert.Equal(StringType.Default, 
queryResult.Stream.Schema.FieldsList[7].DataType); // FK_COLUMN_NAME
+                Assert.Equal(Int32Type.Default, 
queryResult.Stream.Schema.FieldsList[8].DataType); // FK_INDEX
+                Assert.Equal(expectedBatchLength, batch.Length);
+                actualBatchLength += batch.Length;
+                for (int i = 0; i < batch.Length; i++)
+                {
+                    string? parentCatalogNameActual = 
((StringArray)batch.Column(0)).GetString(i);
+                    Assert.True(string.Equals(catalogName, 
parentCatalogNameActual)
+                        || string.IsNullOrEmpty(catalogName) && 
string.IsNullOrEmpty(parentCatalogNameActual));
+                    string parentSchemaNameActual = 
((StringArray)batch.Column(1)).GetString(i);
+                    Assert.Equal(schemaName, parentSchemaNameActual);
+                    string tableNameActual = 
((StringArray)batch.Column(2)).GetString(i);
+                    Assert.Equal(tableNameParent, tableNameActual);
+
+                    int? keyIndex = ((Int32Array)batch.Column(8)).GetValue(i);
+                    Assert.True(keyIndex <= primaryKeys.Count);
+                    Assert.True(keyIndex.HasValue);
+
+                    // Assume one-indexed key index
+                    string? parentColumnNameActual = 
((StringArray)batch.Column(3)).GetString(i)?.ToLowerInvariant();
+                    Assert.Equal(primaryKeys[keyIndex.Value - 1], 
parentColumnNameActual);
+                    string? foreignColumnNameActual = 
((StringArray)batch.Column(7)).GetString(i)?.ToLowerInvariant();
+                    Assert.Equal(foreignKeys[keyIndex.Value - 1], 
foreignColumnNameActual);
+                }
+            }
+
+            Assert.Equal(expectedBatchLength, actualBatchLength);
+        }
     }
 
     /// <summary>
diff --git a/csharp/test/Drivers/Apache/Hive2/StatementTests.cs 
b/csharp/test/Drivers/Apache/Hive2/StatementTests.cs
index 808b917eb..c3e90a432 100644
--- a/csharp/test/Drivers/Apache/Hive2/StatementTests.cs
+++ b/csharp/test/Drivers/Apache/Hive2/StatementTests.cs
@@ -15,7 +15,8 @@
 * limitations under the License.
 */
 
-using Apache.Arrow.Adbc.Tests.Drivers.Apache.Common;
+using System.Collections.Generic;
+using System.Threading.Tasks;
 using Xunit;
 using Xunit.Abstractions;
 
@@ -27,5 +28,23 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.Apache.Hive2
             : base(outputHelper, new HiveServer2TestEnvironment.Factory())
         {
         }
+
+        [SkippableFact]
+        public async Task CanGetPrimaryKeysHive()
+        {
+            await base.CanGetPrimaryKeys(TestConfiguration.Metadata.Catalog, 
TestConfiguration.Metadata.Schema);
+        }
+
+        [SkippableFact]
+        public async Task CanGetCrossReferenceParentTableHive()
+        {
+            await 
base.CanGetCrossReferenceFromParentTable(TestConfiguration.Metadata.Catalog, 
TestConfiguration.Metadata.Schema);
+        }
+
+        [SkippableFact]
+        public async Task CanGetCrossReferenceChildTableHive()
+        {
+            await 
base.CanGetCrossReferenceFromChildTable(TestConfiguration.Metadata.Catalog, 
TestConfiguration.Metadata.Schema);
+        }
     }
 }
diff --git a/csharp/test/Drivers/Apache/Impala/StatementTests.cs 
b/csharp/test/Drivers/Apache/Impala/StatementTests.cs
index 5d41dbce9..0b943fc29 100644
--- a/csharp/test/Drivers/Apache/Impala/StatementTests.cs
+++ b/csharp/test/Drivers/Apache/Impala/StatementTests.cs
@@ -15,7 +15,8 @@
 * limitations under the License.
 */
 
-using Apache.Arrow.Adbc.Tests.Drivers.Apache.Common;
+using System.Collections.Generic;
+using System.Threading.Tasks;
 using Xunit;
 using Xunit.Abstractions;
 
@@ -27,5 +28,40 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.Apache.Impala
             : base(outputHelper, new ImpalaTestEnvironment.Factory())
         {
         }
+
+        [SkippableFact]
+        public async Task CanGetPrimaryKeysImpala()
+        {
+            await base.CanGetPrimaryKeys(TestConfiguration.Metadata.Catalog, 
TestConfiguration.Metadata.Schema);
+        }
+
+        [SkippableFact]
+        public async Task CanGetCrossReferenceParentTableImpala()
+        {
+            await 
base.CanGetCrossReferenceFromParentTable(TestConfiguration.Metadata.Catalog, 
TestConfiguration.Metadata.Schema);
+        }
+
+        [SkippableFact]
+        public async Task CanGetCrossReferenceChildTableImpala()
+        {
+            await 
base.CanGetCrossReferenceFromChildTable(TestConfiguration.Metadata.Catalog, 
TestConfiguration.Metadata.Schema);
+        }
+
+        protected override void PrepareCreateTableWithForeignKeys(string 
fullTableNameParent, out string sqlUpdate, out string tableNameChild, out 
string fullTableNameChild, out IReadOnlyList<string> foreignKeys)
+        {
+            CreateNewTableName(out tableNameChild, out fullTableNameChild);
+            sqlUpdate = $"CREATE TABLE IF NOT EXISTS {fullTableNameChild} \n"
+                + "  (INDEX INT, USERINDEX INT, USERNAME STRING, ADDRESS 
STRING, \n"
+                + "  PRIMARY KEY (INDEX), \n"
+                + $"  FOREIGN KEY (USERINDEX, USERNAME) REFERENCES 
{fullTableNameParent} (INDEX, NAME))";
+            foreignKeys = ["userindex", "username"];
+        }
+
+        protected override void PrepareCreateTableWithPrimaryKeys(out string 
sqlUpdate, out string tableNameParent, out string fullTableNameParent, out 
IReadOnlyList<string> primaryKeys)
+        {
+            CreateNewTableName(out tableNameParent, out fullTableNameParent);
+            sqlUpdate = $"CREATE TABLE IF NOT EXISTS {fullTableNameParent} 
(INDEX INT, NAME STRING, PRIMARY KEY (INDEX, NAME))";
+            primaryKeys = ["index", "name"];
+        }
     }
 }
diff --git a/csharp/test/Drivers/Apache/Spark/StatementTests.cs 
b/csharp/test/Drivers/Apache/Spark/StatementTests.cs
index 5b6de0a44..da170a66d 100644
--- a/csharp/test/Drivers/Apache/Spark/StatementTests.cs
+++ b/csharp/test/Drivers/Apache/Spark/StatementTests.cs
@@ -16,6 +16,9 @@
 */
 
 using System;
+using System.Collections.Generic;
+using System.Threading.Tasks;
+using Apache.Arrow.Adbc.Drivers.Apache.Spark;
 using Apache.Arrow.Adbc.Tests.Drivers.Apache.Common;
 using Xunit;
 using Xunit.Abstractions;
@@ -47,5 +50,33 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.Apache.Spark
                 Add(new(0, longRunningQuery, null));
             }
         }
+
+        [SkippableFact]
+        public async Task CanGetPrimaryKeysDatabricks()
+        {
+            Skip.If(TestEnvironment.ServerType != SparkServerType.Databricks);
+            await base.CanGetPrimaryKeys(TestConfiguration.Metadata.Catalog, 
TestConfiguration.Metadata.Schema);
+        }
+
+        [SkippableFact]
+        public async Task CanGetCrossReferenceFromParentTableDatabricks()
+        {
+            Skip.If(TestEnvironment.ServerType != SparkServerType.Databricks);
+            await 
base.CanGetCrossReferenceFromParentTable(TestConfiguration.Metadata.Catalog, 
TestConfiguration.Metadata.Schema);
+        }
+
+        [SkippableFact]
+        public async Task CanGetCrossReferenceFromChildTableDatabricks()
+        {
+            Skip.If(TestEnvironment.ServerType != SparkServerType.Databricks);
+            await 
base.CanGetCrossReferenceFromChildTable(TestConfiguration.Metadata.Catalog, 
TestConfiguration.Metadata.Schema);
+        }
+
+        protected override void PrepareCreateTableWithPrimaryKeys(out string 
sqlUpdate, out string tableNameParent, out string fullTableNameParent, out 
IReadOnlyList<string> primaryKeys)
+        {
+            CreateNewTableName(out tableNameParent, out fullTableNameParent);
+            sqlUpdate = $"CREATE TABLE IF NOT EXISTS {fullTableNameParent} 
(INDEX INT, NAME STRING, PRIMARY KEY (INDEX, NAME))";
+            primaryKeys = ["index", "name"];
+        }
     }
 }

Reply via email to