davidhcoe commented on code in PR #1192: URL: https://github.com/apache/arrow-adbc/pull/1192#discussion_r1367489024
########## csharp/src/Drivers/BigQuery/BigQueryConnection.cs: ########## @@ -0,0 +1,954 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +using System; +using System.Collections.Generic; +using System.Collections.ObjectModel; +using System.Linq; +using System.Net.Http; +using System.Text; +using System.Text.Json; +using System.Text.RegularExpressions; +using Apache.Arrow.Ipc; +using Apache.Arrow.Types; +using Google.Api.Gax; +using Google.Apis.Auth.OAuth2; +using Google.Apis.Bigquery.v2.Data; +using Google.Cloud.BigQuery.V2; + +namespace Apache.Arrow.Adbc.Drivers.BigQuery +{ + /// <summary> + /// BigQuery-specific implementation of <see cref="AdbcConnection"/> + /// </summary> + public class BigQueryConnection : AdbcConnection + { + readonly IReadOnlyDictionary<string, string> properties; + BigQueryClient? client; + GoogleCredential? credential; + + const string infoDriverName = "ADBC BigQuery Driver"; + const string infoDriverVersion = "1.0.0"; + const string infoVendorName = "BigQuery"; + const string infoDriverArrowVersion = "1.0.0"; + + readonly IReadOnlyList<AdbcInfoCode> infoSupportedCodes = new List<AdbcInfoCode> { + AdbcInfoCode.DriverName, + AdbcInfoCode.DriverVersion, + AdbcInfoCode.DriverArrowVersion, + AdbcInfoCode.VendorName + }; + + public BigQueryConnection(IReadOnlyDictionary<string, string> properties) + { + this.properties = properties; + } + + /// <summary> + /// Initializes the internal BigQuery connection + /// </summary> + /// <exception cref="ArgumentException"></exception> + internal void Open() + { + string projectId = string.Empty; + string clientId = string.Empty; + string clientSecret = string.Empty; + string refreshToken = string.Empty; + + string tokenEndpoint = BigQueryConstants.TokenEndpoint; + + string authenticationType = BigQueryConstants.UserAuthenticationType; + + // TODO: handle token expiration + + if (!this.properties.TryGetValue(BigQueryParameters.ProjectId, out projectId)) + throw new ArgumentException($"The {BigQueryParameters.ProjectId} parameter is not present"); + + if (this.properties.ContainsKey(BigQueryParameters.AuthenticationType)) + { + this.properties.TryGetValue(BigQueryParameters.AuthenticationType, out authenticationType); + + if(!authenticationType.Equals(BigQueryConstants.UserAuthenticationType, StringComparison.OrdinalIgnoreCase) && + !authenticationType.Equals(BigQueryConstants.ServiceAccountAuthenticationType, StringComparison.OrdinalIgnoreCase)) + { + throw new ArgumentException($"The {BigQueryParameters.AuthenticationType} parameter can only be `{BigQueryConstants.UserAuthenticationType}` or `{BigQueryConstants.ServiceAccountAuthenticationType}`"); + } + } + + if (authenticationType.Equals(BigQueryConstants.UserAuthenticationType, StringComparison.OrdinalIgnoreCase)) + { + if (!this.properties.TryGetValue(BigQueryParameters.ClientId, out clientId)) + throw new ArgumentException($"The {BigQueryParameters.ClientId} parameter is not present"); + + if (!this.properties.TryGetValue(BigQueryParameters.ClientSecret, out clientSecret)) + throw new ArgumentException($"The {BigQueryParameters.ClientSecret} parameter is not present"); + + if (!this.properties.TryGetValue(BigQueryParameters.RefreshToken, out refreshToken)) + throw new ArgumentException($"The {BigQueryParameters.RefreshToken} parameter is not present"); + + this.credential = GoogleCredential.FromAccessToken(GetAccessToken(clientId, clientSecret, refreshToken, tokenEndpoint)); + } + else + { + string json = string.Empty; + + if (!this.properties.TryGetValue(BigQueryParameters.JsonCredential, out json)) + throw new ArgumentException($"The {BigQueryParameters.JsonCredential} parameter is not present"); + + this.credential = GoogleCredential.FromJson(json); + } + + this.client = BigQueryClient.Create(projectId, this.credential); + } + + public override IArrowArrayStream GetInfo(List<AdbcInfoCode> codes) + { + const int strValTypeID = 0; + + UnionType infoUnionType = new UnionType( + new List<Field>() + { + new Field("string_value", StringType.Default, true), + new Field("bool_value", BooleanType.Default, true), + new Field("int64_value", Int64Type.Default, true), + new Field("int32_bitmask", Int32Type.Default, true), + new Field( + "string_list", + new ListType( + new Field("item", StringType.Default, true) + ), + false + ), + new Field( + "int32_to_int32_list_map", + new ListType( + new Field("entries", new StructType( + new List<Field>() + { + new Field("key", Int32Type.Default, false), + new Field("value", Int32Type.Default, true), + } + ), false) + ), + true + ) + }, + new int[] { 0, 1, 2, 3, 4, 5 }.ToArray(), + UnionMode.Dense); + + if (codes.Count == 0) + { + codes = new List<AdbcInfoCode>(infoSupportedCodes); + } + + UInt32Array.Builder infoNameBuilder = new UInt32Array.Builder(); + ArrowBuffer.Builder<byte> typeBuilder = new ArrowBuffer.Builder<byte>(); + ArrowBuffer.Builder<int> offsetBuilder = new ArrowBuffer.Builder<int>(); + StringArray.Builder stringInfoBuilder = new StringArray.Builder(); + int nullCount = 0; + int arrayLength = codes.Count; + + foreach (AdbcInfoCode code in codes) + { + switch (code) + { + case AdbcInfoCode.DriverName: + infoNameBuilder.Append((UInt32)code); + typeBuilder.Append(strValTypeID); + offsetBuilder.Append(stringInfoBuilder.Length); + stringInfoBuilder.Append(infoDriverName); + break; + case AdbcInfoCode.DriverVersion: + infoNameBuilder.Append((UInt32)code); + typeBuilder.Append(strValTypeID); + offsetBuilder.Append(stringInfoBuilder.Length); + stringInfoBuilder.Append(infoDriverVersion); + break; + case AdbcInfoCode.DriverArrowVersion: + infoNameBuilder.Append((UInt32)code); + typeBuilder.Append(strValTypeID); + offsetBuilder.Append(stringInfoBuilder.Length); + stringInfoBuilder.Append(infoDriverArrowVersion); + break; + case AdbcInfoCode.VendorName: + infoNameBuilder.Append((UInt32)code); + typeBuilder.Append(strValTypeID); + offsetBuilder.Append(stringInfoBuilder.Length); + stringInfoBuilder.Append(infoVendorName); + break; + default: + infoNameBuilder.Append((UInt32)code); + typeBuilder.Append(strValTypeID); + offsetBuilder.Append(stringInfoBuilder.Length); + stringInfoBuilder.AppendNull(); + nullCount++; + break; + } + } + + StructType entryType = new StructType( + new List<Field>(){ + new Field("key", Int32Type.Default, false), + new Field("value", Int32Type.Default, true)}); + + StructArray entriesDataArray = new StructArray(entryType, 0, + new[] { new Int32Array.Builder().Build(), new Int32Array.Builder().Build() }, + new ArrowBuffer.BitmapBuilder().Build()); + + List<IArrowArray> childrenArrays = new List<IArrowArray>() + { + stringInfoBuilder.Build(), + new BooleanArray.Builder().Build(), + new Int64Array.Builder().Build(), + new Int32Array.Builder().Build(), + new ListArray.Builder(StringType.Default).Build(), + CreateNestedListArray(new List<IArrowArray?>(){ entriesDataArray }, entryType) + }; + + DenseUnionArray infoValue = new DenseUnionArray(infoUnionType, arrayLength, childrenArrays, typeBuilder.Build(), offsetBuilder.Build(), nullCount); + + List<IArrowArray> dataArrays = new List<IArrowArray> + { + infoNameBuilder.Build(), + infoValue + }; + + return new BigQueryInfoArrowStream(StandardSchemas.GetInfoSchema, dataArrays, 4); + } + + public override IArrowArrayStream GetObjects( + GetObjectsDepth depth, + string catalogPattern, + string dbSchemaPattern, + string tableNamePattern, + List<string> tableTypes, + string columnNamePattern) + { + List<IArrowArray> dataArrays = GetCatalogs(depth, catalogPattern, dbSchemaPattern, + tableNamePattern, tableTypes, columnNamePattern); + + return new BigQueryInfoArrowStream(StandardSchemas.GetObjectsSchema, dataArrays, 1); + } + + private List<IArrowArray> GetCatalogs( + GetObjectsDepth depth, + string catalogPattern, + string dbSchemaPattern, + string tableNamePattern, + List<string> tableTypes, + string columnNamePattern) + { + StringArray.Builder catalogNameBuilder = new StringArray.Builder(); + List<IArrowArray?> catalogDbSchemasValues = new List<IArrowArray?>(); + string catalogRegexp = PatternToRegEx(catalogPattern); + PagedEnumerable<ProjectList, CloudProject> catalogs = this.client.ListProjects(); + + foreach (CloudProject catalog in catalogs) + { + if (Regex.IsMatch(catalog.ProjectId, catalogRegexp, RegexOptions.IgnoreCase)) + { + catalogNameBuilder.Append(catalog.ProjectId); + + if (depth == GetObjectsDepth.Catalogs) + { + catalogDbSchemasValues.Add(null); + } + else + { + catalogDbSchemasValues.Add(GetDbSchemas( + depth, catalog.ProjectId, dbSchemaPattern, + tableNamePattern, tableTypes, columnNamePattern)); + } + } + } + + List<IArrowArray> dataArrays = new List<IArrowArray> + { + catalogNameBuilder.Build(), + CreateNestedListArray(catalogDbSchemasValues, new StructType(StandardSchemas.DbSchemaSchema)), + }; + + return dataArrays; + } + + private StructArray GetDbSchemas( + GetObjectsDepth depth, + string catalog, + string dbSchemaPattern, + string tableNamePattern, + List<string> tableTypes, + string columnNamePattern) + { + StringArray.Builder dbSchemaNameBuilder = new StringArray.Builder(); + List<IArrowArray?> dbSchemaTablesValues = new List<IArrowArray?>(); + ArrowBuffer.BitmapBuilder nullBitmapBuffer = new ArrowBuffer.BitmapBuilder(); + int length = 0; + + string dbSchemaRegexp = PatternToRegEx(dbSchemaPattern); + + PagedEnumerable<DatasetList, BigQueryDataset> schemas = this.client.ListDatasets(catalog); + + foreach (BigQueryDataset schema in schemas) + { + if (Regex.IsMatch(schema.Reference.DatasetId, dbSchemaRegexp, RegexOptions.IgnoreCase)) + { + dbSchemaNameBuilder.Append(schema.Reference.DatasetId); + length++; + nullBitmapBuffer.Append(true); + + if (depth == GetObjectsDepth.DbSchemas) + { + dbSchemaTablesValues.Add(null); + } + else + { + dbSchemaTablesValues.Add(GetTableSchemas( + depth, catalog, schema.Reference.DatasetId, + tableNamePattern, tableTypes, columnNamePattern)); + } + } + } + + List<IArrowArray> dataArrays = new List<IArrowArray> + { + dbSchemaNameBuilder.Build(), + CreateNestedListArray(dbSchemaTablesValues, new StructType(StandardSchemas.TableSchema)), + }; + + return new StructArray( + new StructType(StandardSchemas.DbSchemaSchema), + length, + dataArrays, + nullBitmapBuffer.Build()); + } + + private StructArray GetTableSchemas( + GetObjectsDepth depth, + string catalog, + string dbSchema, + string tableNamePattern, + List<string> tableTypes, + string columnNamePattern) + { + StringArray.Builder tableNameBuilder = new StringArray.Builder(); + StringArray.Builder tableTypeBuilder = new StringArray.Builder(); + List<IArrowArray?> tableColumnsValues = new List<IArrowArray?>(); + List<IArrowArray?> tableConstraintsValues = new List<IArrowArray?>(); + ArrowBuffer.BitmapBuilder nullBitmapBuffer = new ArrowBuffer.BitmapBuilder(); + int length = 0; + + string query = string.Format("SELECT * FROM `{0}`.`{1}`.INFORMATION_SCHEMA.TABLES", + catalog, dbSchema); + + if (tableNamePattern != null) + { + query = string.Concat(query, string.Format(" WHERE table_name LIKE '{0}'", tableNamePattern)); + if (tableTypes.Count > 0) + { + query = string.Concat(query, string.Format(" AND table_type IN ('{0}')", string.Join("', '", tableTypes).ToUpper())); Review Comment: fixed in latest push ########## csharp/src/Drivers/BigQuery/BigQueryConnection.cs: ########## @@ -0,0 +1,954 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +using System; +using System.Collections.Generic; +using System.Collections.ObjectModel; +using System.Linq; +using System.Net.Http; +using System.Text; +using System.Text.Json; +using System.Text.RegularExpressions; +using Apache.Arrow.Ipc; +using Apache.Arrow.Types; +using Google.Api.Gax; +using Google.Apis.Auth.OAuth2; +using Google.Apis.Bigquery.v2.Data; +using Google.Cloud.BigQuery.V2; + +namespace Apache.Arrow.Adbc.Drivers.BigQuery +{ + /// <summary> + /// BigQuery-specific implementation of <see cref="AdbcConnection"/> + /// </summary> + public class BigQueryConnection : AdbcConnection + { + readonly IReadOnlyDictionary<string, string> properties; + BigQueryClient? client; + GoogleCredential? credential; + + const string infoDriverName = "ADBC BigQuery Driver"; + const string infoDriverVersion = "1.0.0"; + const string infoVendorName = "BigQuery"; + const string infoDriverArrowVersion = "1.0.0"; + + readonly IReadOnlyList<AdbcInfoCode> infoSupportedCodes = new List<AdbcInfoCode> { + AdbcInfoCode.DriverName, + AdbcInfoCode.DriverVersion, + AdbcInfoCode.DriverArrowVersion, + AdbcInfoCode.VendorName + }; + + public BigQueryConnection(IReadOnlyDictionary<string, string> properties) + { + this.properties = properties; + } + + /// <summary> + /// Initializes the internal BigQuery connection + /// </summary> + /// <exception cref="ArgumentException"></exception> + internal void Open() + { + string projectId = string.Empty; + string clientId = string.Empty; + string clientSecret = string.Empty; + string refreshToken = string.Empty; + + string tokenEndpoint = BigQueryConstants.TokenEndpoint; + + string authenticationType = BigQueryConstants.UserAuthenticationType; + + // TODO: handle token expiration + + if (!this.properties.TryGetValue(BigQueryParameters.ProjectId, out projectId)) + throw new ArgumentException($"The {BigQueryParameters.ProjectId} parameter is not present"); + + if (this.properties.ContainsKey(BigQueryParameters.AuthenticationType)) + { + this.properties.TryGetValue(BigQueryParameters.AuthenticationType, out authenticationType); + + if(!authenticationType.Equals(BigQueryConstants.UserAuthenticationType, StringComparison.OrdinalIgnoreCase) && + !authenticationType.Equals(BigQueryConstants.ServiceAccountAuthenticationType, StringComparison.OrdinalIgnoreCase)) + { + throw new ArgumentException($"The {BigQueryParameters.AuthenticationType} parameter can only be `{BigQueryConstants.UserAuthenticationType}` or `{BigQueryConstants.ServiceAccountAuthenticationType}`"); + } + } + + if (authenticationType.Equals(BigQueryConstants.UserAuthenticationType, StringComparison.OrdinalIgnoreCase)) + { + if (!this.properties.TryGetValue(BigQueryParameters.ClientId, out clientId)) + throw new ArgumentException($"The {BigQueryParameters.ClientId} parameter is not present"); + + if (!this.properties.TryGetValue(BigQueryParameters.ClientSecret, out clientSecret)) + throw new ArgumentException($"The {BigQueryParameters.ClientSecret} parameter is not present"); + + if (!this.properties.TryGetValue(BigQueryParameters.RefreshToken, out refreshToken)) + throw new ArgumentException($"The {BigQueryParameters.RefreshToken} parameter is not present"); + + this.credential = GoogleCredential.FromAccessToken(GetAccessToken(clientId, clientSecret, refreshToken, tokenEndpoint)); + } + else + { + string json = string.Empty; + + if (!this.properties.TryGetValue(BigQueryParameters.JsonCredential, out json)) + throw new ArgumentException($"The {BigQueryParameters.JsonCredential} parameter is not present"); + + this.credential = GoogleCredential.FromJson(json); + } + + this.client = BigQueryClient.Create(projectId, this.credential); + } + + public override IArrowArrayStream GetInfo(List<AdbcInfoCode> codes) + { + const int strValTypeID = 0; + + UnionType infoUnionType = new UnionType( + new List<Field>() + { + new Field("string_value", StringType.Default, true), + new Field("bool_value", BooleanType.Default, true), + new Field("int64_value", Int64Type.Default, true), + new Field("int32_bitmask", Int32Type.Default, true), + new Field( + "string_list", + new ListType( + new Field("item", StringType.Default, true) + ), + false + ), + new Field( + "int32_to_int32_list_map", + new ListType( + new Field("entries", new StructType( + new List<Field>() + { + new Field("key", Int32Type.Default, false), + new Field("value", Int32Type.Default, true), + } + ), false) + ), + true + ) + }, + new int[] { 0, 1, 2, 3, 4, 5 }.ToArray(), + UnionMode.Dense); + + if (codes.Count == 0) + { + codes = new List<AdbcInfoCode>(infoSupportedCodes); + } + + UInt32Array.Builder infoNameBuilder = new UInt32Array.Builder(); + ArrowBuffer.Builder<byte> typeBuilder = new ArrowBuffer.Builder<byte>(); + ArrowBuffer.Builder<int> offsetBuilder = new ArrowBuffer.Builder<int>(); + StringArray.Builder stringInfoBuilder = new StringArray.Builder(); + int nullCount = 0; + int arrayLength = codes.Count; + + foreach (AdbcInfoCode code in codes) + { + switch (code) + { + case AdbcInfoCode.DriverName: + infoNameBuilder.Append((UInt32)code); + typeBuilder.Append(strValTypeID); + offsetBuilder.Append(stringInfoBuilder.Length); + stringInfoBuilder.Append(infoDriverName); + break; + case AdbcInfoCode.DriverVersion: + infoNameBuilder.Append((UInt32)code); + typeBuilder.Append(strValTypeID); + offsetBuilder.Append(stringInfoBuilder.Length); + stringInfoBuilder.Append(infoDriverVersion); + break; + case AdbcInfoCode.DriverArrowVersion: + infoNameBuilder.Append((UInt32)code); + typeBuilder.Append(strValTypeID); + offsetBuilder.Append(stringInfoBuilder.Length); + stringInfoBuilder.Append(infoDriverArrowVersion); + break; + case AdbcInfoCode.VendorName: + infoNameBuilder.Append((UInt32)code); + typeBuilder.Append(strValTypeID); + offsetBuilder.Append(stringInfoBuilder.Length); + stringInfoBuilder.Append(infoVendorName); + break; + default: + infoNameBuilder.Append((UInt32)code); + typeBuilder.Append(strValTypeID); + offsetBuilder.Append(stringInfoBuilder.Length); + stringInfoBuilder.AppendNull(); + nullCount++; + break; + } + } + + StructType entryType = new StructType( + new List<Field>(){ + new Field("key", Int32Type.Default, false), + new Field("value", Int32Type.Default, true)}); + + StructArray entriesDataArray = new StructArray(entryType, 0, + new[] { new Int32Array.Builder().Build(), new Int32Array.Builder().Build() }, + new ArrowBuffer.BitmapBuilder().Build()); + + List<IArrowArray> childrenArrays = new List<IArrowArray>() + { + stringInfoBuilder.Build(), + new BooleanArray.Builder().Build(), + new Int64Array.Builder().Build(), + new Int32Array.Builder().Build(), + new ListArray.Builder(StringType.Default).Build(), + CreateNestedListArray(new List<IArrowArray?>(){ entriesDataArray }, entryType) + }; + + DenseUnionArray infoValue = new DenseUnionArray(infoUnionType, arrayLength, childrenArrays, typeBuilder.Build(), offsetBuilder.Build(), nullCount); + + List<IArrowArray> dataArrays = new List<IArrowArray> + { + infoNameBuilder.Build(), + infoValue + }; + + return new BigQueryInfoArrowStream(StandardSchemas.GetInfoSchema, dataArrays, 4); + } + + public override IArrowArrayStream GetObjects( + GetObjectsDepth depth, + string catalogPattern, + string dbSchemaPattern, + string tableNamePattern, + List<string> tableTypes, + string columnNamePattern) + { + List<IArrowArray> dataArrays = GetCatalogs(depth, catalogPattern, dbSchemaPattern, + tableNamePattern, tableTypes, columnNamePattern); + + return new BigQueryInfoArrowStream(StandardSchemas.GetObjectsSchema, dataArrays, 1); + } + + private List<IArrowArray> GetCatalogs( + GetObjectsDepth depth, + string catalogPattern, + string dbSchemaPattern, + string tableNamePattern, + List<string> tableTypes, + string columnNamePattern) + { + StringArray.Builder catalogNameBuilder = new StringArray.Builder(); + List<IArrowArray?> catalogDbSchemasValues = new List<IArrowArray?>(); + string catalogRegexp = PatternToRegEx(catalogPattern); + PagedEnumerable<ProjectList, CloudProject> catalogs = this.client.ListProjects(); + + foreach (CloudProject catalog in catalogs) + { + if (Regex.IsMatch(catalog.ProjectId, catalogRegexp, RegexOptions.IgnoreCase)) + { + catalogNameBuilder.Append(catalog.ProjectId); + + if (depth == GetObjectsDepth.Catalogs) + { + catalogDbSchemasValues.Add(null); + } + else + { + catalogDbSchemasValues.Add(GetDbSchemas( + depth, catalog.ProjectId, dbSchemaPattern, + tableNamePattern, tableTypes, columnNamePattern)); + } + } + } + + List<IArrowArray> dataArrays = new List<IArrowArray> + { + catalogNameBuilder.Build(), + CreateNestedListArray(catalogDbSchemasValues, new StructType(StandardSchemas.DbSchemaSchema)), + }; + + return dataArrays; + } + + private StructArray GetDbSchemas( + GetObjectsDepth depth, + string catalog, + string dbSchemaPattern, + string tableNamePattern, + List<string> tableTypes, + string columnNamePattern) + { + StringArray.Builder dbSchemaNameBuilder = new StringArray.Builder(); + List<IArrowArray?> dbSchemaTablesValues = new List<IArrowArray?>(); + ArrowBuffer.BitmapBuilder nullBitmapBuffer = new ArrowBuffer.BitmapBuilder(); + int length = 0; + + string dbSchemaRegexp = PatternToRegEx(dbSchemaPattern); + + PagedEnumerable<DatasetList, BigQueryDataset> schemas = this.client.ListDatasets(catalog); + + foreach (BigQueryDataset schema in schemas) + { + if (Regex.IsMatch(schema.Reference.DatasetId, dbSchemaRegexp, RegexOptions.IgnoreCase)) + { + dbSchemaNameBuilder.Append(schema.Reference.DatasetId); + length++; + nullBitmapBuffer.Append(true); + + if (depth == GetObjectsDepth.DbSchemas) + { + dbSchemaTablesValues.Add(null); + } + else + { + dbSchemaTablesValues.Add(GetTableSchemas( + depth, catalog, schema.Reference.DatasetId, + tableNamePattern, tableTypes, columnNamePattern)); + } + } + } + + List<IArrowArray> dataArrays = new List<IArrowArray> + { + dbSchemaNameBuilder.Build(), + CreateNestedListArray(dbSchemaTablesValues, new StructType(StandardSchemas.TableSchema)), + }; + + return new StructArray( + new StructType(StandardSchemas.DbSchemaSchema), + length, + dataArrays, + nullBitmapBuffer.Build()); + } + + private StructArray GetTableSchemas( + GetObjectsDepth depth, + string catalog, + string dbSchema, + string tableNamePattern, + List<string> tableTypes, + string columnNamePattern) + { + StringArray.Builder tableNameBuilder = new StringArray.Builder(); + StringArray.Builder tableTypeBuilder = new StringArray.Builder(); + List<IArrowArray?> tableColumnsValues = new List<IArrowArray?>(); + List<IArrowArray?> tableConstraintsValues = new List<IArrowArray?>(); + ArrowBuffer.BitmapBuilder nullBitmapBuffer = new ArrowBuffer.BitmapBuilder(); + int length = 0; + + string query = string.Format("SELECT * FROM `{0}`.`{1}`.INFORMATION_SCHEMA.TABLES", + catalog, dbSchema); + + if (tableNamePattern != null) + { + query = string.Concat(query, string.Format(" WHERE table_name LIKE '{0}'", tableNamePattern)); Review Comment: fixed in latest push ########## csharp/src/Drivers/BigQuery/BigQueryConnection.cs: ########## @@ -0,0 +1,954 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +using System; +using System.Collections.Generic; +using System.Collections.ObjectModel; +using System.Linq; +using System.Net.Http; +using System.Text; +using System.Text.Json; +using System.Text.RegularExpressions; +using Apache.Arrow.Ipc; +using Apache.Arrow.Types; +using Google.Api.Gax; +using Google.Apis.Auth.OAuth2; +using Google.Apis.Bigquery.v2.Data; +using Google.Cloud.BigQuery.V2; + +namespace Apache.Arrow.Adbc.Drivers.BigQuery +{ + /// <summary> + /// BigQuery-specific implementation of <see cref="AdbcConnection"/> + /// </summary> + public class BigQueryConnection : AdbcConnection + { + readonly IReadOnlyDictionary<string, string> properties; + BigQueryClient? client; + GoogleCredential? credential; + + const string infoDriverName = "ADBC BigQuery Driver"; + const string infoDriverVersion = "1.0.0"; + const string infoVendorName = "BigQuery"; + const string infoDriverArrowVersion = "1.0.0"; + + readonly IReadOnlyList<AdbcInfoCode> infoSupportedCodes = new List<AdbcInfoCode> { + AdbcInfoCode.DriverName, + AdbcInfoCode.DriverVersion, + AdbcInfoCode.DriverArrowVersion, + AdbcInfoCode.VendorName + }; + + public BigQueryConnection(IReadOnlyDictionary<string, string> properties) + { + this.properties = properties; + } + + /// <summary> + /// Initializes the internal BigQuery connection + /// </summary> + /// <exception cref="ArgumentException"></exception> + internal void Open() + { + string projectId = string.Empty; + string clientId = string.Empty; + string clientSecret = string.Empty; + string refreshToken = string.Empty; + + string tokenEndpoint = BigQueryConstants.TokenEndpoint; + + string authenticationType = BigQueryConstants.UserAuthenticationType; + + // TODO: handle token expiration + + if (!this.properties.TryGetValue(BigQueryParameters.ProjectId, out projectId)) + throw new ArgumentException($"The {BigQueryParameters.ProjectId} parameter is not present"); + + if (this.properties.ContainsKey(BigQueryParameters.AuthenticationType)) + { + this.properties.TryGetValue(BigQueryParameters.AuthenticationType, out authenticationType); + + if(!authenticationType.Equals(BigQueryConstants.UserAuthenticationType, StringComparison.OrdinalIgnoreCase) && + !authenticationType.Equals(BigQueryConstants.ServiceAccountAuthenticationType, StringComparison.OrdinalIgnoreCase)) + { + throw new ArgumentException($"The {BigQueryParameters.AuthenticationType} parameter can only be `{BigQueryConstants.UserAuthenticationType}` or `{BigQueryConstants.ServiceAccountAuthenticationType}`"); + } + } + + if (authenticationType.Equals(BigQueryConstants.UserAuthenticationType, StringComparison.OrdinalIgnoreCase)) + { + if (!this.properties.TryGetValue(BigQueryParameters.ClientId, out clientId)) + throw new ArgumentException($"The {BigQueryParameters.ClientId} parameter is not present"); + + if (!this.properties.TryGetValue(BigQueryParameters.ClientSecret, out clientSecret)) + throw new ArgumentException($"The {BigQueryParameters.ClientSecret} parameter is not present"); + + if (!this.properties.TryGetValue(BigQueryParameters.RefreshToken, out refreshToken)) + throw new ArgumentException($"The {BigQueryParameters.RefreshToken} parameter is not present"); + + this.credential = GoogleCredential.FromAccessToken(GetAccessToken(clientId, clientSecret, refreshToken, tokenEndpoint)); + } + else + { + string json = string.Empty; + + if (!this.properties.TryGetValue(BigQueryParameters.JsonCredential, out json)) + throw new ArgumentException($"The {BigQueryParameters.JsonCredential} parameter is not present"); + + this.credential = GoogleCredential.FromJson(json); + } + + this.client = BigQueryClient.Create(projectId, this.credential); + } + + public override IArrowArrayStream GetInfo(List<AdbcInfoCode> codes) + { + const int strValTypeID = 0; + + UnionType infoUnionType = new UnionType( + new List<Field>() + { + new Field("string_value", StringType.Default, true), + new Field("bool_value", BooleanType.Default, true), + new Field("int64_value", Int64Type.Default, true), + new Field("int32_bitmask", Int32Type.Default, true), + new Field( + "string_list", + new ListType( + new Field("item", StringType.Default, true) + ), + false + ), + new Field( + "int32_to_int32_list_map", + new ListType( + new Field("entries", new StructType( + new List<Field>() + { + new Field("key", Int32Type.Default, false), + new Field("value", Int32Type.Default, true), + } + ), false) + ), + true + ) + }, + new int[] { 0, 1, 2, 3, 4, 5 }.ToArray(), + UnionMode.Dense); + + if (codes.Count == 0) + { + codes = new List<AdbcInfoCode>(infoSupportedCodes); + } + + UInt32Array.Builder infoNameBuilder = new UInt32Array.Builder(); + ArrowBuffer.Builder<byte> typeBuilder = new ArrowBuffer.Builder<byte>(); + ArrowBuffer.Builder<int> offsetBuilder = new ArrowBuffer.Builder<int>(); + StringArray.Builder stringInfoBuilder = new StringArray.Builder(); + int nullCount = 0; + int arrayLength = codes.Count; + + foreach (AdbcInfoCode code in codes) + { + switch (code) + { + case AdbcInfoCode.DriverName: + infoNameBuilder.Append((UInt32)code); + typeBuilder.Append(strValTypeID); + offsetBuilder.Append(stringInfoBuilder.Length); + stringInfoBuilder.Append(infoDriverName); + break; + case AdbcInfoCode.DriverVersion: + infoNameBuilder.Append((UInt32)code); + typeBuilder.Append(strValTypeID); + offsetBuilder.Append(stringInfoBuilder.Length); + stringInfoBuilder.Append(infoDriverVersion); + break; + case AdbcInfoCode.DriverArrowVersion: + infoNameBuilder.Append((UInt32)code); + typeBuilder.Append(strValTypeID); + offsetBuilder.Append(stringInfoBuilder.Length); + stringInfoBuilder.Append(infoDriverArrowVersion); + break; + case AdbcInfoCode.VendorName: + infoNameBuilder.Append((UInt32)code); + typeBuilder.Append(strValTypeID); + offsetBuilder.Append(stringInfoBuilder.Length); + stringInfoBuilder.Append(infoVendorName); + break; + default: + infoNameBuilder.Append((UInt32)code); + typeBuilder.Append(strValTypeID); + offsetBuilder.Append(stringInfoBuilder.Length); + stringInfoBuilder.AppendNull(); + nullCount++; + break; + } + } + + StructType entryType = new StructType( + new List<Field>(){ + new Field("key", Int32Type.Default, false), + new Field("value", Int32Type.Default, true)}); + + StructArray entriesDataArray = new StructArray(entryType, 0, + new[] { new Int32Array.Builder().Build(), new Int32Array.Builder().Build() }, + new ArrowBuffer.BitmapBuilder().Build()); + + List<IArrowArray> childrenArrays = new List<IArrowArray>() + { + stringInfoBuilder.Build(), + new BooleanArray.Builder().Build(), + new Int64Array.Builder().Build(), + new Int32Array.Builder().Build(), + new ListArray.Builder(StringType.Default).Build(), + CreateNestedListArray(new List<IArrowArray?>(){ entriesDataArray }, entryType) + }; + + DenseUnionArray infoValue = new DenseUnionArray(infoUnionType, arrayLength, childrenArrays, typeBuilder.Build(), offsetBuilder.Build(), nullCount); + + List<IArrowArray> dataArrays = new List<IArrowArray> + { + infoNameBuilder.Build(), + infoValue + }; + + return new BigQueryInfoArrowStream(StandardSchemas.GetInfoSchema, dataArrays, 4); + } + + public override IArrowArrayStream GetObjects( + GetObjectsDepth depth, + string catalogPattern, + string dbSchemaPattern, + string tableNamePattern, + List<string> tableTypes, + string columnNamePattern) + { + List<IArrowArray> dataArrays = GetCatalogs(depth, catalogPattern, dbSchemaPattern, + tableNamePattern, tableTypes, columnNamePattern); + + return new BigQueryInfoArrowStream(StandardSchemas.GetObjectsSchema, dataArrays, 1); + } + + private List<IArrowArray> GetCatalogs( + GetObjectsDepth depth, + string catalogPattern, + string dbSchemaPattern, + string tableNamePattern, + List<string> tableTypes, + string columnNamePattern) + { + StringArray.Builder catalogNameBuilder = new StringArray.Builder(); + List<IArrowArray?> catalogDbSchemasValues = new List<IArrowArray?>(); + string catalogRegexp = PatternToRegEx(catalogPattern); + PagedEnumerable<ProjectList, CloudProject> catalogs = this.client.ListProjects(); + + foreach (CloudProject catalog in catalogs) + { + if (Regex.IsMatch(catalog.ProjectId, catalogRegexp, RegexOptions.IgnoreCase)) + { + catalogNameBuilder.Append(catalog.ProjectId); + + if (depth == GetObjectsDepth.Catalogs) + { + catalogDbSchemasValues.Add(null); + } + else + { + catalogDbSchemasValues.Add(GetDbSchemas( + depth, catalog.ProjectId, dbSchemaPattern, + tableNamePattern, tableTypes, columnNamePattern)); + } + } + } + + List<IArrowArray> dataArrays = new List<IArrowArray> + { + catalogNameBuilder.Build(), + CreateNestedListArray(catalogDbSchemasValues, new StructType(StandardSchemas.DbSchemaSchema)), + }; + + return dataArrays; + } + + private StructArray GetDbSchemas( + GetObjectsDepth depth, + string catalog, + string dbSchemaPattern, + string tableNamePattern, + List<string> tableTypes, + string columnNamePattern) + { + StringArray.Builder dbSchemaNameBuilder = new StringArray.Builder(); + List<IArrowArray?> dbSchemaTablesValues = new List<IArrowArray?>(); + ArrowBuffer.BitmapBuilder nullBitmapBuffer = new ArrowBuffer.BitmapBuilder(); + int length = 0; + + string dbSchemaRegexp = PatternToRegEx(dbSchemaPattern); + + PagedEnumerable<DatasetList, BigQueryDataset> schemas = this.client.ListDatasets(catalog); + + foreach (BigQueryDataset schema in schemas) + { + if (Regex.IsMatch(schema.Reference.DatasetId, dbSchemaRegexp, RegexOptions.IgnoreCase)) + { + dbSchemaNameBuilder.Append(schema.Reference.DatasetId); + length++; + nullBitmapBuffer.Append(true); + + if (depth == GetObjectsDepth.DbSchemas) + { + dbSchemaTablesValues.Add(null); + } + else + { + dbSchemaTablesValues.Add(GetTableSchemas( + depth, catalog, schema.Reference.DatasetId, + tableNamePattern, tableTypes, columnNamePattern)); + } + } + } + + List<IArrowArray> dataArrays = new List<IArrowArray> + { + dbSchemaNameBuilder.Build(), + CreateNestedListArray(dbSchemaTablesValues, new StructType(StandardSchemas.TableSchema)), + }; + + return new StructArray( + new StructType(StandardSchemas.DbSchemaSchema), + length, + dataArrays, + nullBitmapBuffer.Build()); + } + + private StructArray GetTableSchemas( + GetObjectsDepth depth, + string catalog, + string dbSchema, + string tableNamePattern, + List<string> tableTypes, + string columnNamePattern) + { + StringArray.Builder tableNameBuilder = new StringArray.Builder(); + StringArray.Builder tableTypeBuilder = new StringArray.Builder(); + List<IArrowArray?> tableColumnsValues = new List<IArrowArray?>(); + List<IArrowArray?> tableConstraintsValues = new List<IArrowArray?>(); + ArrowBuffer.BitmapBuilder nullBitmapBuffer = new ArrowBuffer.BitmapBuilder(); + int length = 0; + + string query = string.Format("SELECT * FROM `{0}`.`{1}`.INFORMATION_SCHEMA.TABLES", + catalog, dbSchema); + + if (tableNamePattern != null) + { + query = string.Concat(query, string.Format(" WHERE table_name LIKE '{0}'", tableNamePattern)); + if (tableTypes.Count > 0) + { + query = string.Concat(query, string.Format(" AND table_type IN ('{0}')", string.Join("', '", tableTypes).ToUpper())); + } + } + else + { + if (tableTypes.Count > 0) + { + query = string.Concat(query, string.Format(" WHERE table_type IN ('{0}')", string.Join("', '", tableTypes).ToUpper())); + } + } + + BigQueryResults result = this.client.ExecuteQuery(query, parameters: null); + + foreach (BigQueryRow row in result) + { + tableNameBuilder.Append(row["table_name"].ToString()); + tableTypeBuilder.Append(row["table_type"].ToString()); + nullBitmapBuffer.Append(true); + length++; + + tableConstraintsValues.Add(GetConstraintSchema( + depth, catalog, dbSchema, row["table_name"].ToString(), columnNamePattern)); + + // TODO: add constraints + if (depth == GetObjectsDepth.Tables) + { + tableColumnsValues.Add(null); + } + else + { + tableColumnsValues.Add(GetColumnSchema(catalog, dbSchema, row["table_name"].ToString(), columnNamePattern)); + } + } + + List<IArrowArray> dataArrays = new List<IArrowArray> + { + tableNameBuilder.Build(), + tableTypeBuilder.Build(), + CreateNestedListArray(tableColumnsValues, new StructType(StandardSchemas.ColumnSchema)), + CreateNestedListArray(tableConstraintsValues, new StructType(StandardSchemas.ConstraintSchema)) + }; + + return new StructArray( + new StructType(StandardSchemas.TableSchema), + length, + dataArrays, + nullBitmapBuffer.Build()); + } + + private StructArray GetColumnSchema( + string catalog, + string dbSchema, + string table, + string columnNamePattern) + { + StringArray.Builder columnNameBuilder = new StringArray.Builder(); + Int32Array.Builder ordinalPositionBuilder = new Int32Array.Builder(); + StringArray.Builder remarksBuilder = new StringArray.Builder(); + Int16Array.Builder xdbcDataTypeBuilder = new Int16Array.Builder(); + StringArray.Builder xdbcTypeNameBuilder = new StringArray.Builder(); + Int32Array.Builder xdbcColumnSizeBuilder = new Int32Array.Builder(); + Int16Array.Builder xdbcDecimalDigitsBuilder = new Int16Array.Builder(); + Int16Array.Builder xdbcNumPrecRadixBuilder = new Int16Array.Builder(); + Int16Array.Builder xdbcNullableBuilder = new Int16Array.Builder(); + StringArray.Builder xdbcColumnDefBuilder = new StringArray.Builder(); + Int16Array.Builder xdbcSqlDataTypeBuilder = new Int16Array.Builder(); + Int16Array.Builder xdbcDatetimeSubBuilder = new Int16Array.Builder(); + Int32Array.Builder xdbcCharOctetLengthBuilder = new Int32Array.Builder(); + StringArray.Builder xdbcIsNullableBuilder = new StringArray.Builder(); + StringArray.Builder xdbcScopeCatalogBuilder = new StringArray.Builder(); + StringArray.Builder xdbcScopeSchemaBuilder = new StringArray.Builder(); + StringArray.Builder xdbcScopeTableBuilder = new StringArray.Builder(); + BooleanArray.Builder xdbcIsAutoincrementBuilder = new BooleanArray.Builder(); + BooleanArray.Builder xdbcIsGeneratedcolumnBuilder = new BooleanArray.Builder(); + ArrowBuffer.BitmapBuilder nullBitmapBuffer = new ArrowBuffer.BitmapBuilder(); + int length = 0; + + string query = string.Format("SELECT * FROM `{0}`.`{1}`.INFORMATION_SCHEMA.COLUMNS WHERE table_name = '{2}'", + catalog, dbSchema, table); + + if (columnNamePattern != null) + { + query = string.Concat(query, string.Format("AND column_name LIKE '{0}'", columnNamePattern)); Review Comment: fixed in latest push -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
