This is an automated email from the ASF dual-hosted git repository. yuqi4733 pushed a commit to branch internal-main in repository https://gitbox.apache.org/repos/asf/gravitino.git
commit 8618ece17a9f8f55fa2191fcf19d4297a6a9e9c4 Author: geyanggang <[email protected]> AuthorDate: Fri Feb 6 16:56:08 2026 +0800 [#92]feat(bigquery-catalog): Add Schema operators for BigQuery catalog. --- catalogs/catalog-jdbc-bigquery/build.gradle.kts | 109 ++- .../catalog/bigquery/BigQueryCatalog.java | 125 +++- .../BigQueryCatalogPropertiesMetadata.java | 17 +- .../catalog/bigquery/BigQueryClientPool.java | 171 +++++ .../bigquery/BigQuerySchemaPropertiesMetadata.java | 235 +++++- .../operation/BigQueryDatabaseOperations.java | 784 ++++++++++++++++++++- .../bigquery/operation/BigQueryOperationUtils.java | 100 +++ .../operation/BigQueryTableOperations.java | 21 + .../src/main/resources/jdbc-bigquery.conf | 12 +- .../catalog/bigquery/TestBigQueryCatalog.java | 103 +++ .../catalog/bigquery/TestBigQueryClientPool.java | 87 +++ .../TestBigQuerySchemaPropertiesMetadata.java | 141 ++++ .../operation/TestBigQueryDatabaseOperations.java | 136 ++++ gradle/libs.versions.toml | 3 + 14 files changed, 2016 insertions(+), 28 deletions(-) diff --git a/catalogs/catalog-jdbc-bigquery/build.gradle.kts b/catalogs/catalog-jdbc-bigquery/build.gradle.kts index 7b04ae9330..efdaae56cd 100644 --- a/catalogs/catalog-jdbc-bigquery/build.gradle.kts +++ b/catalogs/catalog-jdbc-bigquery/build.gradle.kts @@ -22,12 +22,79 @@ plugins { `maven-publish` id("java") id("idea") + id("de.undercouch.download") version "5.4.0" +} + +val simbaDriverVersion = libs.versions.simba.bigquery.jdbc.get() +val simbaDriverUrl = "https://storage.googleapis.com/simba-bq-release/jdbc/SimbaJDBCDriverforGoogleBigQuery42_$simbaDriverVersion.zip" + +val simbaDownloadDir = layout.buildDirectory.dir("downloads").get() +val simbaExtractDir = layout.buildDirectory.dir("extracted/simba-driver").get() + +val simbaZipFile = simbaDownloadDir.file("simba-driver-$simbaDriverVersion.zip").asFile + +val downloadSimbaDriver by tasks.registering(de.undercouch.gradle.tasks.download.Download::class) { + src(simbaDriverUrl) + 
dest(simbaZipFile) + overwrite(false) + onlyIfModified(true) + + doFirst { + logger.lifecycle("Downloading Simba BigQuery JDBC driver: v$simbaDriverVersion...") + } + + doLast { + val fileSize = simbaZipFile.length() / 1024 / 1024 + logger.lifecycle("Download completed. File size: ${fileSize}MB") + } +} + +val extractSimbaDriver by tasks.registering(Copy::class) { + dependsOn(downloadSimbaDriver) + + from(zipTree(simbaZipFile)) + into(simbaExtractDir) + include("**/*.jar") + exclude("**/src/", "**/doc/", "**/samples/", "**/legal/") + + rename("GoogleBigQueryJDBC42.jar", "GoogleBigQueryJDBC42-simba.jar") + + doFirst { + logger.lifecycle("Decompressing Simba JDBC driver...") + } + + doLast { + val jarFiles = fileTree(simbaExtractDir).matching { include("*.jar") }.files + logger.lifecycle("Found ${jarFiles.size} JAR files:") + jarFiles.forEach { jar -> + logger.lifecycle(" - ${jar.name} (${jar.length() / 1024}KB)") + } + } +} + +val cleanSimbaDriver by tasks.registering(Delete::class) { + delete(simbaDownloadDir, simbaExtractDir) +} + +tasks.clean { + dependsOn(cleanSimbaDriver) +} + +tasks.withType<JavaCompile> { + dependsOn(extractSimbaDriver) +} + +tasks.test { + dependsOn(extractSimbaDriver) } dependencies { implementation(project(":api")) { exclude(group = "*") } + implementation(project(":catalogs:catalog-common")) { + exclude(group = "*") + } implementation(project(":catalogs:catalog-jdbc-common")) { exclude(group = "*") } @@ -39,8 +106,10 @@ dependencies { } implementation(libs.bundles.log4j) + implementation(libs.commons.collections4) implementation(libs.commons.lang3) implementation(libs.guava) + implementation(libs.commons.compress) testImplementation(project(":catalogs:catalog-jdbc-common", "testArtifacts")) testImplementation(project(":clients:client-java")) @@ -50,34 +119,64 @@ dependencies { testImplementation(libs.junit.jupiter.api) testImplementation(libs.junit.jupiter.params) + testImplementation(libs.testcontainers) + 
testImplementation(libs.mockito.core) + + val simbaJdbcDriver = files(simbaExtractDir.asFileTree.matching { include("*.jar") }) + implementation(simbaJdbcDriver) testRuntimeOnly(libs.junit.jupiter.engine) } tasks { val runtimeJars by registering(Copy::class) { + dependsOn("jar", extractSimbaDriver) from(configurations.runtimeClasspath) into("build/libs") + duplicatesStrategy = DuplicatesStrategy.EXCLUDE + + doFirst { + logger.lifecycle("Copying runtime dependencies to build/libs") + logger.lifecycle("Including Simba JDBC driver files:") + fileTree(simbaExtractDir).matching { include("*.jar") }.forEach { jar -> + logger.lifecycle(" - ${jar.name}") + } + } } val copyCatalogLibs by registering(Copy::class) { - dependsOn("jar", "runtimeJars") + dependsOn("jar", runtimeJars) from("build/libs") { exclude("guava-*.jar") exclude("log4j-*.jar") exclude("slf4j-*.jar") + exclude("grpc-google-cloud-bigquerystorage-v1beta*.jar") + exclude("proto-google-cloud-bigquerystorage-v1beta*.jar") + exclude("error_prone_annotations-2.33.0.jar") + exclude("failureaccess-1.0.2.jar") } into("$rootDir/distribution/package/catalogs/jdbc-bigquery/libs") + + doLast { + logger.lifecycle("The BigQuery JDBC Catalog library file has been copied to the distribution directory.") + } } val copyCatalogConfig by registering(Copy::class) { from("src/main/resources") into("$rootDir/distribution/package/catalogs/jdbc-bigquery/conf") + include("jdbc-bigquery.conf") + exclude { details -> details.file.isDirectory() } + fileMode = 0b111101101 + + doLast { + logger.lifecycle("BigQuery JDBC configuration file copied.") + } } register("copyLibAndConfig", Copy::class) { @@ -93,8 +192,12 @@ tasks.test { } else { dependsOn(tasks.jar) } + dependsOn(extractSimbaDriver) + environment("SIMBA_JDBC_DRIVER_PATH", simbaExtractDir.asFile.absolutePath) } -tasks.getByName("generateMetadataFileForMavenJavaPublication") { - dependsOn("runtimeJars") +afterEvaluate { + 
tasks.getByName("generateMetadataFileForMavenJavaPublication") { + dependsOn(tasks.getByName("runtimeJars")) + } } diff --git a/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/BigQueryCatalog.java b/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/BigQueryCatalog.java index bb47ddc209..85dee82a15 100644 --- a/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/BigQueryCatalog.java +++ b/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/BigQueryCatalog.java @@ -18,12 +18,16 @@ */ package org.apache.gravitino.catalog.bigquery; +import java.util.HashMap; +import java.util.Map; +import org.apache.commons.lang3.StringUtils; import org.apache.gravitino.catalog.bigquery.converter.BigQueryColumnDefaultValueConverter; import org.apache.gravitino.catalog.bigquery.converter.BigQueryExceptionConverter; import org.apache.gravitino.catalog.bigquery.converter.BigQueryTypeConverter; import org.apache.gravitino.catalog.bigquery.operation.BigQueryDatabaseOperations; import org.apache.gravitino.catalog.bigquery.operation.BigQueryTableOperations; import org.apache.gravitino.catalog.jdbc.JdbcCatalog; +import org.apache.gravitino.catalog.jdbc.config.JdbcConfig; import org.apache.gravitino.catalog.jdbc.converter.JdbcColumnDefaultValueConverter; import org.apache.gravitino.catalog.jdbc.converter.JdbcExceptionConverter; import org.apache.gravitino.catalog.jdbc.converter.JdbcTypeConverter; @@ -31,18 +35,131 @@ import org.apache.gravitino.catalog.jdbc.operation.JdbcDatabaseOperations; import org.apache.gravitino.catalog.jdbc.operation.JdbcTableOperations; import org.apache.gravitino.connector.PropertiesMetadata; import org.apache.gravitino.connector.capability.Capability; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** Implementation of a BigQuery catalog in Apache Gravitino. 
*/ public class BigQueryCatalog extends JdbcCatalog { + private static final Logger LOG = LoggerFactory.getLogger(BigQueryCatalog.class); + public static final BigQueryTablePropertiesMetadata BIGQUERY_TABLE_PROPERTIES_META = new BigQueryTablePropertiesMetadata(); + private BigQueryClientPool clientPool; + + @Override + public BigQueryCatalog withCatalogConf(Map<String, String> conf) { + // Build JDBC URL from friendly properties before setting config + Map<String, String> processedConfig = buildJdbcUrl(conf); + + // Initialize BigQuery API client pool for cross-region dataset listing + this.clientPool = new BigQueryClientPool(processedConfig); + + // Call parent with processed config + return (BigQueryCatalog) super.withCatalogConf(processedConfig); + } + + /** + * Gets the BigQuery API client pool. + * + * <p>This method provides access to the BigQuery API client pool for operations that need to + * interact with BigQuery APIs directly, such as cross-region dataset operations or advanced + * metadata retrieval that cannot be performed through JDBC. + * + * @return BigQuery client pool, may be null if not initialized + */ + public BigQueryClientPool getClientPool() { + return clientPool; + } + + @Override + public void close() throws java.io.IOException { + if (clientPool != null) { + try { + clientPool.close(); + } catch (Exception e) { + LOG.warn("Error closing BigQuery client pool", e); + } + } + super.close(); + } + @Override public String shortName() { return "jdbc-bigquery"; } + /** + * Build JDBC URL from individual BigQuery configuration components. If jdbc-url contains + * authentication parameters, use it directly. Otherwise, enhance the base URL with project-id, + * jdbc-user (service account email), and jdbc-password (key file path). 
+ */ + private Map<String, String> buildJdbcUrl(Map<String, String> config) { + + String jdbcUrl = config.get(JdbcConfig.JDBC_URL.getKey()); + + // If jdbc-url already contains authentication parameters, use it as-is + if (StringUtils.isNotBlank(jdbcUrl) + && jdbcUrl.contains("ProjectId=") + && (jdbcUrl.contains("OAuthServiceAcctEmail=") || jdbcUrl.contains("OAuthType="))) { + LOG.info("Using existing JDBC URL with authentication parameters"); + return config; + } + + // Extract BigQuery-specific properties + String projectId = config.get(BigQueryCatalogPropertiesMetadata.PROJECT_ID); + String keyFilePath = config.get(JdbcConfig.PASSWORD.getKey()); // Key file path + String serviceAccountEmail = config.get(JdbcConfig.USERNAME.getKey()); // Service account email + + // Validate required properties + if (StringUtils.isBlank(projectId)) { + throw new IllegalArgumentException("project-id is required for BigQuery catalog"); + } + if (StringUtils.isBlank(keyFilePath)) { + throw new IllegalArgumentException( + "jdbc-password (key file path) is required for BigQuery catalog"); + } + + // Build complete JDBC URL with authentication parameters + // Use OAuthType=0 for service account authentication + StringBuilder urlBuilder = new StringBuilder(); + + // Start with base URL or construct default URL + if (StringUtils.isNotBlank(jdbcUrl)) { + // Ensure URL ends with semicolon for parameter separation + String baseUrl = jdbcUrl.endsWith(";") ? 
jdbcUrl : jdbcUrl + ";"; + urlBuilder.append(baseUrl); + } else { + // Use default BigQuery JDBC URL + urlBuilder.append("jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;"); + } + + urlBuilder.append("ProjectId=").append(projectId).append(";"); + urlBuilder.append("OAuthType=0;"); + + // Add service account email if provided + if (StringUtils.isNotBlank(serviceAccountEmail)) { + urlBuilder.append("OAuthServiceAcctEmail=").append(serviceAccountEmail).append(";"); + } + + urlBuilder.append("OAuthPvtKeyPath=").append(keyFilePath).append(";"); + + String completeJdbcUrl = urlBuilder.toString(); + + // Create new config map with complete jdbc-url + HashMap<String, String> newConfig = new HashMap<>(config); + newConfig.put(JdbcConfig.JDBC_URL.getKey(), completeJdbcUrl); + + // Log the constructed URL (without sensitive info) + LOG.info("Built BigQuery JDBC URL for project: {}", projectId); + LOG.debug( + "Complete JDBC URL: {}", + completeJdbcUrl.replaceAll("OAuthPvtKeyPath=[^;]+", "OAuthPvtKeyPath=[REDACTED]")); + + return newConfig; + } + @Override public Capability newCapability() { return new BigQueryCatalogCapability(); @@ -75,12 +192,16 @@ public class BigQueryCatalog extends JdbcCatalog { @Override protected JdbcDatabaseOperations createJdbcDatabaseOperations() { - return new BigQueryDatabaseOperations(); + BigQueryDatabaseOperations operations = new BigQueryDatabaseOperations(); + operations.setClientPool(this.clientPool); + return operations; } @Override protected JdbcTableOperations createJdbcTableOperations() { - return new BigQueryTableOperations(); + BigQueryTableOperations operations = new BigQueryTableOperations(); + operations.setClientPool(this.clientPool); + return operations; } @Override diff --git a/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/BigQueryCatalogPropertiesMetadata.java b/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/BigQueryCatalogPropertiesMetadata.java 
index c1df1b8b42..8d356159be 100644 --- a/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/BigQueryCatalogPropertiesMetadata.java +++ b/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/BigQueryCatalogPropertiesMetadata.java @@ -23,18 +23,23 @@ import java.util.Map; import org.apache.gravitino.catalog.jdbc.JdbcCatalogPropertiesMetadata; import org.apache.gravitino.connector.PropertyEntry; -/** Properties metadata for BigQuery catalog. */ +/** + * Properties metadata for BigQuery catalog. + * + * <p>Defines BigQuery-specific catalog properties including project ID and authentication + * credentials. + */ public class BigQueryCatalogPropertiesMetadata extends JdbcCatalogPropertiesMetadata { + // BigQuery catalog property keys + public static final String PROJECT_ID = "project-id"; + private static final Map<String, PropertyEntry<?>> BIGQUERY_CATALOG_PROPERTY_ENTRIES = ImmutableMap.<String, PropertyEntry<?>>builder() .put( - "project-id", + PROJECT_ID, PropertyEntry.stringRequiredPropertyEntry( - "project-id", - "Google Cloud Project ID", - false /* immutable */, - false /* hidden */)) + PROJECT_ID, "Google Cloud Project ID", false /* immutable */, false /* hidden */)) .build(); @Override diff --git a/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/BigQueryClientPool.java b/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/BigQueryClientPool.java new file mode 100644 index 0000000000..05683bfb5e --- /dev/null +++ b/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/BigQueryClientPool.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.catalog.bigquery; + +import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport; +import com.google.api.client.http.HttpTransport; +import com.google.api.client.json.JsonFactory; +import com.google.api.client.json.gson.GsonFactory; +import com.google.api.services.bigquery.Bigquery; +import com.google.auth.http.HttpCredentialsAdapter; +import com.google.auth.oauth2.GoogleCredentials; +import com.google.auth.oauth2.ServiceAccountCredentials; +import java.io.FileInputStream; +import java.io.IOException; +import java.security.GeneralSecurityException; +import java.util.Map; +import org.apache.commons.lang3.StringUtils; +import org.apache.gravitino.catalog.jdbc.config.JdbcConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * BigQuery client pool for managing BigQuery API clients. + * + * <p>This class provides a centralized way to create and manage BigQuery API clients with proper + * authentication using service account credentials. It uses the Google API Client Library for Java + * (google-api-services-bigquery) which is already included in the Simba JDBC driver dependencies. 
+ */ +public class BigQueryClientPool { + + private static final Logger LOG = LoggerFactory.getLogger(BigQueryClientPool.class); + private static final JsonFactory JSON_FACTORY = GsonFactory.getDefaultInstance(); + private static final String APPLICATION_NAME = "Apache-Gravitino-BigQuery-Catalog"; + + private final String projectId; + private final String keyFilePath; + private volatile Bigquery bigQueryClient; + + /** + * Creates a new BigQuery client pool. + * + * @param config catalog configuration containing project-id and jdbc-password (key file path) + */ + public BigQueryClientPool(Map<String, String> config) { + this.projectId = config.get(BigQueryCatalogPropertiesMetadata.PROJECT_ID); + this.keyFilePath = config.get(JdbcConfig.PASSWORD.getKey()); // Key file path + + if (StringUtils.isBlank(projectId)) { + throw new IllegalArgumentException("project-id is required for BigQuery catalog"); + } + if (StringUtils.isBlank(keyFilePath)) { + throw new IllegalArgumentException( + "jdbc-password (key file path) is required for BigQuery catalog"); + } + + LOG.info("Initialized BigQuery client pool for project: {}", projectId); + } + + /** + * Gets or creates a BigQuery API client. + * + * <p>This method uses double-checked locking to ensure thread-safe lazy initialization of the + * BigQuery client. + * + * @return BigQuery API client + * @throws RuntimeException if client creation fails + */ + public Bigquery getClient() { + if (bigQueryClient == null) { + synchronized (this) { + if (bigQueryClient == null) { + bigQueryClient = createClient(); + } + } + } + return bigQueryClient; + } + + /** + * Creates a new BigQuery API client with service account authentication. 
+ * + * @return BigQuery API client + * @throws RuntimeException if client creation fails + */ + private Bigquery createClient() { + try { + LOG.debug("Creating BigQuery API client for project: {}", projectId); + + // Load service account credentials from key file + GoogleCredentials credentials; + try (FileInputStream serviceAccountStream = new FileInputStream(keyFilePath)) { + credentials = ServiceAccountCredentials.fromStream(serviceAccountStream); + } + + // Create HTTP transport + HttpTransport httpTransport = GoogleNetHttpTransport.newTrustedTransport(); + + // Build BigQuery client with credentials + Bigquery client = + new Bigquery.Builder(httpTransport, JSON_FACTORY, new HttpCredentialsAdapter(credentials)) + .setApplicationName(APPLICATION_NAME) + .build(); + + LOG.info("Successfully created BigQuery API client for project: {}", projectId); + return client; + + } catch (IOException e) { + String errorMsg = + String.format( + "Failed to create BigQuery API client for project '%s'. " + + "Please verify the service account key file path: %s", + projectId, keyFilePath); + LOG.error(errorMsg, e); + throw new RuntimeException(errorMsg, e); + } catch (GeneralSecurityException e) { + String errorMsg = + String.format("Security error creating BigQuery API client for project '%s'", projectId); + LOG.error(errorMsg, e); + throw new RuntimeException(errorMsg, e); + } catch (Exception e) { + String errorMsg = + String.format( + "Unexpected error creating BigQuery API client for project '%s'", projectId); + LOG.error(errorMsg, e); + throw new RuntimeException(errorMsg, e); + } + } + + /** + * Closes the BigQuery client and releases resources. + * + * <p>This method should be called when the catalog is being shut down. 
+ */ + public void close() { + if (bigQueryClient != null) { + try { + // BigQuery client doesn't have an explicit close method + // Resources are managed by the underlying HTTP transport + bigQueryClient = null; + LOG.info("Closed BigQuery API client for project: {}", projectId); + } catch (Exception e) { + LOG.warn("Error closing BigQuery API client", e); + } + } + } + + /** + * Gets the project ID. + * + * @return project ID + */ + public String getProjectId() { + return projectId; + } +} diff --git a/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/BigQuerySchemaPropertiesMetadata.java b/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/BigQuerySchemaPropertiesMetadata.java index 291c195d79..cb1b88006d 100644 --- a/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/BigQuerySchemaPropertiesMetadata.java +++ b/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/BigQuerySchemaPropertiesMetadata.java @@ -18,16 +18,245 @@ */ package org.apache.gravitino.catalog.bigquery; -import com.google.common.collect.ImmutableMap; +import static org.apache.gravitino.connector.PropertyEntry.stringImmutablePropertyEntry; +import static org.apache.gravitino.connector.PropertyEntry.stringOptionalPropertyEntry; + +import java.util.Collections; +import java.util.HashMap; import java.util.Map; import org.apache.gravitino.connector.BasePropertiesMetadata; import org.apache.gravitino.connector.PropertyEntry; -/** BigQuery schema (dataset) properties metadata. */ +/** + * BigQuery schema (dataset) properties metadata. + * + * <p>Defines BigQuery-specific dataset properties such as location, default table expiration, case + * sensitivity, and labels. Some properties are immutable after dataset creation (location, + * default_collation) while others can be modified (default_table_expiration_days, + * is_case_insensitive, description, labels). 
+ */ public class BigQuerySchemaPropertiesMetadata extends BasePropertiesMetadata { + // BigQuery Dataset specific property keys - Supported properties + public static final String LOCATION = "location"; + public static final String DEFAULT_TABLE_EXPIRATION_DAYS = "default_table_expiration_days"; + public static final String IS_CASE_INSENSITIVE = "is_case_insensitive"; + public static final String DESCRIPTION = "description"; + public static final String LABELS = "labels"; + public static final String DEFAULT_COLLATION = "default_collation"; + public static final String DEFAULT_PARTITION_EXPIRATION_DAYS = + "default_partition_expiration_days"; + public static final String STORAGE_BILLING_MODEL = "storage_billing_model"; + public static final String MAX_TIME_TRAVEL_HOURS = "max_time_travel_hours"; + public static final String DEFAULT_KMS_KEY_NAME = "default_kms_key_name"; + public static final String DEFAULT_ROUNDING_MODE = "default_rounding_mode"; + public static final String FRIENDLY_NAME = "friendly_name"; + public static final String FAILOVER_RESERVATION = "failover_reservation"; + public static final String IS_PRIMARY = "is_primary"; + public static final String PRIMARY_REPLICA = "primary_replica"; + public static final String TAGS = "tags"; + + private static final Map<String, PropertyEntry<?>> PROPERTIES_METADATA = + createPropertiesMetadata(); + + /** + * Creates the properties metadata map for BigQuery datasets. + * + * @return immutable map of property entries + */ + private static Map<String, PropertyEntry<?>> createPropertiesMetadata() { + Map<String, PropertyEntry<?>> map = new HashMap<>(); + + // Dataset location (immutable after creation) + map.put( + LOCATION, + stringImmutablePropertyEntry( + LOCATION, + "Dataset location (e.g., 'us', 'eu', 'asia-northeast1'). 
" + + "Cannot be changed after dataset creation.", + false, + null, + false, + false)); + + // Default table expiration days (mutable) + map.put( + DEFAULT_TABLE_EXPIRATION_DAYS, + stringOptionalPropertyEntry( + DEFAULT_TABLE_EXPIRATION_DAYS, + "Default expiration time for tables in days. " + + "Tables will be automatically deleted after this period.", + false, + null, + false)); + + // Default partition expiration days (mutable) + map.put( + DEFAULT_PARTITION_EXPIRATION_DAYS, + stringOptionalPropertyEntry( + DEFAULT_PARTITION_EXPIRATION_DAYS, + "Default expiration time for table partitions in days. " + + "Partitions will be automatically deleted after this period.", + false, + null, + false)); + + // Case insensitive setting (mutable) + map.put( + IS_CASE_INSENSITIVE, + stringOptionalPropertyEntry( + IS_CASE_INSENSITIVE, + "Whether the dataset is case insensitive for table and column names (true/false)", + false, + null, + false)); + + // Dataset description (mutable) + map.put( + DESCRIPTION, + stringOptionalPropertyEntry( + DESCRIPTION, "Dataset description for documentation purposes", false, null, false)); + + // Labels in JSON format (mutable) + map.put( + LABELS, + stringOptionalPropertyEntry( + LABELS, + "Dataset labels in JSON array format: [{\"key\":\"value\"}]. " + + "Used for organizing and filtering datasets.", + false, + null, + false)); + + // Default collation (immutable after creation) + map.put( + DEFAULT_COLLATION, + stringImmutablePropertyEntry( + DEFAULT_COLLATION, + "Default collation for STRING columns (e.g., 'und:ci' for case-insensitive). " + + "Cannot be changed after dataset creation.", + false, + null, + false, + false)); + + // Storage billing model (mutable, with restrictions) + map.put( + STORAGE_BILLING_MODEL, + stringOptionalPropertyEntry( + STORAGE_BILLING_MODEL, + "Storage billing model: 'LOGICAL' (default) or 'PHYSICAL'. 
" + + "Once changed, must wait 14 days before changing again.", + false, + null, + false)); + + // Max time travel hours (mutable) + map.put( + MAX_TIME_TRAVEL_HOURS, + stringOptionalPropertyEntry( + MAX_TIME_TRAVEL_HOURS, + "Time travel window in hours. Must be multiple of 24, between 48-168 hours. Default: 168", + false, + null, + false)); + + // Default KMS key name (mutable) + map.put( + DEFAULT_KMS_KEY_NAME, + stringOptionalPropertyEntry( + DEFAULT_KMS_KEY_NAME, + "Default Cloud KMS key for encrypting table data in this dataset. " + + "Format: projects/PROJECT/locations/LOCATION/keyRings/RING/cryptoKeys/KEY", + false, + null, + false)); + + // Default rounding mode (mutable) + map.put( + DEFAULT_ROUNDING_MODE, + stringOptionalPropertyEntry( + DEFAULT_ROUNDING_MODE, + "Default rounding mode for numeric operations: 'ROUND_HALF_EVEN' or 'ROUND_HALF_AWAY_FROM_ZERO'", + false, + null, + false)); + + // Friendly name (mutable) + map.put( + FRIENDLY_NAME, + stringOptionalPropertyEntry( + FRIENDLY_NAME, "A descriptive name for the dataset", false, null, false)); + + // Failover reservation (mutable) + map.put( + FAILOVER_RESERVATION, + stringOptionalPropertyEntry( + FAILOVER_RESERVATION, + "Associates the dataset to a reservation in case of failover scenario", + false, + null, + false)); + + // Is primary replica (mutable, for cross-region replication) + map.put( + IS_PRIMARY, + stringOptionalPropertyEntry( + IS_PRIMARY, + "Declares if the dataset is the primary replica (true/false). " + + "Only applicable for cross-region replicated datasets.", + false, + null, + false)); + + // Primary replica name (mutable, for cross-region replication) + map.put( + PRIMARY_REPLICA, + stringOptionalPropertyEntry( + PRIMARY_REPLICA, + "The replica name to set as the primary replica. 
" + + "Only applicable for cross-region replicated datasets.", + false, + null, + false)); + + // IAM tags (mutable) + map.put( + TAGS, + stringOptionalPropertyEntry( + TAGS, + "IAM tags in JSON array format: [{\"key\":\"value\"}]. " + + "Requires IAM tag functionality to be enabled.", + false, + null, + false)); + + return Collections.unmodifiableMap(map); + } + @Override protected Map<String, PropertyEntry<?>> specificPropertyEntries() { - return ImmutableMap.of(); + return PROPERTIES_METADATA; + } + + /** + * Checks if a property is supported by Gravitino BigQuery catalog. + * + * @param propertyName the property name to check + * @return true if supported, false otherwise + */ + public static boolean isSupportedProperty(String propertyName) { + return PROPERTIES_METADATA.containsKey(propertyName); + } + + /** + * Checks if a property is mutable (can be changed after dataset creation). + * + * @param propertyName the property name to check + * @return true if the property can be modified after creation + */ + public static boolean isMutableProperty(String propertyName) { + PropertyEntry<?> entry = PROPERTIES_METADATA.get(propertyName); + return entry != null && !entry.isImmutable(); } } diff --git a/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/operation/BigQueryDatabaseOperations.java b/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/operation/BigQueryDatabaseOperations.java index 96464acdcf..5df12adec7 100644 --- a/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/operation/BigQueryDatabaseOperations.java +++ b/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/operation/BigQueryDatabaseOperations.java @@ -18,40 +18,808 @@ */ package org.apache.gravitino.catalog.bigquery.operation; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import 
com.google.api.services.bigquery.Bigquery; +import com.google.api.services.bigquery.model.Dataset; +import com.google.api.services.bigquery.model.DatasetList; import com.google.common.collect.ImmutableSet; +import java.io.IOException; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.commons.lang3.NotImplementedException; +import org.apache.commons.lang3.StringUtils; +import org.apache.gravitino.StringIdentifier; +import org.apache.gravitino.catalog.bigquery.BigQueryClientPool; +import org.apache.gravitino.catalog.bigquery.BigQuerySchemaPropertiesMetadata; import org.apache.gravitino.catalog.jdbc.JdbcSchema; import org.apache.gravitino.catalog.jdbc.operation.JdbcDatabaseOperations; +import org.apache.gravitino.catalog.jdbc.utils.JdbcConnectorUtils; import org.apache.gravitino.exceptions.NoSuchSchemaException; +import org.apache.gravitino.exceptions.SchemaAlreadyExistsException; +import org.apache.gravitino.meta.AuditInfo; /** Database operations for BigQuery. In BigQuery, databases are called datasets. */ public class BigQueryDatabaseOperations extends JdbcDatabaseOperations { + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private BigQueryClientPool clientPool; + + /** + * Sets the BigQuery API client pool. + * + * @param clientPool BigQuery client pool + */ + public void setClientPool(BigQueryClientPool clientPool) { + this.clientPool = clientPool; + } + + /** + * Lists all datasets (databases) in the BigQuery project using BigQuery API. + * + * <p>This method uses the BigQuery API instead of INFORMATION_SCHEMA to avoid regional + * limitations. INFORMATION_SCHEMA is region-specific and cannot list datasets across all regions + * in a single query. 
+ * + * @return list of dataset names + */ @Override - public String generateCreateDatabaseSql( - String databaseName, String comment, Map<String, String> properties) { - throw new NotImplementedException("To be implemented in the future"); + public List<String> listDatabases() { + List<String> databaseNames = new ArrayList<>(); + + // Use BigQuery API - required for cross-region dataset listing + BigQueryOperationUtils.BigQueryContext context = + BigQueryOperationUtils.getBigQueryContext(clientPool, "list datasets"); + + try { + LOG.debug("Listing BigQuery datasets using API for project: {}", context.getProjectId()); + + // List all datasets in the project using BigQuery API + Bigquery.Datasets.List request = + context.getBigquery().datasets().list(context.getProjectId()); + request.setAll(false); // Only list visible datasets (not hidden or temporary) + + DatasetList response = request.execute(); + + if (response.getDatasets() != null) { + for (DatasetList.Datasets dataset : response.getDatasets()) { + String datasetId = dataset.getDatasetReference().getDatasetId(); + if (!isSystemDatabase(datasetId)) { + databaseNames.add(datasetId); + LOG.debug("Found BigQuery dataset: {}", datasetId); + } + } + } + + LOG.info( + "Listed {} BigQuery datasets using API for project: {}", + databaseNames.size(), + context.getProjectId()); + return databaseNames; + + } catch (IOException e) { + LOG.error("Failed to list BigQuery datasets using API", e); + throw new RuntimeException("Failed to list BigQuery datasets using API", e); + } } + /** + * Checks if a dataset exists using BigQuery API. + * + * <p>This method uses the BigQuery API instead of JDBC to avoid regional limitations. JDBC + * connections cannot access datasets in different regions. 
+ * + * @param databaseName the dataset name to check + * @return true if the dataset exists, false otherwise + */ @Override - public String generateDropDatabaseSql(String databaseName, boolean cascade) { - throw new NotImplementedException("To be implemented in the future"); + public boolean exist(String databaseName) { + // Use BigQuery API - required for cross-region dataset access + BigQueryOperationUtils.BigQueryContext context = + BigQueryOperationUtils.getBigQueryContext(clientPool, "check dataset existence"); + + try { + LOG.debug("Checking dataset existence using API: {}", databaseName); + + // Try to get the dataset using BigQuery API + context.getBigquery().datasets().get(context.getProjectId(), databaseName).execute(); + + LOG.debug("Dataset exists: {}", databaseName); + return true; + + } catch (IOException e) { + // 404 means dataset doesn't exist + if (e.getMessage() != null && e.getMessage().contains("404")) { + LOG.debug("Dataset does not exist: {}", databaseName); + return false; + } + // Other errors should be thrown + LOG.error("Failed to check dataset existence using API: {}", databaseName, e); + throw new RuntimeException( + "Failed to check dataset existence using BigQuery API: " + databaseName, e); + } } + /** + * Loads a dataset's metadata using BigQuery API. + * + * <p>This method uses the BigQuery API to retrieve dataset information including all available + * properties. All property values are returned as strings for consistency. 
  /**
   * Loads a dataset's metadata using the BigQuery API.
   *
   * <p>The BigQuery API (not JDBC) is used so that datasets in any region can be read and the
   * complete set of dataset options is available. All property values are returned as strings.
   *
   * @param databaseName the dataset name to load
   * @return JdbcSchema object containing dataset metadata
   * @throws NoSuchSchemaException if the dataset does not exist
   */
  @Override
  public JdbcSchema load(String databaseName) throws NoSuchSchemaException {
    // Use BigQuery API - required for cross-region dataset access and complete metadata
    BigQueryOperationUtils.BigQueryContext context =
        BigQueryOperationUtils.getBigQueryContext(clientPool, "load dataset");

    try {
      LOG.debug("Loading dataset using API: {}", databaseName);

      // Get dataset details using BigQuery API
      Dataset dataset =
          context.getBigquery().datasets().get(context.getProjectId(), databaseName).execute();

      // Extract all available properties from dataset
      Map<String, String> properties = extractDatasetProperties(dataset);

      LOG.info(
          "Loaded dataset using API: {} in location: {} with {} properties",
          databaseName,
          dataset.getLocation(),
          properties.size());

      // The dataset description doubles as the schema comment; audit info is not tracked by
      // BigQuery, so EMPTY is used.
      return JdbcSchema.builder()
          .withName(databaseName)
          .withComment(dataset.getDescription())
          .withProperties(properties)
          .withAuditInfo(AuditInfo.EMPTY)
          .build();

    } catch (IOException e) {
      // 404 means dataset doesn't exist.
      // NOTE(review): assumes "404" always appears in the exception message — confirm against
      // the client library's error formatting.
      if (e.getMessage() != null && e.getMessage().contains("404")) {
        LOG.warn("Dataset not found: {}", databaseName);
        throw new NoSuchSchemaException("Database %s could not be found", databaseName);
      }
      // Other errors should be thrown
      LOG.error("Failed to load dataset using API: {}", databaseName, e);
      throw new RuntimeException("Failed to load dataset using BigQuery API: " + databaseName, e);
    }
  }
+ * + * @param dataset the BigQuery dataset object + * @return map of property names to string values + */ + private Map<String, String> extractDatasetProperties(Dataset dataset) { + Map<String, String> properties = new HashMap<>(); + + // Location + if (dataset.getLocation() != null) { + properties.put(BigQuerySchemaPropertiesMetadata.LOCATION, dataset.getLocation()); + } + + // Description + if (dataset.getDescription() != null) { + properties.put(BigQuerySchemaPropertiesMetadata.DESCRIPTION, dataset.getDescription()); + } + + // Friendly name + if (dataset.getFriendlyName() != null) { + properties.put(BigQuerySchemaPropertiesMetadata.FRIENDLY_NAME, dataset.getFriendlyName()); + } + + // Default table expiration (convert from milliseconds to days) + if (dataset.getDefaultTableExpirationMs() != null) { + double expirationDays = dataset.getDefaultTableExpirationMs() / (1000.0 * 60 * 60 * 24); + properties.put( + BigQuerySchemaPropertiesMetadata.DEFAULT_TABLE_EXPIRATION_DAYS, + String.valueOf(expirationDays)); + } + + // Default partition expiration (convert from milliseconds to days) + if (dataset.getDefaultPartitionExpirationMs() != null) { + double expirationDays = dataset.getDefaultPartitionExpirationMs() / (1000.0 * 60 * 60 * 24); + properties.put( + BigQuerySchemaPropertiesMetadata.DEFAULT_PARTITION_EXPIRATION_DAYS, + String.valueOf(expirationDays)); + } + + // Storage billing model + if (dataset.getStorageBillingModel() != null) { + properties.put( + BigQuerySchemaPropertiesMetadata.STORAGE_BILLING_MODEL, dataset.getStorageBillingModel()); + } + + // Max time travel hours (convert from milliseconds to hours) + if (dataset.getMaxTimeTravelHours() != null) { + properties.put( + BigQuerySchemaPropertiesMetadata.MAX_TIME_TRAVEL_HOURS, + String.valueOf(dataset.getMaxTimeTravelHours())); + } + + // Default KMS key name + if (dataset.getDefaultEncryptionConfiguration() != null + && dataset.getDefaultEncryptionConfiguration().getKmsKeyName() != null) { + 
properties.put( + BigQuerySchemaPropertiesMetadata.DEFAULT_KMS_KEY_NAME, + dataset.getDefaultEncryptionConfiguration().getKmsKeyName()); + } + + // Default rounding mode + if (dataset.getDefaultRoundingMode() != null) { + properties.put( + BigQuerySchemaPropertiesMetadata.DEFAULT_ROUNDING_MODE, dataset.getDefaultRoundingMode()); + } + + // Case insensitive setting + if (dataset.getIsCaseInsensitive() != null) { + properties.put( + BigQuerySchemaPropertiesMetadata.IS_CASE_INSENSITIVE, + String.valueOf(dataset.getIsCaseInsensitive())); + } + + // Labels (convert to JSON-like string representation) + if (dataset.getLabels() != null && !dataset.getLabels().isEmpty()) { + StringBuilder labelsJson = new StringBuilder("["); + boolean first = true; + for (Map.Entry<String, String> label : dataset.getLabels().entrySet()) { + if (!first) { + labelsJson.append(", "); + } + first = false; + labelsJson + .append("{\"") + .append(BigQueryOperationUtils.escapeString(label.getKey())) + .append("\":\"") + .append(BigQueryOperationUtils.escapeString(label.getValue())) + .append("\"}"); + } + labelsJson.append("]"); + properties.put(BigQuerySchemaPropertiesMetadata.LABELS, labelsJson.toString()); + } + + // Cross-region replication properties (if applicable) + // Note: These properties may not be available in all BigQuery API versions + // They are typically managed through separate replication APIs + + // Default collation (if available) + // Note: This property may not be directly available through the Dataset API + // It's typically set during dataset creation and may require separate queries + + LOG.debug("Extracted {} properties from dataset: {}", properties.size(), properties.keySet()); + return properties; + } + + @Override + protected String generateDatabaseExistSql(String databaseName) { + // In BigQuery, we need to query INFORMATION_SCHEMA.SCHEMATA without setting schema context + // to avoid the "my_dataset.information_schema" issue + // Note: This method is not used in 
practice as we override exist() to use BigQuery API + // However, we escape wildcards to prevent SQL injection if this method is ever called + String escapedName = databaseName.replace("\\", "\\\\").replace("_", "\\_").replace("%", "\\%"); + return String.format( + "SELECT schema_name FROM INFORMATION_SCHEMA.SCHEMATA WHERE schema_name = '%s'", + escapedName); + } + + @Override + public String generateDropDatabaseSql(String databaseName, boolean cascade) { + // BigQuery uses DROP SCHEMA syntax, not DROP DATABASE + if (cascade) { + return String.format("DROP SCHEMA `%s` CASCADE", databaseName); + } else { + return String.format("DROP SCHEMA `%s` RESTRICT", databaseName); + } } @Override protected boolean supportSchemaComment() { - return true; + return true; // BigQuery supports dataset descriptions via OPTIONS } @Override protected Set<String> createSysDatabaseNameSet() { return ImmutableSet.of("information_schema", "INFORMATION_SCHEMA"); } + + /** + * Generates CREATE SCHEMA (Dataset) SQL for BigQuery. 
+ * + * <p>BigQuery syntax: CREATE SCHEMA [IF NOT EXISTS] dataset_name [DEFAULT COLLATE + * collate_specification] [OPTIONS(schema_option_list)] + * + * @param databaseName the dataset name + * @param comment the dataset description + * @param properties BigQuery-specific dataset properties + * @return the CREATE SCHEMA SQL statement + */ + @Override + public String generateCreateDatabaseSql( + String databaseName, String comment, Map<String, String> properties) { + + StringBuilder sqlBuilder = new StringBuilder(); + sqlBuilder.append("CREATE SCHEMA "); + + // Dataset name with backticks for safety + sqlBuilder.append("`").append(databaseName).append("`"); + + // Default collation (immutable, only at creation) + if (properties != null + && properties.containsKey(BigQuerySchemaPropertiesMetadata.DEFAULT_COLLATION)) { + String collation = properties.get(BigQuerySchemaPropertiesMetadata.DEFAULT_COLLATION); + if (StringUtils.isNotEmpty(collation)) { + sqlBuilder.append("\nDEFAULT COLLATE '").append(collation).append("'"); + } + } + + // OPTIONS clause + String optionsClause = generateSchemaOptionsClause(comment, properties); + if (!optionsClause.isEmpty()) { + sqlBuilder.append("\n").append(optionsClause); + } + + String result = sqlBuilder.append(";").toString(); + LOG.info("Generated CREATE SCHEMA SQL for dataset '{}': {}", databaseName, result); + return result; + } + + /** + * Generates the OPTIONS clause for CREATE SCHEMA with all supported BigQuery properties. 
+ * + * @param comment the dataset description + * @param properties BigQuery-specific dataset properties + * @return the OPTIONS clause, or empty string if no options + */ + private String generateSchemaOptionsClause(String comment, Map<String, String> properties) { + List<String> options = new ArrayList<>(); + + // Description from comment parameter + if (StringUtils.isNotEmpty(comment)) { + options.add( + String.format("description=\"%s\"", BigQueryOperationUtils.escapeString(comment))); + } + + if (properties != null) { + // Location (immutable, only at creation) + if (properties.containsKey(BigQuerySchemaPropertiesMetadata.LOCATION)) { + String location = properties.get(BigQuerySchemaPropertiesMetadata.LOCATION); + if (StringUtils.isNotEmpty(location)) { + options.add(String.format("location=\"%s\"", location)); + } + } + + // Default table expiration days + if (properties.containsKey(BigQuerySchemaPropertiesMetadata.DEFAULT_TABLE_EXPIRATION_DAYS)) { + String value = + properties.get(BigQuerySchemaPropertiesMetadata.DEFAULT_TABLE_EXPIRATION_DAYS); + if (StringUtils.isNotEmpty(value)) { + options.add(String.format("default_table_expiration_days=%s", value)); + } + } + + // Default partition expiration days + if (properties.containsKey( + BigQuerySchemaPropertiesMetadata.DEFAULT_PARTITION_EXPIRATION_DAYS)) { + String value = + properties.get(BigQuerySchemaPropertiesMetadata.DEFAULT_PARTITION_EXPIRATION_DAYS); + if (StringUtils.isNotEmpty(value)) { + options.add(String.format("default_partition_expiration_days=%s", value)); + } + } + + // Case insensitive setting + if (properties.containsKey(BigQuerySchemaPropertiesMetadata.IS_CASE_INSENSITIVE)) { + String value = properties.get(BigQuerySchemaPropertiesMetadata.IS_CASE_INSENSITIVE); + if (StringUtils.isNotEmpty(value)) { + options.add(String.format("is_case_insensitive=%s", value)); + } + } + + // Storage billing model + if (properties.containsKey(BigQuerySchemaPropertiesMetadata.STORAGE_BILLING_MODEL)) { + 
String value = properties.get(BigQuerySchemaPropertiesMetadata.STORAGE_BILLING_MODEL); + if (StringUtils.isNotEmpty(value)) { + options.add(String.format("storage_billing_model=\"%s\"", value)); + } + } + + // Max time travel hours + if (properties.containsKey(BigQuerySchemaPropertiesMetadata.MAX_TIME_TRAVEL_HOURS)) { + String value = properties.get(BigQuerySchemaPropertiesMetadata.MAX_TIME_TRAVEL_HOURS); + if (StringUtils.isNotEmpty(value)) { + options.add(String.format("max_time_travel_hours=%s", value)); + } + } + + // Default KMS key name + if (properties.containsKey(BigQuerySchemaPropertiesMetadata.DEFAULT_KMS_KEY_NAME)) { + String value = properties.get(BigQuerySchemaPropertiesMetadata.DEFAULT_KMS_KEY_NAME); + if (StringUtils.isNotEmpty(value)) { + options.add(String.format("default_kms_key_name=\"%s\"", value)); + } + } + + // Default rounding mode + if (properties.containsKey(BigQuerySchemaPropertiesMetadata.DEFAULT_ROUNDING_MODE)) { + String value = properties.get(BigQuerySchemaPropertiesMetadata.DEFAULT_ROUNDING_MODE); + if (StringUtils.isNotEmpty(value)) { + options.add(String.format("default_rounding_mode=\"%s\"", value)); + } + } + + // Friendly name + if (properties.containsKey(BigQuerySchemaPropertiesMetadata.FRIENDLY_NAME)) { + String value = properties.get(BigQuerySchemaPropertiesMetadata.FRIENDLY_NAME); + if (StringUtils.isNotEmpty(value)) { + options.add( + String.format("friendly_name=\"%s\"", BigQueryOperationUtils.escapeString(value))); + } + } + + // Failover reservation + if (properties.containsKey(BigQuerySchemaPropertiesMetadata.FAILOVER_RESERVATION)) { + String value = properties.get(BigQuerySchemaPropertiesMetadata.FAILOVER_RESERVATION); + if (StringUtils.isNotEmpty(value)) { + options.add(String.format("failover_reservation=\"%s\"", value)); + } + } + + // Is primary replica + if (properties.containsKey(BigQuerySchemaPropertiesMetadata.IS_PRIMARY)) { + String value = properties.get(BigQuerySchemaPropertiesMetadata.IS_PRIMARY); + if 
(StringUtils.isNotEmpty(value)) { + options.add(String.format("is_primary=%s", value)); + } + } + + // Primary replica name + if (properties.containsKey(BigQuerySchemaPropertiesMetadata.PRIMARY_REPLICA)) { + String value = properties.get(BigQuerySchemaPropertiesMetadata.PRIMARY_REPLICA); + if (StringUtils.isNotEmpty(value)) { + options.add(String.format("primary_replica=\"%s\"", value)); + } + } + + // Labels (expect JSON array format) + if (properties.containsKey(BigQuerySchemaPropertiesMetadata.LABELS)) { + String labels = properties.get(BigQuerySchemaPropertiesMetadata.LABELS); + if (StringUtils.isNotEmpty(labels)) { + String labelsClause = convertLabelsToSqlFormat(labels); + if (StringUtils.isNotEmpty(labelsClause)) { + options.add(String.format("labels=%s", labelsClause)); + } + } + } + + // IAM Tags (expect JSON array format) + if (properties.containsKey(BigQuerySchemaPropertiesMetadata.TAGS)) { + String tags = properties.get(BigQuerySchemaPropertiesMetadata.TAGS); + if (StringUtils.isNotEmpty(tags)) { + String tagsClause = convertTagsToSqlFormat(tags); + if (StringUtils.isNotEmpty(tagsClause)) { + options.add(String.format("tags=%s", tagsClause)); + } + } + } + + // Description from properties (overrides comment if both present) + if (properties.containsKey(BigQuerySchemaPropertiesMetadata.DESCRIPTION)) { + String description = properties.get(BigQuerySchemaPropertiesMetadata.DESCRIPTION); + if (StringUtils.isNotEmpty(description)) { + // Remove description from comment if it was already added + options.removeIf(option -> option.startsWith("description=")); + options.add( + String.format( + "description=\"%s\"", BigQueryOperationUtils.escapeString(description))); + } + } + } + + if (options.isEmpty()) { + return ""; + } + + return "OPTIONS(\n " + String.join(",\n ", options) + "\n)"; + } + + /** + * Converts JSON array format labels to BigQuery SQL format. 
+ * + * <p>Input: [{"key1":"value1"},{"key2":"value2"}] Output: [("key1","value1"),("key2","value2")] + * + * <p>Uses Jackson for robust JSON parsing with proper error handling. + * + * @param labelsJson JSON array string + * @return BigQuery SQL format labels string + */ + private String convertLabelsToSqlFormat(String labelsJson) { + if (StringUtils.isBlank(labelsJson)) { + return ""; + } + + try { + // Parse JSON array of label objects using Jackson + List<Map<String, String>> labelsList = + OBJECT_MAPPER.readValue(labelsJson, new TypeReference<>() {}); + + if (labelsList == null || labelsList.isEmpty()) { + return ""; + } + + List<String> labelPairs = new ArrayList<>(); + for (Map<String, String> labelMap : labelsList) { + // Each map should have exactly one key-value pair + for (Map.Entry<String, String> entry : labelMap.entrySet()) { + String key = entry.getKey(); + String value = entry.getValue(); + if (StringUtils.isNotBlank(key) && value != null) { + labelPairs.add(String.format("(\"%s\",\"%s\")", key, value)); + } + } + } + + if (labelPairs.isEmpty()) { + return ""; + } + + return "[" + String.join(",", labelPairs) + "]"; + + } catch (Exception e) { + LOG.warn( + "Failed to parse labels JSON using Jackson: {}. Error: {}. Using as-is.", + labelsJson, + e.getMessage()); + return labelsJson; // Return as-is if parsing fails + } + } + + /** + * Converts JSON array format tags to BigQuery SQL format. + * + * <p>Similar to labels conversion but for IAM tags. Tags use the same format as labels in + * BigQuery SQL. 
+ * + * @param tagsJson JSON array string + * @return BigQuery SQL format tags string + */ + private String convertTagsToSqlFormat(String tagsJson) { + // Tags use the same format as labels in BigQuery SQL + return convertLabelsToSqlFormat(tagsJson); + } + + @Override + public void create(String databaseName, String comment, Map<String, String> properties) + throws SchemaAlreadyExistsException { + // Validate properties before creation + validateProperties(properties, false); + + LOG.info("Beginning to create database {}", databaseName); + String originComment = StringIdentifier.removeIdFromComment(comment); + // Check if schema comment is supported by the database + // For BigQuery, supportSchemaComment() returns true, so this check will pass + // This is a standard pattern from JdbcDatabaseOperations to ensure compatibility + if (!supportSchemaComment() && StringUtils.isNotEmpty(originComment)) { + throw new UnsupportedOperationException( + "Doesn't support setting schema comment: " + originComment); + } + + try (final Connection connection = getConnection()) { + JdbcConnectorUtils.executeUpdate( + connection, generateCreateDatabaseSql(databaseName, comment, properties)); + LOG.info("Finished creating database {}", databaseName); + } catch (final SQLException se) { + throw this.exceptionMapper.toGravitinoException(se); + } + } + + /** + * Alters a dataset's properties using BigQuery ALTER SCHEMA statement. + * + * <p>Only mutable properties can be altered. Immutable properties (location, default_collation) + * will be ignored with a warning. 
+ * + * @param databaseName the dataset name to alter + * @param properties the properties to update (only mutable properties will be applied) + */ + @SuppressWarnings("unused") + public void alter(String databaseName, Map<String, String> properties) { + // Validate properties for ALTER operation + validateProperties(properties, true); + + String alterSql = generateAlterDatabaseSql(databaseName, properties); + if (StringUtils.isEmpty(alterSql)) { + LOG.info("No mutable properties to alter for dataset '{}'", databaseName); + return; + } + + LOG.info("Beginning to alter database {}", databaseName); + try (final Connection connection = getConnection()) { + JdbcConnectorUtils.executeUpdate(connection, alterSql); + LOG.info("Finished altering database {}", databaseName); + } catch (final SQLException se) { + throw this.exceptionMapper.toGravitinoException(se); + } + } + + /** + * Generates ALTER SCHEMA SET OPTIONS SQL for BigQuery. + * + * <p>BigQuery syntax: ALTER SCHEMA [IF EXISTS] dataset_name SET OPTIONS(schema_option_list) + * + * <p>Note: Only mutable properties can be altered. Immutable properties (location, + * default_collation) cannot be changed after dataset creation. 
+ * + * @param databaseName the dataset name + * @param properties the properties to update (only mutable properties) + * @return the ALTER SCHEMA SQL statement, or empty string if no mutable properties + */ + protected String generateAlterDatabaseSql(String databaseName, Map<String, String> properties) { + + if (properties == null || properties.isEmpty()) { + return ""; + } + + List<String> options = new ArrayList<>(); + + // Define mutable properties with their SQL format + // Format: property key -> SQL option format (with %s placeholder for value) + Map<String, PropertyFormatter> mutableProperties = new HashMap<>(); + + // Numeric properties (no quotes) + mutableProperties.put( + BigQuerySchemaPropertiesMetadata.DEFAULT_TABLE_EXPIRATION_DAYS, + new PropertyFormatter("default_table_expiration_days=%s", false)); + mutableProperties.put( + BigQuerySchemaPropertiesMetadata.DEFAULT_PARTITION_EXPIRATION_DAYS, + new PropertyFormatter("default_partition_expiration_days=%s", false)); + mutableProperties.put( + BigQuerySchemaPropertiesMetadata.IS_CASE_INSENSITIVE, + new PropertyFormatter("is_case_insensitive=%s", false)); + mutableProperties.put( + BigQuerySchemaPropertiesMetadata.MAX_TIME_TRAVEL_HOURS, + new PropertyFormatter("max_time_travel_hours=%s", false)); + mutableProperties.put( + BigQuerySchemaPropertiesMetadata.IS_PRIMARY, new PropertyFormatter("is_primary=%s", false)); + + // String properties (with quotes) + mutableProperties.put( + BigQuerySchemaPropertiesMetadata.STORAGE_BILLING_MODEL, + new PropertyFormatter("storage_billing_model=\"%s\"", false)); + mutableProperties.put( + BigQuerySchemaPropertiesMetadata.DEFAULT_KMS_KEY_NAME, + new PropertyFormatter("default_kms_key_name=\"%s\"", false)); + mutableProperties.put( + BigQuerySchemaPropertiesMetadata.DEFAULT_ROUNDING_MODE, + new PropertyFormatter("default_rounding_mode=\"%s\"", false)); + mutableProperties.put( + BigQuerySchemaPropertiesMetadata.FAILOVER_RESERVATION, + new 
PropertyFormatter("failover_reservation=\"%s\"", false)); + mutableProperties.put( + BigQuerySchemaPropertiesMetadata.PRIMARY_REPLICA, + new PropertyFormatter("primary_replica=\"%s\"", false)); + + // String properties that need escaping + mutableProperties.put( + BigQuerySchemaPropertiesMetadata.FRIENDLY_NAME, + new PropertyFormatter("friendly_name=\"%s\"", true)); + mutableProperties.put( + BigQuerySchemaPropertiesMetadata.DESCRIPTION, + new PropertyFormatter("description=\"%s\"", true)); + + // Process standard properties + for (Map.Entry<String, PropertyFormatter> entry : mutableProperties.entrySet()) { + String propertyKey = entry.getKey(); + PropertyFormatter formatter = entry.getValue(); + + if (properties.containsKey(propertyKey)) { + String value = properties.get(propertyKey); + if (StringUtils.isNotEmpty(value)) { + String formattedValue = + formatter.needsEscaping ? BigQueryOperationUtils.escapeString(value) : value; + options.add(String.format(formatter.format, formattedValue)); + } + } + } + + // Labels (mutable) + if (properties.containsKey(BigQuerySchemaPropertiesMetadata.LABELS)) { + String labels = properties.get(BigQuerySchemaPropertiesMetadata.LABELS); + if (StringUtils.isNotEmpty(labels)) { + String labelsClause = convertLabelsToSqlFormat(labels); + if (StringUtils.isNotEmpty(labelsClause)) { + options.add(String.format("labels=%s", labelsClause)); + } + } + } + + // IAM Tags (mutable) + if (properties.containsKey(BigQuerySchemaPropertiesMetadata.TAGS)) { + String tags = properties.get(BigQuerySchemaPropertiesMetadata.TAGS); + if (StringUtils.isNotEmpty(tags)) { + String tagsClause = convertTagsToSqlFormat(tags); + if (StringUtils.isNotEmpty(tagsClause)) { + options.add(String.format("tags=%s", tagsClause)); + } + } + } + + if (options.isEmpty()) { + LOG.debug("No mutable properties to alter for dataset '{}'", databaseName); + return ""; + } + + String result = + String.format( + "ALTER SCHEMA `%s`\nSET OPTIONS(\n %s\n);", + databaseName, 
String.join(",\n ", options)); + + LOG.info("Generated ALTER SCHEMA SQL for dataset '{}': {}", databaseName, result); + return result; + } + + /** + * Validates dataset properties and logs warnings for unsupported properties. + * + * @param properties the properties to validate + * @param isAlter true if this is for ALTER operation, false for CREATE + */ + private void validateProperties(Map<String, String> properties, boolean isAlter) { + if (properties == null || properties.isEmpty()) { + return; + } + + for (String propertyName : properties.keySet()) { + // Skip Gravitino internal properties + if (propertyName.equals(StringIdentifier.ID_KEY)) { + continue; + } + + // Check if property is supported + if (!BigQuerySchemaPropertiesMetadata.isSupportedProperty(propertyName)) { + LOG.warn( + "Unknown property '{}' for BigQuery dataset. This property will be ignored. " + + "Supported properties are: location, default_table_expiration_days, " + + "default_partition_expiration_days, storage_billing_model, max_time_travel_hours, " + + "default_kms_key_name, default_rounding_mode, is_case_insensitive, description, " + + "labels, default_collation, friendly_name, failover_reservation, is_primary, " + + "primary_replica, tags", + propertyName); + } else if (isAlter) { + // For ALTER operations, check if property is mutable + if (!BigQuerySchemaPropertiesMetadata.isMutableProperty(propertyName)) { + LOG.warn( + "Property '{}' is immutable and cannot be changed after dataset creation. " + + "This property will be ignored in ALTER operation.", + propertyName); + } + } + } + } + + /** + * Helper class to format property values for BigQuery SQL. + * + * <p>Encapsulates the SQL format string and whether the value needs escaping. 
+ */ + private static class PropertyFormatter { + final String format; + final boolean needsEscaping; + + PropertyFormatter(String format, boolean needsEscaping) { + this.format = format; + this.needsEscaping = needsEscaping; + } + } } diff --git a/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/operation/BigQueryOperationUtils.java b/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/operation/BigQueryOperationUtils.java new file mode 100644 index 0000000000..9d828dcbf4 --- /dev/null +++ b/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/operation/BigQueryOperationUtils.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.catalog.bigquery.operation; + +import com.google.api.services.bigquery.Bigquery; +import org.apache.gravitino.catalog.bigquery.BigQueryClientPool; + +/** + * Utility class for BigQuery operations. + * + * <p>Provides common helper methods for BigQuery API operations to reduce code duplication. + */ +public class BigQueryOperationUtils { + + /** + * Gets BigQuery API client and project ID from the client pool. 
+ * + * <p>This method validates that the client pool is available and returns the necessary components + * for BigQuery API operations. + * + * @param clientPool the BigQuery client pool + * @param operationName the name of the operation (for error messages) + * @return BigQueryContext containing the client and project ID + * @throws RuntimeException if client pool is not available + */ + public static BigQueryContext getBigQueryContext( + BigQueryClientPool clientPool, String operationName) { + if (clientPool == null) { + throw new RuntimeException( + String.format( + "BigQuery API client pool is not available. Cannot perform %s operation.", + operationName)); + } + + Bigquery bigquery = clientPool.getClient(); + String projectId = clientPool.getProjectId(); + + return new BigQueryContext(bigquery, projectId); + } + + /** + * Context object containing BigQuery API client and project ID. + * + * <p>This class encapsulates the BigQuery client and project ID to simplify method signatures. + */ + public static class BigQueryContext { + private final Bigquery bigquery; + private final String projectId; + + public BigQueryContext(Bigquery bigquery, String projectId) { + this.bigquery = bigquery; + this.projectId = projectId; + } + + public Bigquery getBigquery() { + return bigquery; + } + + public String getProjectId() { + return projectId; + } + } + + /** + * Escapes special characters in strings for BigQuery SQL. 
+ * + * @param str the string to escape + * @return the escaped string + */ + public static String escapeString(String str) { + if (str == null) { + return ""; + } + return str.replace("\\", "\\\\") + .replace("\"", "\\\"") + .replace("\n", "\\n") + .replace("\r", "\\r") + .replace("\t", "\\t"); + } + + private BigQueryOperationUtils() { + // Utility class, prevent instantiation + } +} diff --git a/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/operation/BigQueryTableOperations.java b/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/operation/BigQueryTableOperations.java index f8119c57b2..ab7b927087 100644 --- a/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/operation/BigQueryTableOperations.java +++ b/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/operation/BigQueryTableOperations.java @@ -23,6 +23,7 @@ import java.sql.ResultSet; import java.util.List; import java.util.Map; import org.apache.commons.lang3.NotImplementedException; +import org.apache.gravitino.catalog.bigquery.BigQueryClientPool; import org.apache.gravitino.catalog.jdbc.JdbcColumn; import org.apache.gravitino.catalog.jdbc.JdbcTable; import org.apache.gravitino.catalog.jdbc.operation.JdbcTableOperations; @@ -34,6 +35,26 @@ import org.apache.gravitino.rel.indexes.Index; /** Table operations for BigQuery. */ public class BigQueryTableOperations extends JdbcTableOperations { + private BigQueryClientPool clientPool; + + /** + * Sets the BigQuery API client pool. + * + * @param clientPool BigQuery client pool + */ + public void setClientPool(BigQueryClientPool clientPool) { + this.clientPool = clientPool; + } + + /** + * Gets the BigQuery API client pool. 
+ * + * @return BigQuery client pool, may be null if not set + */ + protected BigQueryClientPool getClientPool() { + return clientPool; + } + @Override protected String generateCreateTableSql( String tableName, diff --git a/catalogs/catalog-jdbc-bigquery/src/main/resources/jdbc-bigquery.conf b/catalogs/catalog-jdbc-bigquery/src/main/resources/jdbc-bigquery.conf index cf1befc96f..0557c12d62 100644 --- a/catalogs/catalog-jdbc-bigquery/src/main/resources/jdbc-bigquery.conf +++ b/catalogs/catalog-jdbc-bigquery/src/main/resources/jdbc-bigquery.conf @@ -19,11 +19,11 @@ # BigQuery JDBC connection configuration # Example configuration with Service Account authentication: -# 1. jdbc-driver = com.simba.googlebigquery.jdbc42.Driver -# 2. jdbc-url = jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443 +# jdbc-driver = com.simba.googlebigquery.jdbc42.Driver +# jdbc-url = jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443 # Google Cloud Project -# 3. project-id = <your-project-id> +# project-id = <your-project-id> # Note: jdbc-user and jdbc-password are not typically used with BigQuery -# 4. jdbc-user = <your-service-account>@<your-project-id>.iam.gserviceaccount.com -#/path/to/key.json -# 5. jdbc-password = <your-key-json-path> \ No newline at end of file +# jdbc-user = <your-service-account>@<your-project-id>.iam.gserviceaccount.com +# Add /path/to/key.json +# jdbc-password = <your-key-json-path> \ No newline at end of file diff --git a/catalogs/catalog-jdbc-bigquery/src/test/java/org/apache/gravitino/catalog/bigquery/TestBigQueryCatalog.java b/catalogs/catalog-jdbc-bigquery/src/test/java/org/apache/gravitino/catalog/bigquery/TestBigQueryCatalog.java new file mode 100644 index 0000000000..819a07fd94 --- /dev/null +++ b/catalogs/catalog-jdbc-bigquery/src/test/java/org/apache/gravitino/catalog/bigquery/TestBigQueryCatalog.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.catalog.bigquery; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.HashMap; +import java.util.Map; +import org.junit.jupiter.api.Test; + +/** Unit tests for BigQueryCatalog. 
*/ +public class TestBigQueryCatalog { + + @Test + void testShortName() { + BigQueryCatalog catalog = new BigQueryCatalog(); + assertEquals("jdbc-bigquery", catalog.shortName()); + } + + @Test + void testNewCapability() { + BigQueryCatalog catalog = new BigQueryCatalog(); + assertNotNull(catalog.newCapability()); + assertTrue(catalog.newCapability() instanceof BigQueryCatalogCapability); + } + + @Test + void testPropertiesMetadata() { + BigQueryCatalog catalog = new BigQueryCatalog(); + + assertNotNull(catalog.catalogPropertiesMetadata()); + assertTrue(catalog.catalogPropertiesMetadata() instanceof BigQueryCatalogPropertiesMetadata); + + assertNotNull(catalog.schemaPropertiesMetadata()); + assertTrue(catalog.schemaPropertiesMetadata() instanceof BigQuerySchemaPropertiesMetadata); + + assertNotNull(catalog.tablePropertiesMetadata()); + assertTrue(catalog.tablePropertiesMetadata() instanceof BigQueryTablePropertiesMetadata); + } + + @Test + void testWithCatalogConf() { + Map<String, String> config = new HashMap<>(); + config.put("project-id", "test-project"); + config.put("jdbc-password", "/path/to/key.json"); + config.put("jdbc-url", "jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;"); + + BigQueryCatalog catalog = new BigQueryCatalog(); + BigQueryCatalog configuredCatalog = catalog.withCatalogConf(config); + + assertNotNull(configuredCatalog); + assertNotNull(configuredCatalog.getClientPool()); + assertEquals("test-project", configuredCatalog.getClientPool().getProjectId()); + } + + @Test + void testBuildJdbcUrlWithExistingAuth() { + Map<String, String> config = new HashMap<>(); + config.put("project-id", "test-project"); + config.put("jdbc-password", "/path/to/key.json"); + config.put( + "jdbc-url", + "jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;ProjectId=test-project;OAuthType=0;"); + + BigQueryCatalog catalog = new BigQueryCatalog(); + BigQueryCatalog configuredCatalog = catalog.withCatalogConf(config); + + 
assertNotNull(configuredCatalog); + assertNotNull(configuredCatalog.getClientPool()); + } + + @Test + void testBuildJdbcUrlWithoutAuth() { + Map<String, String> config = new HashMap<>(); + config.put("project-id", "test-project"); + config.put("jdbc-password", "/path/to/key.json"); + config.put("jdbc-user", "[email protected]"); + + BigQueryCatalog catalog = new BigQueryCatalog(); + BigQueryCatalog configuredCatalog = catalog.withCatalogConf(config); + + assertNotNull(configuredCatalog); + assertNotNull(configuredCatalog.getClientPool()); + } +} diff --git a/catalogs/catalog-jdbc-bigquery/src/test/java/org/apache/gravitino/catalog/bigquery/TestBigQueryClientPool.java b/catalogs/catalog-jdbc-bigquery/src/test/java/org/apache/gravitino/catalog/bigquery/TestBigQueryClientPool.java new file mode 100644 index 0000000000..678c37f138 --- /dev/null +++ b/catalogs/catalog-jdbc-bigquery/src/test/java/org/apache/gravitino/catalog/bigquery/TestBigQueryClientPool.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.gravitino.catalog.bigquery; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.HashMap; +import java.util.Map; +import org.junit.jupiter.api.Test; + +/** Unit tests for BigQueryClientPool. */ +public class TestBigQueryClientPool { + + @Test + void testConstructorWithValidConfig() { + Map<String, String> config = new HashMap<>(); + config.put("project-id", "test-project"); + config.put("jdbc-password", "/path/to/key.json"); + + BigQueryClientPool pool = new BigQueryClientPool(config); + assertNotNull(pool); + assertEquals("test-project", pool.getProjectId()); + } + + @Test + void testConstructorWithMissingProjectId() { + Map<String, String> config = new HashMap<>(); + config.put("jdbc-password", "/path/to/key.json"); + + assertThrows(IllegalArgumentException.class, () -> new BigQueryClientPool(config)); + } + + @Test + void testConstructorWithMissingKeyFile() { + Map<String, String> config = new HashMap<>(); + config.put("project-id", "test-project"); + + assertThrows(IllegalArgumentException.class, () -> new BigQueryClientPool(config)); + } + + @Test + void testConstructorWithEmptyProjectId() { + Map<String, String> config = new HashMap<>(); + config.put("project-id", ""); + config.put("jdbc-password", "/path/to/key.json"); + + assertThrows(IllegalArgumentException.class, () -> new BigQueryClientPool(config)); + } + + @Test + void testConstructorWithEmptyKeyFile() { + Map<String, String> config = new HashMap<>(); + config.put("project-id", "test-project"); + config.put("jdbc-password", ""); + + assertThrows(IllegalArgumentException.class, () -> new BigQueryClientPool(config)); + } + + @Test + void testClose() { + Map<String, String> config = new HashMap<>(); + config.put("project-id", "test-project"); + config.put("jdbc-password", "/path/to/key.json"); + + BigQueryClientPool pool = new 
BigQueryClientPool(config); + // Should not throw exception + pool.close(); + } +} diff --git a/catalogs/catalog-jdbc-bigquery/src/test/java/org/apache/gravitino/catalog/bigquery/TestBigQuerySchemaPropertiesMetadata.java b/catalogs/catalog-jdbc-bigquery/src/test/java/org/apache/gravitino/catalog/bigquery/TestBigQuerySchemaPropertiesMetadata.java new file mode 100644 index 0000000000..e13e85986d --- /dev/null +++ b/catalogs/catalog-jdbc-bigquery/src/test/java/org/apache/gravitino/catalog/bigquery/TestBigQuerySchemaPropertiesMetadata.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.catalog.bigquery; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.Map; +import org.apache.gravitino.connector.PropertyEntry; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** Unit tests for BigQuerySchemaPropertiesMetadata. 
*/ +public class TestBigQuerySchemaPropertiesMetadata { + + private BigQuerySchemaPropertiesMetadata metadata; + + @BeforeEach + void setUp() { + metadata = new BigQuerySchemaPropertiesMetadata(); + } + + @Test + void testPropertyEntries() { + Map<String, PropertyEntry<?>> properties = metadata.propertyEntries(); + assertNotNull(properties); + + // Check that all expected properties are present + assertTrue(properties.containsKey(BigQuerySchemaPropertiesMetadata.LOCATION)); + assertTrue( + properties.containsKey(BigQuerySchemaPropertiesMetadata.DEFAULT_TABLE_EXPIRATION_DAYS)); + assertTrue( + properties.containsKey(BigQuerySchemaPropertiesMetadata.DEFAULT_PARTITION_EXPIRATION_DAYS)); + assertTrue(properties.containsKey(BigQuerySchemaPropertiesMetadata.IS_CASE_INSENSITIVE)); + assertTrue(properties.containsKey(BigQuerySchemaPropertiesMetadata.DESCRIPTION)); + assertTrue(properties.containsKey(BigQuerySchemaPropertiesMetadata.LABELS)); + assertTrue(properties.containsKey(BigQuerySchemaPropertiesMetadata.DEFAULT_COLLATION)); + assertTrue(properties.containsKey(BigQuerySchemaPropertiesMetadata.STORAGE_BILLING_MODEL)); + assertTrue(properties.containsKey(BigQuerySchemaPropertiesMetadata.MAX_TIME_TRAVEL_HOURS)); + assertTrue(properties.containsKey(BigQuerySchemaPropertiesMetadata.DEFAULT_KMS_KEY_NAME)); + assertTrue(properties.containsKey(BigQuerySchemaPropertiesMetadata.DEFAULT_ROUNDING_MODE)); + assertTrue(properties.containsKey(BigQuerySchemaPropertiesMetadata.FRIENDLY_NAME)); + assertTrue(properties.containsKey(BigQuerySchemaPropertiesMetadata.FAILOVER_RESERVATION)); + assertTrue(properties.containsKey(BigQuerySchemaPropertiesMetadata.IS_PRIMARY)); + assertTrue(properties.containsKey(BigQuerySchemaPropertiesMetadata.PRIMARY_REPLICA)); + assertTrue(properties.containsKey(BigQuerySchemaPropertiesMetadata.TAGS)); + } + + @Test + void testLocationProperty() { + Map<String, PropertyEntry<?>> properties = metadata.propertyEntries(); + PropertyEntry<?> locationProperty = 
properties.get(BigQuerySchemaPropertiesMetadata.LOCATION); + + assertNotNull(locationProperty); + assertTrue(locationProperty.isImmutable()); + assertFalse(locationProperty.isRequired()); + } + + @Test + void testDefaultCollationProperty() { + Map<String, PropertyEntry<?>> properties = metadata.propertyEntries(); + PropertyEntry<?> collationProperty = + properties.get(BigQuerySchemaPropertiesMetadata.DEFAULT_COLLATION); + + assertNotNull(collationProperty); + assertTrue(collationProperty.isImmutable()); + assertFalse(collationProperty.isRequired()); + } + + @Test + void testMutableProperties() { + Map<String, PropertyEntry<?>> properties = metadata.propertyEntries(); + + // These properties should be mutable + PropertyEntry<?> expirationProperty = + properties.get(BigQuerySchemaPropertiesMetadata.DEFAULT_TABLE_EXPIRATION_DAYS); + assertNotNull(expirationProperty); + assertFalse(expirationProperty.isImmutable()); + + PropertyEntry<?> partitionExpirationProperty = + properties.get(BigQuerySchemaPropertiesMetadata.DEFAULT_PARTITION_EXPIRATION_DAYS); + assertNotNull(partitionExpirationProperty); + assertFalse(partitionExpirationProperty.isImmutable()); + + PropertyEntry<?> caseInsensitiveProperty = + properties.get(BigQuerySchemaPropertiesMetadata.IS_CASE_INSENSITIVE); + assertNotNull(caseInsensitiveProperty); + assertFalse(caseInsensitiveProperty.isImmutable()); + + PropertyEntry<?> descriptionProperty = + properties.get(BigQuerySchemaPropertiesMetadata.DESCRIPTION); + assertNotNull(descriptionProperty); + assertFalse(descriptionProperty.isImmutable()); + + PropertyEntry<?> labelsProperty = properties.get(BigQuerySchemaPropertiesMetadata.LABELS); + assertNotNull(labelsProperty); + assertFalse(labelsProperty.isImmutable()); + + PropertyEntry<?> storageBillingProperty = + properties.get(BigQuerySchemaPropertiesMetadata.STORAGE_BILLING_MODEL); + assertNotNull(storageBillingProperty); + assertFalse(storageBillingProperty.isImmutable()); + + PropertyEntry<?> 
timeTravelProperty = + properties.get(BigQuerySchemaPropertiesMetadata.MAX_TIME_TRAVEL_HOURS); + assertNotNull(timeTravelProperty); + assertFalse(timeTravelProperty.isImmutable()); + + PropertyEntry<?> kmsKeyProperty = + properties.get(BigQuerySchemaPropertiesMetadata.DEFAULT_KMS_KEY_NAME); + assertNotNull(kmsKeyProperty); + assertFalse(kmsKeyProperty.isImmutable()); + + PropertyEntry<?> roundingModeProperty = + properties.get(BigQuerySchemaPropertiesMetadata.DEFAULT_ROUNDING_MODE); + assertNotNull(roundingModeProperty); + assertFalse(roundingModeProperty.isImmutable()); + + PropertyEntry<?> friendlyNameProperty = + properties.get(BigQuerySchemaPropertiesMetadata.FRIENDLY_NAME); + assertNotNull(friendlyNameProperty); + assertFalse(friendlyNameProperty.isImmutable()); + } +} diff --git a/catalogs/catalog-jdbc-bigquery/src/test/java/org/apache/gravitino/catalog/bigquery/operation/TestBigQueryDatabaseOperations.java b/catalogs/catalog-jdbc-bigquery/src/test/java/org/apache/gravitino/catalog/bigquery/operation/TestBigQueryDatabaseOperations.java new file mode 100644 index 0000000000..9b5e1963ed --- /dev/null +++ b/catalogs/catalog-jdbc-bigquery/src/test/java/org/apache/gravitino/catalog/bigquery/operation/TestBigQueryDatabaseOperations.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.catalog.bigquery.operation; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.HashMap; +import java.util.Map; +import org.apache.gravitino.catalog.bigquery.BigQuerySchemaPropertiesMetadata; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** Unit tests for BigQueryDatabaseOperations. */ +public class TestBigQueryDatabaseOperations { + + private BigQueryDatabaseOperations operations; + + @BeforeEach + void setUp() { + operations = new BigQueryDatabaseOperations(); + } + + @Test + void testGenerateCreateDatabaseSql() { + // Test basic CREATE SCHEMA + String sql = operations.generateCreateDatabaseSql("test_dataset", null, null); + assertEquals("CREATE SCHEMA `test_dataset`;", sql); + + // Test with comment only + sql = operations.generateCreateDatabaseSql("test_dataset", "Test description", null); + assertTrue(sql.contains("CREATE SCHEMA `test_dataset`")); + assertTrue(sql.contains("OPTIONS")); + assertTrue(sql.contains("description=\"Test description\"")); + + // Test with comprehensive properties + Map<String, String> properties = new HashMap<>(); + properties.put(BigQuerySchemaPropertiesMetadata.LOCATION, "us-central1"); + properties.put(BigQuerySchemaPropertiesMetadata.DEFAULT_TABLE_EXPIRATION_DAYS, "30"); + properties.put(BigQuerySchemaPropertiesMetadata.DEFAULT_PARTITION_EXPIRATION_DAYS, "365"); + properties.put(BigQuerySchemaPropertiesMetadata.STORAGE_BILLING_MODEL, "LOGICAL"); + properties.put(BigQuerySchemaPropertiesMetadata.MAX_TIME_TRAVEL_HOURS, "168"); + properties.put(BigQuerySchemaPropertiesMetadata.DEFAULT_ROUNDING_MODE, "ROUND_HALF_EVEN"); + properties.put(BigQuerySchemaPropertiesMetadata.IS_CASE_INSENSITIVE, "true"); + properties.put(BigQuerySchemaPropertiesMetadata.FRIENDLY_NAME, "Test 
Dataset"); + + sql = operations.generateCreateDatabaseSql("test_dataset", "Test description", properties); + assertTrue(sql.contains("CREATE SCHEMA `test_dataset`")); + assertTrue(sql.contains("OPTIONS")); + assertTrue(sql.contains("location=\"us-central1\"")); + assertTrue(sql.contains("default_table_expiration_days=30")); + assertTrue(sql.contains("default_partition_expiration_days=365")); + assertTrue(sql.contains("storage_billing_model=\"LOGICAL\"")); + assertTrue(sql.contains("max_time_travel_hours=168")); + assertTrue(sql.contains("default_rounding_mode=\"ROUND_HALF_EVEN\"")); + assertTrue(sql.contains("is_case_insensitive=true")); + assertTrue(sql.contains("friendly_name=\"Test Dataset\"")); + assertTrue(sql.contains("description=\"Test description\"")); + + // Test with default collation + properties.put(BigQuerySchemaPropertiesMetadata.DEFAULT_COLLATION, "und:ci"); + sql = operations.generateCreateDatabaseSql("test_dataset", "Test description", properties); + assertTrue(sql.contains("DEFAULT COLLATE 'und:ci'")); + } + + @Test + void testGenerateDropDatabaseSql() { + // Test DROP SCHEMA without cascade + String sql = operations.generateDropDatabaseSql("test_dataset", false); + assertEquals("DROP SCHEMA `test_dataset` RESTRICT", sql); + + // Test DROP SCHEMA with cascade + sql = operations.generateDropDatabaseSql("test_dataset", true); + assertEquals("DROP SCHEMA `test_dataset` CASCADE", sql); + } + + @Test + void testGenerateCreateDatabaseSqlWithLabels() { + // Test with labels in JSON format + Map<String, String> properties = new HashMap<>(); + properties.put(BigQuerySchemaPropertiesMetadata.LOCATION, "us-central1"); + properties.put( + BigQuerySchemaPropertiesMetadata.LABELS, + "[{\"environment\":\"test\"},{\"department\":\"data-engineering\"}]"); + + String sql = + operations.generateCreateDatabaseSql("test_dataset", "Test with labels", properties); + assertTrue(sql.contains("CREATE SCHEMA `test_dataset`")); + 
assertTrue(sql.contains("location=\"us-central1\"")); + assertTrue( + sql.contains("labels=[(\"environment\",\"test\"),(\"department\",\"data-engineering\")]")); + assertTrue(sql.contains("description=\"Test with labels\"")); + } + + @Test + void testGenerateCreateDatabaseSqlWithKmsKey() { + // Test with KMS key + Map<String, String> properties = new HashMap<>(); + properties.put( + BigQuerySchemaPropertiesMetadata.DEFAULT_KMS_KEY_NAME, + "projects/my-project/locations/us-central1/keyRings/my-ring/cryptoKeys/my-key"); + + String sql = operations.generateCreateDatabaseSql("test_dataset", null, properties); + assertTrue(sql.contains("CREATE SCHEMA `test_dataset`")); + assertTrue( + sql.contains( + "default_kms_key_name=\"projects/my-project/locations/us-central1/keyRings/my-ring/cryptoKeys/my-key\"")); + } + + @Test + void testSupportSchemaComment() { + assertTrue(operations.supportSchemaComment()); + } + + @Test + void testCreateSysDatabaseNameSet() { + assertTrue(operations.createSysDatabaseNameSet().contains("information_schema")); + assertTrue(operations.createSysDatabaseNameSet().contains("INFORMATION_SCHEMA")); + } +} diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 0ac9055b79..7456bb4c2d 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -134,6 +134,8 @@ ognl = "3.4.7" concurrent-trees = "2.6.0" jakarta-validation = "2.0.2" aspectj = "1.9.24" +simba-bigquery-jdbc = "1.6.3.1004" +commons-compress = "1.27.1" [libraries] aspectj-aspectjrt = { group = "org.aspectj", name = "aspectjrt", version.ref = "aspectj" } @@ -219,6 +221,7 @@ mockserver-client-java = { group = "org.mock-server", name = "mockserver-client- commons-csv = { group = "org.apache.commons", name = "commons-csv", version.ref = "commons-csv" } commons-lang = { group = "commons-lang", name = "commons-lang", version.ref = "commons-lang" } commons-lang3 = { group = "org.apache.commons", name = "commons-lang3", version.ref = "commons-lang3" } 
+commons-compress = { group = "org.apache.commons", name = "commons-compress", version.ref = "commons-compress" } commons-logging = { group = "commons-logging", name = "commons-logging", version.ref = "commons-logging" } commons-io = { group = "commons-io", name = "commons-io", version.ref = "commons-io" } caffeine = { group = "com.github.ben-manes.caffeine", name = "caffeine", version.ref = "caffeine" }
