This is an automated email from the ASF dual-hosted git repository.
curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new 0544d3b69 feat(csharp/src/Drivers/Apache): improve GetObjects metadata returned for columns (#1884)
0544d3b69 is described below
commit 0544d3b6900129f8bee064b67a7f43b8a08bcda2
Author: Bruce Irschick <[email protected]>
AuthorDate: Fri May 31 10:26:17 2024 -0700
feat(csharp/src/Drivers/Apache): improve GetObjects metadata returned for columns (#1884)
Improve the amount and quality of column metadata returned by a call to
`GetObjects` with depth `All` in the Spark driver. A usage sketch follows the
change list below.
* Add an enumeration providing identifiers for the Spark-specific data types
(based on the JDBC Types constants).
* Improve the metadata returned for columns: data type identifiers, type names,
nullability, default values, ordinal positions, auto-increment flags, and
precision and scale (for DECIMAL/NUMERIC types).
* Improve the tests to verify that the new metadata is present.
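For illustration, a minimal sketch (not part of this commit) of how a caller
might request the improved metadata. The `connection` variable and the
catalog/schema patterns below are assumptions:

    // Hypothetical caller code: fetch column-level metadata with depth All.
    // Assumes `connection` is an open Spark AdbcConnection; the
    // "spark_catalog"/"default" patterns are placeholders.
    using Apache.Arrow.Ipc;

    using IArrowArrayStream stream = connection.GetObjects(
        AdbcConnection.GetObjectsDepth.All, // All => include column metadata
        "spark_catalog",                    // catalog pattern (assumed)
        "default",                          // db schema pattern (assumed)
        "%",                                // table name pattern
        null,                               // table types (no filter)
        null);                              // column name pattern
    // Each column entry now carries xdbc_data_type/xdbc_sql_data_type (JDBC-style
    // ColumnTypeId values), xdbc_type_name, nullability, a default value, a
    // one-based ordinal position, and precision/scale for DECIMAL/NUMERIC columns.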
---
.../src/Drivers/Apache/Properties/AssemblyInfo.cs | 18 +
csharp/src/Drivers/Apache/Spark/SparkConnection.cs | 430 ++++++++++++++++-----
csharp/test/Drivers/Apache/Spark/DriverTests.cs | 81 +++-
3 files changed, 436 insertions(+), 93 deletions(-)
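Background for the DECIMAL/NUMERIC handling in the diff below: the Thrift
GetColumns call does not return precision and scale, so the driver derives them
from the type name via `SqlDecimalTypeParser.ParseOrDefault`. A rough sketch of
that idea (hypothetical standalone helper, not the driver's implementation):

    // Hypothetical helper: derive precision/scale from a type name such as
    // "DECIMAL(10,2)", falling back to the defaults (10, 0) used by the driver.
    using System.Text.RegularExpressions;

    static (int Precision, int Scale) ParseDecimal(string typeName)
    {
        Match m = Regex.Match(typeName,
            @"^\s*(?:DECIMAL|NUMERIC)\s*(?:\(\s*(\d+)\s*(?:,\s*(\d+)\s*)?\))?\s*$",
            RegexOptions.IgnoreCase);
        if (!m.Success || !m.Groups[1].Success) return (10, 0); // defaults
        int precision = int.Parse(m.Groups[1].Value);
        int scale = m.Groups[2].Success ? int.Parse(m.Groups[2].Value) : 0;
        return (precision, scale);
    }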
diff --git a/csharp/src/Drivers/Apache/Properties/AssemblyInfo.cs b/csharp/src/Drivers/Apache/Properties/AssemblyInfo.cs
new file mode 100644
index 000000000..164905524
--- /dev/null
+++ b/csharp/src/Drivers/Apache/Properties/AssemblyInfo.cs
@@ -0,0 +1,18 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System.Runtime.CompilerServices;
+
+[assembly: InternalsVisibleTo("Apache.Arrow.Adbc.Tests.Drivers.Apache, PublicKey=0024000004800000940000000602000000240000525341310004000001000100e504183f6d470d6b67b6d19212be3e1f598f70c246a120194bc38130101d0c1853e4a0f2232cb12e37a7a90e707aabd38511dac4f25fcb0d691b2aa265900bf42de7f70468fc997551a40e1e0679b605aa2088a4a69e07c117e988f5b1738c570ee66997fba02485e7856a49eca5fd0706d09899b8312577cbb9034599fc92d4")]
diff --git a/csharp/src/Drivers/Apache/Spark/SparkConnection.cs b/csharp/src/Drivers/Apache/Spark/SparkConnection.cs
index 3e3bbcae2..c4a3a1b75 100644
--- a/csharp/src/Drivers/Apache/Spark/SparkConnection.cs
+++ b/csharp/src/Drivers/Apache/Spark/SparkConnection.cs
@@ -18,6 +18,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
+using System.Linq;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Reflection;
@@ -55,7 +56,19 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
const bool InfoVendorSql = true;
const int DecimalPrecisionDefault = 10;
const int DecimalScaleDefault = 0;
-
+ const string ColumnDef = "COLUMN_DEF";
+ const string ColumnName = "COLUMN_NAME";
+ const string DataType = "DATA_TYPE";
+ const string IsAutoIncrement = "IS_AUTO_INCREMENT";
+ const string IsNullable = "IS_NULLABLE";
+ const string OrdinalPosition = "ORDINAL_POSITION";
+ const string TableCat = "TABLE_CAT";
+ const string TableCatalog = "TABLE_CATALOG";
+ const string TableName = "TABLE_NAME";
+ const string TableSchem = "TABLE_SCHEM";
+ const string TableType = "TABLE_TYPE";
+ const string TypeName = "TYPE_NAME";
+ const string Nullable = "NULLABLE";
private readonly Lazy<string> _productVersion;
internal static TSparkGetDirectResults sparkGetDirectResults = new TSparkGetDirectResults(1000);
@@ -65,24 +78,180 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
{ "spark.thriftserver.arrowBasedRowSet.timestampAsString", "false"
}
};
- private enum ColumnTypeId
+ /// <summary>
+ /// The Spark data type definitions based on the <see href="https://docs.oracle.com/en%2Fjava%2Fjavase%2F21%2Fdocs%2Fapi%2F%2F/java.sql/java/sql/Types.html">JDBC Types</see> constants.
+ /// </summary>
+ /// <remarks>
+ /// This enumeration can be used to determine the Spark-specific data types that are contained in fields <c>xdbc_data_type</c> and <c>xdbc_sql_data_type</c>
+ /// in the column metadata <see cref="StandardSchemas.ColumnSchema"/>. This column metadata is returned as a result of a call to
+ /// <see cref="AdbcConnection.GetObjects(GetObjectsDepth, string?, string?, string?, IReadOnlyList{string}?, string?)"/>
+ /// when <c>depth</c> is set to <see cref="AdbcConnection.GetObjectsDepth.All"/>.
+ /// </remarks>
+ internal enum ColumnTypeId
{
- BOOLEAN_TYPE = 16,
- TINYINT_TYPE = -6,
- SMALLINT_TYPE = 5,
- INT_TYPE = 4,
- BIGINT_TYPE = -5,
- FLOAT_TYPE = 6,
- DOUBLE_TYPE = 8,
- STRING_TYPE = 12,
- TIMESTAMP_TYPE = 93,
- BINARY_TYPE = -2,
- ARRAY_TYPE = 2003,
- MAP_TYPE = 2000,
- STRUCT_TYPE = 2002,
- DECIMAL_TYPE = 3,
- DATE_TYPE = 91,
- CHAR_TYPE = 1,
+ // NOTE: There is a partial copy of this enumeration in test/Drivers/Apache/Spark/DriverTests.cs
+ // Please keep up-to-date.
+ // Copied from https://docs.oracle.com/en%2Fjava%2Fjavase%2F21%2Fdocs%2Fapi%2F%2F/constant-values.html#java.sql.Types.ARRAY
+
+ /// <summary>
+ /// Identifies the generic SQL type ARRAY
+ /// </summary>
+ ARRAY = 2003,
+ /// <summary>
+ /// Identifies the generic SQL type BIGINT
+ /// </summary>
+ BIGINT = -5,
+ /// <summary>
+ /// Identifies the generic SQL type BINARY
+ /// </summary>
+ BINARY = -2,
+ /// <summary>
+ /// Identifies the generic SQL type BOOLEAN
+ /// </summary>
+ BOOLEAN = 16,
+ /// <summary>
+ /// Identifies the generic SQL type CHAR
+ /// </summary>
+ CHAR = 1,
+ /// <summary>
+ /// Identifies the generic SQL type DATE
+ /// </summary>
+ DATE = 91,
+ /// <summary>
+ /// Identifies the generic SQL type DECIMAL
+ /// </summary>
+ DECIMAL = 3,
+ /// <summary>
+ /// Identifies the generic SQL type DOUBLE
+ /// </summary>
+ DOUBLE = 8,
+ /// <summary>
+ /// Identifies the generic SQL type FLOAT
+ /// </summary>
+ FLOAT = 6,
+ /// <summary>
+ /// Identifies the generic SQL type INTEGER
+ /// </summary>
+ INTEGER = 4,
+ /// <summary>
+ /// Identifies the generic SQL type JAVA_OBJECT (MAP)
+ /// </summary>
+ JAVA_OBJECT = 2000,
+ /// <summary>
+ /// Identifies the generic SQL type LONGNVARCHAR
+ /// </summary>
+ LONGNVARCHAR = -16,
+ /// <summary>
+ /// Identifies the generic SQL type LONGVARBINARY
+ /// </summary>
+ LONGVARBINARY = -4,
+ /// <summary>
+ /// Identifies the generic SQL type LONGVARCHAR
+ /// </summary>
+ LONGVARCHAR = -1,
+ /// <summary>
+ /// Identifies the generic SQL type NCHAR
+ /// </summary>
+ NCHAR = -15,
+ /// <summary>
+ /// Identifies the generic SQL value NULL
+ /// </summary>
+ NULL = 0,
+ /// <summary>
+ /// Identifies the generic SQL type NUMERIC
+ /// </summary>
+ NUMERIC = 2,
+ /// <summary>
+ /// Identifies the generic SQL type NVARCHAR
+ /// </summary>
+ NVARCHAR = -9,
+ /// <summary>
+ /// Identifies the generic SQL type REAL
+ /// </summary>
+ REAL = 7,
+ /// <summary>
+ /// Identifies the generic SQL type SMALLINT
+ /// </summary>
+ SMALLINT = 5,
+ /// <summary>
+ /// Identifies the generic SQL type STRUCT
+ /// </summary>
+ STRUCT = 2002,
+ /// <summary>
+ /// Identifies the generic SQL type TIMESTAMP
+ /// </summary>
+ TIMESTAMP = 93,
+ /// <summary>
+ /// Identifies the generic SQL type TINYINT
+ /// </summary>
+ TINYINT = -6,
+ /// <summary>
+ /// Identifies the generic SQL type VARBINARY
+ /// </summary>
+ VARBINARY = -3,
+ /// <summary>
+ /// Identifies the generic SQL type VARCHAR
+ /// </summary>
+ VARCHAR = 12,
+ // ======================
+ // Unused/unsupported
+ // ======================
+ /// <summary>
+ /// Identifies the generic SQL type BIT
+ /// </summary>
+ BIT = -7,
+ /// <summary>
+ /// Identifies the generic SQL type BLOB
+ /// </summary>
+ BLOB = 2004,
+ /// <summary>
+ /// Identifies the generic SQL type CLOB
+ /// </summary>
+ CLOB = 2005,
+ /// <summary>
+ /// Identifies the generic SQL type DATALINK
+ /// </summary>
+ DATALINK = 70,
+ /// <summary>
+ /// Identifies the generic SQL type DISTINCT
+ /// </summary>
+ DISTINCT = 2001,
+ /// <summary>
+ /// Identifies the generic SQL type NCLOB
+ /// </summary>
+ NCLOB = 2011,
+ /// <summary>
+ /// Indicates that the SQL type is database-specific and gets mapped to a Java object
+ /// </summary>
+ OTHER = 1111,
+ /// <summary>
+ /// Identifies the generic SQL type REF CURSOR
+ /// </summary>
+ REF_CURSOR = 2012,
+ /// <summary>
+ /// Identifies the generic SQL type REF
+ /// </summary>
+ REF = 2006,
+ /// <summary>
+ /// Identifies the generic SQL type ROWID
+ /// </summary>
+ ROWID = -8,
+ /// <summary>
+ /// Identifies the generic SQL type XML
+ /// </summary>
+ SQLXML = 2009,
+ /// <summary>
+ /// Identifies the generic SQL type TIME
+ /// </summary>
+ TIME = 92,
+ /// <summary>
+ /// Identifies the generic SQL type TIME WITH TIMEZONE
+ /// </summary>
+ TIME_WITH_TIMEZONE = 2013,
+ /// <summary>
+ /// Identifies the generic SQL type TIMESTAMP WITH TIMEZONE
+ /// </summary>
+ TIMESTAMP_WITH_TIMEZONE = 2014,
}
internal SparkConnection(IReadOnlyDictionary<string, string> properties)
@@ -286,8 +455,24 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
public override IArrowArrayStream GetTableTypes()
{
+ TGetTableTypesReq req = new()
+ {
+ SessionHandle = this.sessionHandle ?? throw new InvalidOperationException("session not created"),
+ GetDirectResults = sparkGetDirectResults
+ };
+ TGetTableTypesResp resp = this.Client.GetTableTypes(req).Result;
+ if (resp.Status.StatusCode == TStatusCode.ERROR_STATUS)
+ {
+ throw new HiveServer2Exception(resp.Status.ErrorMessage)
+ .SetNativeError(resp.Status.ErrorCode)
+ .SetSqlState(resp.Status.SqlState);
+ }
+
+ List<TColumn> columns = resp.DirectResults.ResultSet.Results.Columns;
+ StringArray tableTypes = columns[0].StringVal.Values;
+
StringArray.Builder tableTypesBuilder = new StringArray.Builder();
- tableTypesBuilder.AppendRange(new string[] { "BASE TABLE", "VIEW" });
+ tableTypesBuilder.AppendRange(tableTypes);
IArrowArray[] dataArrays = new IArrowArray[]
{
@@ -326,7 +511,7 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
//int? columnSize = columns[6].I32Val.Values.GetValue(i);
//int? decimalDigits = columns[8].I32Val.Values.GetValue(i);
bool nullable = columns[10].I32Val.Values.GetValue(i) == 1;
- IArrowType dataType = SparkConnection.GetArrowType((ColumnTypeId)columnType!.Value, typeName);
+ IArrowType dataType = SparkConnection.GetArrowType(columnType!.Value, typeName);
fields[i] = new Field(columnName, dataType, nullable);
}
return new Schema(fields, null);
@@ -336,7 +521,7 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
{
Trace.TraceError($"getting objects with depth={depth.ToString()}, catalog = {catalogPattern}, dbschema = {dbSchemaPattern}, tablename = {tableNamePattern}");
- Dictionary<string, Dictionary<string, Dictionary<string, TableInfoPair>>> catalogMap = new Dictionary<string, Dictionary<string, Dictionary<string, TableInfoPair>>>();
+ Dictionary<string, Dictionary<string, Dictionary<string, TableInfo>>> catalogMap = new Dictionary<string, Dictionary<string, Dictionary<string, TableInfo>>>();
if (depth == GetObjectsDepth.All || depth >= GetObjectsDepth.Catalogs)
{
TGetCatalogsReq getCatalogsReq = new TGetCatalogsReq(this.sessionHandle);
@@ -347,10 +532,11 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
{
throw new Exception(getCatalogsResp.Status.ErrorMessage);
}
+ IReadOnlyDictionary<string, int> columnMap = GetColumnIndexMap(getCatalogsResp.DirectResults.ResultSetMetadata.Schema.Columns);
string catalogRegexp = PatternToRegEx(catalogPattern);
TRowSet resp = getCatalogsResp.DirectResults.ResultSet.Results;
- IReadOnlyList<string> list = resp.Columns[0].StringVal.Values;
+ IReadOnlyList<string> list = resp.Columns[columnMap[TableCat]].StringVal.Values;
for (int i = 0; i < list.Count; i++)
{
string col = list[i];
@@ -358,7 +544,7 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
if (Regex.IsMatch(catalog, catalogRegexp, RegexOptions.IgnoreCase))
{
- catalogMap.Add(catalog, new Dictionary<string, Dictionary<string, TableInfoPair>>());
+ catalogMap.Add(catalog, new Dictionary<string, Dictionary<string, TableInfo>>());
}
}
}
@@ -375,17 +561,18 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
{
throw new Exception(getSchemasResp.Status.ErrorMessage);
}
+ IReadOnlyDictionary<string, int> columnMap = GetColumnIndexMap(getSchemasResp.DirectResults.ResultSetMetadata.Schema.Columns);
TRowSet resp = getSchemasResp.DirectResults.ResultSet.Results;
- IReadOnlyList<string> catalogList = resp.Columns[1].StringVal.Values;
- IReadOnlyList<string> schemaList = resp.Columns[0].StringVal.Values;
+ IReadOnlyList<string> catalogList = resp.Columns[columnMap[TableCatalog]].StringVal.Values;
+ IReadOnlyList<string> schemaList = resp.Columns[columnMap[TableSchem]].StringVal.Values;
for (int i = 0; i < catalogList.Count; i++)
{
string catalog = catalogList[i];
string schemaDb = schemaList[i];
// It seems Spark sometimes returns empty string for catalog on some schema (temporary tables).
- catalogMap.GetValueOrDefault(catalog)?.Add(schemaDb, new Dictionary<string, TableInfoPair>());
+ catalogMap.GetValueOrDefault(catalog)?.Add(schemaDb, new Dictionary<string, TableInfo>());
}
}
@@ -402,12 +589,14 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
{
throw new Exception(getTablesResp.Status.ErrorMessage);
}
+
+ IReadOnlyDictionary<string, int> columnMap = GetColumnIndexMap(getTablesResp.DirectResults.ResultSetMetadata.Schema.Columns);
TRowSet resp = getTablesResp.DirectResults.ResultSet.Results;
- IReadOnlyList<string> catalogList = resp.Columns[0].StringVal.Values;
- IReadOnlyList<string> schemaList = resp.Columns[1].StringVal.Values;
- IReadOnlyList<string> tableList = resp.Columns[2].StringVal.Values;
- IReadOnlyList<string> tableTypeList = resp.Columns[3].StringVal.Values;
+ IReadOnlyList<string> catalogList = resp.Columns[columnMap[TableCat]].StringVal.Values;
+ IReadOnlyList<string> schemaList = resp.Columns[columnMap[TableSchem]].StringVal.Values;
+ IReadOnlyList<string> tableList = resp.Columns[columnMap[TableName]].StringVal.Values;
+ IReadOnlyList<string> tableTypeList = resp.Columns[columnMap[TableType]].StringVal.Values;
for (int i = 0; i < catalogList.Count; i++)
{
@@ -415,10 +604,7 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
string schemaDb = schemaList[i];
string tableName = tableList[i];
string tableType = tableTypeList[i];
- TableInfoPair tableInfo = new TableInfoPair();
- tableInfo.Type = tableType;
- tableInfo.Columns = new List<string>();
- tableInfo.ColType = new List<int>();
+ TableInfo tableInfo = new(tableType);
catalogMap.GetValueOrDefault(catalog)?.GetValueOrDefault(schemaDb)?.Add(tableName, tableInfo);
}
}
@@ -440,31 +626,53 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
throw new Exception(columnsResponse.Status.ErrorMessage);
}
+ IReadOnlyDictionary<string, int> columnMap = GetColumnIndexMap(columnsResponse.DirectResults.ResultSetMetadata.Schema.Columns);
TRowSet resp = columnsResponse.DirectResults.ResultSet.Results;
- IReadOnlyList<string> catalogList = resp.Columns[0].StringVal.Values;
- IReadOnlyList<string> schemaList = resp.Columns[1].StringVal.Values;
- IReadOnlyList<string> tableList = resp.Columns[2].StringVal.Values;
- IReadOnlyList<string> columnList = resp.Columns[3].StringVal.Values;
- ReadOnlySpan<int> columnTypeList = resp.Columns[4].I32Val.Values.Values;
+ IReadOnlyList<string> catalogList = resp.Columns[columnMap[TableCat]].StringVal.Values;
+ IReadOnlyList<string> schemaList = resp.Columns[columnMap[TableSchem]].StringVal.Values;
+ IReadOnlyList<string> tableList = resp.Columns[columnMap[TableName]].StringVal.Values;
+ IReadOnlyList<string> columnNameList = resp.Columns[columnMap[ColumnName]].StringVal.Values;
+ ReadOnlySpan<int> columnTypeList = resp.Columns[columnMap[DataType]].I32Val.Values.Values;
+ IReadOnlyList<string> typeNameList = resp.Columns[columnMap[TypeName]].StringVal.Values;
+ ReadOnlySpan<int> nullableList = resp.Columns[columnMap[Nullable]].I32Val.Values.Values;
+ IReadOnlyList<string> columnDefaultList = resp.Columns[columnMap[ColumnDef]].StringVal.Values;
+ ReadOnlySpan<int> ordinalPosList = resp.Columns[columnMap[OrdinalPosition]].I32Val.Values.Values;
+ IReadOnlyList<string> isNullableList = resp.Columns[columnMap[IsNullable]].StringVal.Values;
+ IReadOnlyList<string> isAutoIncrementList = resp.Columns[columnMap[IsAutoIncrement]].StringVal.Values;
for (int i = 0; i < catalogList.Count; i++)
{
string catalog = catalogList[i];
string schemaDb = schemaList[i];
string tableName = tableList[i];
- string column = columnList[i];
- int colType = columnTypeList[i];
- TableInfoPair? tableInfo = catalogMap.GetValueOrDefault(catalog)?.GetValueOrDefault(schemaDb)?.GetValueOrDefault(tableName);
- tableInfo?.Columns.Add(column);
+ string columnName = columnNameList[i];
+ short colType = (short)columnTypeList[i];
+ string typeName = typeNameList[i];
+ short nullable = (short)nullableList[i];
+ string? isAutoIncrementString = isAutoIncrementList[i];
+ bool isAutoIncrement = (!string.IsNullOrEmpty(isAutoIncrementString) && (isAutoIncrementString.Equals("YES", StringComparison.InvariantCultureIgnoreCase) || isAutoIncrementString.Equals("TRUE", StringComparison.InvariantCultureIgnoreCase)));
+ string isNullable = isNullableList[i] ?? "YES";
+ string columnDefault = columnDefaultList[i] ?? "";
+ // Spark/Databricks reports ordinal index zero-indexed, instead of one-indexed
+ int ordinalPos = ordinalPosList[i] + 1;
+ TableInfo? tableInfo = catalogMap.GetValueOrDefault(catalog)?.GetValueOrDefault(schemaDb)?.GetValueOrDefault(tableName);
+ tableInfo?.ColumnName.Add(columnName);
tableInfo?.ColType.Add(colType);
+ tableInfo?.Nullable.Add(nullable);
+ tableInfo?.TypeName.Add(typeName);
+ tableInfo?.IsAutoIncrement.Add(isAutoIncrement);
+ tableInfo?.IsNullable.Add(isNullable);
+ tableInfo?.ColumnDefault.Add(columnDefault);
+ tableInfo?.OrdinalPosition.Add(ordinalPos);
+ SetPrecisionAndScale(colType, typeName, tableInfo);
}
}
StringArray.Builder catalogNameBuilder = new StringArray.Builder();
List<IArrowArray?> catalogDbSchemasValues = new List<IArrowArray?>();
- foreach (KeyValuePair<string, Dictionary<string, Dictionary<string, TableInfoPair>>> catalogEntry in catalogMap)
+ foreach (KeyValuePair<string, Dictionary<string, Dictionary<string, TableInfo>>> catalogEntry in catalogMap)
{
catalogNameBuilder.Append(catalogEntry.Key);
@@ -490,41 +698,75 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
return new SparkInfoArrowStream(schema, dataArrays);
}
- private static IArrowType GetArrowType(ColumnTypeId columnTypeId, string typeName)
+ private static IReadOnlyDictionary<string, int> GetColumnIndexMap(List<TColumnDesc> columns) => columns
+ .Select(t => new { Index = t.Position - 1, t.ColumnName })
+ .ToDictionary(t => t.ColumnName, t => t.Index);
+
+ private static void SetPrecisionAndScale(short colType, string typeName, TableInfo? tableInfo)
+ {
+ switch (colType)
+ {
+ case (short)ColumnTypeId.DECIMAL:
+ case (short)ColumnTypeId.NUMERIC:
+ {
+ Decimal128Type decimalType = SqlDecimalTypeParser.ParseOrDefault(typeName, new Decimal128Type(DecimalPrecisionDefault, DecimalScaleDefault));
+ tableInfo?.Precision.Add(decimalType.Precision);
+ tableInfo?.Scale.Add((short)decimalType.Scale);
+ break;
+ }
+
+ default:
+ tableInfo?.Precision.Add(null);
+ tableInfo?.Scale.Add(null);
+ break;
+ }
+ }
+
+ private static IArrowType GetArrowType(int columnTypeId, string typeName)
{
switch (columnTypeId)
{
- case ColumnTypeId.BOOLEAN_TYPE:
+ case (int)ColumnTypeId.BOOLEAN:
return BooleanType.Default;
- case ColumnTypeId.TINYINT_TYPE:
+ case (int)ColumnTypeId.TINYINT:
return Int8Type.Default;
- case ColumnTypeId.SMALLINT_TYPE:
+ case (int)ColumnTypeId.SMALLINT:
return Int16Type.Default;
- case ColumnTypeId.INT_TYPE:
+ case (int)ColumnTypeId.INTEGER:
return Int32Type.Default;
- case ColumnTypeId.BIGINT_TYPE:
+ case (int)ColumnTypeId.BIGINT:
return Int64Type.Default;
- case ColumnTypeId.FLOAT_TYPE:
+ case (int)ColumnTypeId.FLOAT:
+ case (int)ColumnTypeId.REAL:
return FloatType.Default;
- case ColumnTypeId.DOUBLE_TYPE:
+ case (int)ColumnTypeId.DOUBLE:
return DoubleType.Default;
- case ColumnTypeId.STRING_TYPE:
+ case (int)ColumnTypeId.VARCHAR:
+ case (int)ColumnTypeId.NVARCHAR:
+ case (int)ColumnTypeId.LONGVARCHAR:
+ case (int)ColumnTypeId.LONGNVARCHAR:
return StringType.Default;
- case ColumnTypeId.TIMESTAMP_TYPE:
+ case (int)ColumnTypeId.TIMESTAMP:
return new TimestampType(TimeUnit.Microsecond, timezone: (string?)null);
- case ColumnTypeId.BINARY_TYPE:
+ case (int)ColumnTypeId.BINARY:
+ case (int)ColumnTypeId.VARBINARY:
+ case (int)ColumnTypeId.LONGVARBINARY:
return BinaryType.Default;
- case ColumnTypeId.DATE_TYPE:
+ case (int)ColumnTypeId.DATE:
return Date32Type.Default;
- case ColumnTypeId.CHAR_TYPE:
+ case (int)ColumnTypeId.CHAR:
+ case (int)ColumnTypeId.NCHAR:
return StringType.Default;
- case ColumnTypeId.DECIMAL_TYPE:
+ case (int)ColumnTypeId.DECIMAL:
+ case (int)ColumnTypeId.NUMERIC:
// Note: parsing the type name for SQL DECIMAL types as the precision and scale values
// are not returned in the Thrift call to GetColumns
return SqlDecimalTypeParser.ParseOrDefault(typeName, new Decimal128Type(DecimalPrecisionDefault, DecimalScaleDefault));
- case ColumnTypeId.ARRAY_TYPE:
- case ColumnTypeId.MAP_TYPE:
- case ColumnTypeId.STRUCT_TYPE:
+ case (int)ColumnTypeId.NULL:
+ return NullType.Default;
+ case (int)ColumnTypeId.ARRAY:
+ case (int)ColumnTypeId.JAVA_OBJECT:
+ case (int)ColumnTypeId.STRUCT:
return StringType.Default;
default:
throw new NotImplementedException($"Column type id: {columnTypeId} is not supported.");
@@ -533,7 +775,7 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
private StructArray GetDbSchemas(
GetObjectsDepth depth,
- Dictionary<string, Dictionary<string, TableInfoPair>> schemaMap)
+ Dictionary<string, Dictionary<string, TableInfo>> schemaMap)
{
StringArray.Builder dbSchemaNameBuilder = new StringArray.Builder();
List<IArrowArray?> dbSchemaTablesValues = new List<IArrowArray?>();
@@ -541,7 +783,7 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
int length = 0;
- foreach (KeyValuePair<string, Dictionary<string, TableInfoPair>> schemaEntry in schemaMap)
+ foreach (KeyValuePair<string, Dictionary<string, TableInfo>> schemaEntry in schemaMap)
{
dbSchemaNameBuilder.Append(schemaEntry.Key);
@@ -577,7 +819,7 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
private StructArray GetTableSchemas(
GetObjectsDepth depth,
- Dictionary<string, TableInfoPair> tableMap)
+ Dictionary<string, TableInfo> tableMap)
{
StringArray.Builder tableNameBuilder = new StringArray.Builder();
StringArray.Builder tableTypeBuilder = new StringArray.Builder();
@@ -587,7 +829,7 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
int length = 0;
- foreach (KeyValuePair<string, TableInfoPair> tableEntry in tableMap)
+ foreach (KeyValuePair<string, TableInfo> tableEntry in tableMap)
{
tableNameBuilder.Append(tableEntry.Key);
tableTypeBuilder.Append(tableEntry.Value.Type);
@@ -604,7 +846,7 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
}
else
{
- tableColumnsValues.Add(GetColumnSchema(tableEntry.Value.Columns, tableEntry.Value.ColType));
+ tableColumnsValues.Add(GetColumnSchema(tableEntry.Value));
}
}
@@ -626,8 +868,7 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
nullBitmapBuffer.Build());
}
- private StructArray GetColumnSchema(
- List<string> columns, List<int> colTypes)
+ private StructArray GetColumnSchema(TableInfo tableInfo)
{
StringArray.Builder columnNameBuilder = new StringArray.Builder();
Int32Array.Builder ordinalPositionBuilder = new Int32Array.Builder();
@@ -652,31 +893,26 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
int length = 0;
- for (int i = 0; i < columns.Count; i++)
+ for (int i = 0; i < tableInfo.ColumnName.Count; i++)
{
- columnNameBuilder.Append(columns[i]);
- ordinalPositionBuilder.Append((int)colTypes[i]);
+ columnNameBuilder.Append(tableInfo.ColumnName[i]);
+ ordinalPositionBuilder.Append(tableInfo.OrdinalPosition[i]);
remarksBuilder.Append("");
-
-
-
- xdbcColumnSizeBuilder.AppendNull();
- xdbcDecimalDigitsBuilder.AppendNull();
-
-
- xdbcDataTypeBuilder.AppendNull();
- xdbcTypeNameBuilder.Append("");
+ xdbcColumnSizeBuilder.Append(tableInfo.Precision[i]);
+ xdbcDecimalDigitsBuilder.Append(tableInfo.Scale[i]);
+ xdbcDataTypeBuilder.Append(tableInfo.ColType[i]);
+ xdbcTypeNameBuilder.Append(tableInfo.TypeName[i]);
xdbcNumPrecRadixBuilder.AppendNull();
- xdbcNullableBuilder.AppendNull();
- xdbcColumnDefBuilder.AppendNull();
- xdbcSqlDataTypeBuilder.Append((short)colTypes[i]);
+ xdbcNullableBuilder.Append(tableInfo.Nullable[i]);
+ xdbcColumnDefBuilder.Append(tableInfo.ColumnDefault[i]);
+ xdbcSqlDataTypeBuilder.Append(tableInfo.ColType[i]);
xdbcDatetimeSubBuilder.AppendNull();
xdbcCharOctetLengthBuilder.AppendNull();
- xdbcIsNullableBuilder.Append("true");
+ xdbcIsNullableBuilder.Append(tableInfo.IsNullable[i]);
xdbcScopeCatalogBuilder.AppendNull();
xdbcScopeSchemaBuilder.AppendNull();
xdbcScopeTableBuilder.AppendNull();
- xdbcIsAutoincrementBuilder.AppendNull();
+ xdbcIsAutoincrementBuilder.Append(tableInfo.IsAutoIncrement[i]);
xdbcIsGeneratedcolumnBuilder.Append(true);
nullBitmapBuffer.Append(true);
length++;
@@ -790,13 +1026,29 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
}
}
- internal struct TableInfoPair
+ internal struct TableInfo(string type)
{
- public string Type { get; set; }
+ public string Type { get; } = type;
+
+ public List<string> ColumnName { get; } = new();
+
+ public List<short> ColType { get; } = new();
+
+ public List<string> TypeName { get; } = new();
+
+ public List<short> Nullable { get; } = new();
+
+ public List<int?> Precision { get; } = new();
+
+ public List<short?> Scale { get; } = new();
+
+ public List<int> OrdinalPosition { get; } = new();
+
+ public List<string> ColumnDefault { get; } = new();
- public List<string> Columns { get; set; }
+ public List<string> IsNullable { get; } = new();
- public List<int> ColType { get; set; }
+ public List<bool> IsAutoIncrement { get; } = new();
}
internal class SparkInfoArrowStream : IArrowArrayStream
diff --git a/csharp/test/Drivers/Apache/Spark/DriverTests.cs b/csharp/test/Drivers/Apache/Spark/DriverTests.cs
index a7507e473..822dae2e0 100644
--- a/csharp/test/Drivers/Apache/Spark/DriverTests.cs
+++ b/csharp/test/Drivers/Apache/Spark/DriverTests.cs
@@ -24,6 +24,7 @@ using Apache.Arrow.Adbc.Tests.Xunit;
using Apache.Arrow.Ipc;
using Xunit;
using Xunit.Abstractions;
+using ColumnTypeId = Apache.Arrow.Adbc.Drivers.Apache.Spark.SparkConnection.ColumnTypeId;
namespace Apache.Arrow.Adbc.Tests.Drivers.Apache.Spark
{
@@ -37,7 +38,39 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.Apache.Spark
[TestCaseOrderer("Apache.Arrow.Adbc.Tests.Xunit.TestOrderer", "Apache.Arrow.Adbc.Tests")]
public class DriverTests : SparkTestBase
{
- private static List<string> DefaultTableTypes => new() { "BASE TABLE", "VIEW" };
+ /// <summary>
+ /// Supported Spark data types as a subset of <see cref="SparkConnection.ColumnTypeId"/>
+ /// </summary>
+ private enum SupportedSparkDataType : short
+ {
+ ARRAY = ColumnTypeId.ARRAY,
+ BIGINT = ColumnTypeId.BIGINT,
+ BINARY = ColumnTypeId.BINARY,
+ BOOLEAN = ColumnTypeId.BOOLEAN,
+ CHAR = ColumnTypeId.CHAR,
+ DATE = ColumnTypeId.DATE,
+ DECIMAL = ColumnTypeId.DECIMAL,
+ DOUBLE = ColumnTypeId.DOUBLE,
+ FLOAT = ColumnTypeId.FLOAT,
+ INTEGER = ColumnTypeId.INTEGER,
+ JAVA_OBJECT = ColumnTypeId.JAVA_OBJECT,
+ LONGNVARCHAR = ColumnTypeId.LONGNVARCHAR,
+ LONGVARBINARY = ColumnTypeId.LONGVARBINARY,
+ LONGVARCHAR = ColumnTypeId.LONGVARCHAR,
+ NCHAR = ColumnTypeId.NCHAR,
+ NULL = ColumnTypeId.NULL,
+ NUMERIC = ColumnTypeId.NUMERIC,
+ NVARCHAR = ColumnTypeId.NVARCHAR,
+ REAL = ColumnTypeId.REAL,
+ SMALLINT = ColumnTypeId.SMALLINT,
+ STRUCT = ColumnTypeId.STRUCT,
+ TIMESTAMP = ColumnTypeId.TIMESTAMP,
+ TINYINT = ColumnTypeId.TINYINT,
+ VARBINARY = ColumnTypeId.VARBINARY,
+ VARCHAR = ColumnTypeId.VARCHAR,
+ }
+
+ private static List<string> DefaultTableTypes => new() { "TABLE", "VIEW" };
public DriverTests(ITestOutputHelper? outputHelper) : base(outputHelper)
{
@@ -266,7 +299,7 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.Apache.Spark
AdbcTable? table = tables?.Where((table) => string.Equals(table.Name, tableName)).FirstOrDefault();
Assert.True(table != null, "table should not be null");
// TODO: Determine why this is returned blank.
- //Assert.Equal("BASE TABLE", table.Type);
+ //Assert.Equal("TABLE", table.Type);
}
/// <summary>
@@ -304,11 +337,51 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.Apache.Spark
Assert.True(table != null, "table should not be null");
// TODO: Determine why this is returned blank.
- //Assert.Equal("BASE TABLE", table.Type);
+ //Assert.Equal("TABLE", table.Type);
List<AdbcColumn>? columns = table.Columns;
Assert.True(columns != null, "Columns cannot be null");
Assert.Equal(TestConfiguration.Metadata.ExpectedColumnCount, columns.Count);
+
+ for (int i = 0; i < columns.Count; i++)
+ {
+ // Verify column metadata is returned/consistent.
+ AdbcColumn column = columns[i];
+ Assert.Equal(i + 1, column.OrdinalPosition);
+ Assert.False(string.IsNullOrEmpty(column.Name));
+ Assert.False(string.IsNullOrEmpty(column.XdbcTypeName));
+
+ var types = Enum.GetValues(typeof(SupportedSparkDataType)).Cast<SupportedSparkDataType>();
+ Assert.Contains((SupportedSparkDataType)column.XdbcSqlDataType!, types);
+ Assert.Equal(column.XdbcDataType, column.XdbcSqlDataType);
+
+ Assert.NotNull(column.XdbcDataType);
+ Assert.Contains((SupportedSparkDataType)column.XdbcDataType!, types);
+
+ bool isDecimalType = column.XdbcDataType == (short)SupportedSparkDataType.DECIMAL || column.XdbcDataType == (short)SupportedSparkDataType.NUMERIC;
+ Assert.Equal(column.XdbcColumnSize.HasValue, isDecimalType);
+ Assert.Equal(column.XdbcDecimalDigits.HasValue, isDecimalType);
+
+ Assert.NotNull(column.Remarks);
+ Assert.True(string.IsNullOrEmpty(column.Remarks));
+
+ Assert.NotNull(column.XdbcColumnDef);
+
+ Assert.NotNull(column.XdbcNullable);
+ Assert.Contains(new short[] { 1, 0 }, i => i == column.XdbcNullable);
+
+ Assert.NotNull(column.XdbcIsNullable);
+ Assert.Contains(new string[] { "YES", "NO" }, i => i.Equals(column.XdbcIsNullable));
+
+ Assert.NotNull(column.XdbcIsAutoIncrement);
+
+ Assert.Null(column.XdbcCharOctetLength);
+ Assert.Null(column.XdbcDatetimeSub);
+ Assert.Null(column.XdbcNumPrecRadix);
+ Assert.Null(column.XdbcScopeCatalog);
+ Assert.Null(column.XdbcScopeSchema);
+ Assert.Null(column.XdbcScopeTable);
+ }
}
/// <summary>
@@ -390,7 +463,7 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.Apache.Spark
List<string> known_types = new List<string>
{
- "BASE TABLE", "VIEW"
+ "TABLE", "VIEW"
};
int results = 0;