This is an automated email from the ASF dual-hosted git repository.

yuqi4733 pushed a commit to branch internal-main
in repository https://gitbox.apache.org/repos/asf/gravitino.git

commit 8618ece17a9f8f55fa2191fcf19d4297a6a9e9c4
Author: geyanggang <[email protected]>
AuthorDate: Fri Feb 6 16:56:08 2026 +0800

    [#92]feat(bigquery-catalog): Add Schema operators for BigQuery catalog.
---
 catalogs/catalog-jdbc-bigquery/build.gradle.kts    | 109 ++-
 .../catalog/bigquery/BigQueryCatalog.java          | 125 +++-
 .../BigQueryCatalogPropertiesMetadata.java         |  17 +-
 .../catalog/bigquery/BigQueryClientPool.java       | 171 +++++
 .../bigquery/BigQuerySchemaPropertiesMetadata.java | 235 +++++-
 .../operation/BigQueryDatabaseOperations.java      | 784 ++++++++++++++++++++-
 .../bigquery/operation/BigQueryOperationUtils.java | 100 +++
 .../operation/BigQueryTableOperations.java         |  21 +
 .../src/main/resources/jdbc-bigquery.conf          |  12 +-
 .../catalog/bigquery/TestBigQueryCatalog.java      | 103 +++
 .../catalog/bigquery/TestBigQueryClientPool.java   |  87 +++
 .../TestBigQuerySchemaPropertiesMetadata.java      | 141 ++++
 .../operation/TestBigQueryDatabaseOperations.java  | 136 ++++
 gradle/libs.versions.toml                          |   3 +
 14 files changed, 2016 insertions(+), 28 deletions(-)

diff --git a/catalogs/catalog-jdbc-bigquery/build.gradle.kts 
b/catalogs/catalog-jdbc-bigquery/build.gradle.kts
index 7b04ae9330..efdaae56cd 100644
--- a/catalogs/catalog-jdbc-bigquery/build.gradle.kts
+++ b/catalogs/catalog-jdbc-bigquery/build.gradle.kts
@@ -22,12 +22,79 @@ plugins {
   `maven-publish`
   id("java")
   id("idea")
+  id("de.undercouch.download") version "5.4.0"
+}
+
+val simbaDriverVersion = libs.versions.simba.bigquery.jdbc.get()
+val simbaDriverUrl = 
"https://storage.googleapis.com/simba-bq-release/jdbc/SimbaJDBCDriverforGoogleBigQuery42_$simbaDriverVersion.zip"
+
+val simbaDownloadDir = layout.buildDirectory.dir("downloads").get()
+val simbaExtractDir = layout.buildDirectory.dir("extracted/simba-driver").get()
+
+val simbaZipFile = 
simbaDownloadDir.file("simba-driver-$simbaDriverVersion.zip").asFile
+
+val downloadSimbaDriver by 
tasks.registering(de.undercouch.gradle.tasks.download.Download::class) {
+  src(simbaDriverUrl)
+  dest(simbaZipFile)
+  overwrite(false)
+  onlyIfModified(true)
+
+  doFirst {
+    logger.lifecycle("Downloading Simba BigQuery JDBC driver: 
v$simbaDriverVersion...")
+  }
+
+  doLast {
+    val fileSize = simbaZipFile.length() / 1024 / 1024
+    logger.lifecycle("Download completed. File size: ${fileSize}MB")
+  }
+}
+
+val extractSimbaDriver by tasks.registering(Copy::class) {
+  dependsOn(downloadSimbaDriver)
+
+  from(zipTree(simbaZipFile))
+  into(simbaExtractDir)
+  include("**/*.jar")
+  exclude("**/src/", "**/doc/", "**/samples/", "**/legal/")
+
+  rename("GoogleBigQueryJDBC42.jar", "GoogleBigQueryJDBC42-simba.jar")
+
+  doFirst {
+    logger.lifecycle("Decompressing Simba JDBC driver...")
+  }
+
+  doLast {
+    val jarFiles = fileTree(simbaExtractDir).matching { include("*.jar") 
}.files
+    logger.lifecycle("Found ${jarFiles.size} JAR files:")
+    jarFiles.forEach { jar ->
+      logger.lifecycle("  - ${jar.name} (${jar.length() / 1024}KB)")
+    }
+  }
+}
+
+val cleanSimbaDriver by tasks.registering(Delete::class) {
+  delete(simbaDownloadDir, simbaExtractDir)
+}
+
+tasks.clean {
+  dependsOn(cleanSimbaDriver)
+}
+
+tasks.withType<JavaCompile> {
+  dependsOn(extractSimbaDriver)
+}
+
+tasks.test {
+  dependsOn(extractSimbaDriver)
 }
 
 dependencies {
   implementation(project(":api")) {
     exclude(group = "*")
   }
+  implementation(project(":catalogs:catalog-common")) {
+    exclude(group = "*")
+  }
   implementation(project(":catalogs:catalog-jdbc-common")) {
     exclude(group = "*")
   }
@@ -39,8 +106,10 @@ dependencies {
   }
 
   implementation(libs.bundles.log4j)
+  implementation(libs.commons.collections4)
   implementation(libs.commons.lang3)
   implementation(libs.guava)
+  implementation(libs.commons.compress)
 
   testImplementation(project(":catalogs:catalog-jdbc-common", "testArtifacts"))
   testImplementation(project(":clients:client-java"))
@@ -50,34 +119,64 @@ dependencies {
 
   testImplementation(libs.junit.jupiter.api)
   testImplementation(libs.junit.jupiter.params)
+  testImplementation(libs.testcontainers)
+  testImplementation(libs.mockito.core)
+
+  val simbaJdbcDriver = files(simbaExtractDir.asFileTree.matching { 
include("*.jar") })
+  implementation(simbaJdbcDriver)
 
   testRuntimeOnly(libs.junit.jupiter.engine)
 }
 
 tasks {
   val runtimeJars by registering(Copy::class) {
+    dependsOn("jar", extractSimbaDriver)
     from(configurations.runtimeClasspath)
     into("build/libs")
+    duplicatesStrategy = DuplicatesStrategy.EXCLUDE
+
+    doFirst {
+      logger.lifecycle("Copying runtime dependencies to build/libs")
+      logger.lifecycle("Including Simba JDBC driver files:")
+      fileTree(simbaExtractDir).matching { include("*.jar") }.forEach { jar ->
+        logger.lifecycle("  - ${jar.name}")
+      }
+    }
   }
 
   val copyCatalogLibs by registering(Copy::class) {
-    dependsOn("jar", "runtimeJars")
+    dependsOn("jar", runtimeJars)
     from("build/libs") {
       exclude("guava-*.jar")
       exclude("log4j-*.jar")
       exclude("slf4j-*.jar")
+      exclude("grpc-google-cloud-bigquerystorage-v1beta*.jar")
+      exclude("proto-google-cloud-bigquerystorage-v1beta*.jar")
+      exclude("error_prone_annotations-2.33.0.jar")
+      exclude("failureaccess-1.0.2.jar")
     }
     into("$rootDir/distribution/package/catalogs/jdbc-bigquery/libs")
+
+    doLast {
+      logger.lifecycle("The BigQuery JDBC Catalog library file has been copied 
to the distribution directory.")
+    }
   }
 
   val copyCatalogConfig by registering(Copy::class) {
     from("src/main/resources")
     into("$rootDir/distribution/package/catalogs/jdbc-bigquery/conf")
+
     include("jdbc-bigquery.conf")
+
     exclude { details ->
       details.file.isDirectory()
     }
+
     fileMode = 0b111101101
+
+    doLast {
+      logger.lifecycle("BigQuery JDBC configuration file copied.")
+    }
   }
 
   register("copyLibAndConfig", Copy::class) {
@@ -93,8 +192,12 @@ tasks.test {
   } else {
     dependsOn(tasks.jar)
   }
+  dependsOn(extractSimbaDriver)
+  environment("SIMBA_JDBC_DRIVER_PATH", simbaExtractDir.asFile.absolutePath)
 }
 
-tasks.getByName("generateMetadataFileForMavenJavaPublication") {
-  dependsOn("runtimeJars")
+afterEvaluate {
+  tasks.getByName("generateMetadataFileForMavenJavaPublication") {
+    dependsOn(tasks.getByName("runtimeJars"))
+  }
 }
diff --git 
a/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/BigQueryCatalog.java
 
b/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/BigQueryCatalog.java
index bb47ddc209..85dee82a15 100644
--- 
a/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/BigQueryCatalog.java
+++ 
b/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/BigQueryCatalog.java
@@ -18,12 +18,16 @@
  */
 package org.apache.gravitino.catalog.bigquery;
 
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
 import 
org.apache.gravitino.catalog.bigquery.converter.BigQueryColumnDefaultValueConverter;
 import 
org.apache.gravitino.catalog.bigquery.converter.BigQueryExceptionConverter;
 import org.apache.gravitino.catalog.bigquery.converter.BigQueryTypeConverter;
 import 
org.apache.gravitino.catalog.bigquery.operation.BigQueryDatabaseOperations;
 import org.apache.gravitino.catalog.bigquery.operation.BigQueryTableOperations;
 import org.apache.gravitino.catalog.jdbc.JdbcCatalog;
+import org.apache.gravitino.catalog.jdbc.config.JdbcConfig;
 import 
org.apache.gravitino.catalog.jdbc.converter.JdbcColumnDefaultValueConverter;
 import org.apache.gravitino.catalog.jdbc.converter.JdbcExceptionConverter;
 import org.apache.gravitino.catalog.jdbc.converter.JdbcTypeConverter;
@@ -31,18 +35,131 @@ import 
org.apache.gravitino.catalog.jdbc.operation.JdbcDatabaseOperations;
 import org.apache.gravitino.catalog.jdbc.operation.JdbcTableOperations;
 import org.apache.gravitino.connector.PropertiesMetadata;
 import org.apache.gravitino.connector.capability.Capability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** Implementation of a BigQuery catalog in Apache Gravitino. */
 public class BigQueryCatalog extends JdbcCatalog {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(BigQueryCatalog.class);
+
   public static final BigQueryTablePropertiesMetadata 
BIGQUERY_TABLE_PROPERTIES_META =
       new BigQueryTablePropertiesMetadata();
 
+  private BigQueryClientPool clientPool;
+
+  @Override
+  public BigQueryCatalog withCatalogConf(Map<String, String> conf) {
+    // Build JDBC URL from friendly properties before setting config
+    Map<String, String> processedConfig = buildJdbcUrl(conf);
+
+    // Initialize BigQuery API client pool for cross-region dataset listing
+    this.clientPool = new BigQueryClientPool(processedConfig);
+
+    // Call parent with processed config
+    return (BigQueryCatalog) super.withCatalogConf(processedConfig);
+  }
+
+  /**
+   * Gets the BigQuery API client pool.
+   *
+   * <p>This method provides access to the BigQuery API client pool for 
operations that need to
+   * interact with BigQuery APIs directly, such as cross-region dataset 
operations or advanced
+   * metadata retrieval that cannot be performed through JDBC.
+   *
+   * @return BigQuery client pool, may be null if not initialized
+   */
+  public BigQueryClientPool getClientPool() {
+    return clientPool;
+  }
+
+  @Override
+  public void close() throws java.io.IOException {
+    if (clientPool != null) {
+      try {
+        clientPool.close();
+      } catch (Exception e) {
+        LOG.warn("Error closing BigQuery client pool", e);
+      }
+    }
+    super.close();
+  }
+
   @Override
   public String shortName() {
     return "jdbc-bigquery";
   }
 
+  /**
+   * Build JDBC URL from individual BigQuery configuration components. If 
jdbc-url contains
+   * authentication parameters, use it directly. Otherwise, enhance the base 
URL with project-id,
+   * jdbc-user (service account email), and jdbc-password (key file path).
+   */
+  private Map<String, String> buildJdbcUrl(Map<String, String> config) {
+
+    String jdbcUrl = config.get(JdbcConfig.JDBC_URL.getKey());
+
+    // If jdbc-url already contains authentication parameters, use it as-is
+    if (StringUtils.isNotBlank(jdbcUrl)
+        && jdbcUrl.contains("ProjectId=")
+        && (jdbcUrl.contains("OAuthServiceAcctEmail=") || 
jdbcUrl.contains("OAuthType="))) {
+      LOG.info("Using existing JDBC URL with authentication parameters");
+      return config;
+    }
+
+    // Extract BigQuery-specific properties
+    String projectId = 
config.get(BigQueryCatalogPropertiesMetadata.PROJECT_ID);
+    String keyFilePath = config.get(JdbcConfig.PASSWORD.getKey()); // Key file 
path
+    String serviceAccountEmail = config.get(JdbcConfig.USERNAME.getKey()); // 
Service account email
+
+    // Validate required properties
+    if (StringUtils.isBlank(projectId)) {
+      throw new IllegalArgumentException("project-id is required for BigQuery 
catalog");
+    }
+    if (StringUtils.isBlank(keyFilePath)) {
+      throw new IllegalArgumentException(
+          "jdbc-password (key file path) is required for BigQuery catalog");
+    }
+
+    // Build complete JDBC URL with authentication parameters
+    // Use OAuthType=0 for service account authentication
+    StringBuilder urlBuilder = new StringBuilder();
+
+    // Start with base URL or construct default URL
+    if (StringUtils.isNotBlank(jdbcUrl)) {
+      // Ensure URL ends with semicolon for parameter separation
+      String baseUrl = jdbcUrl.endsWith(";") ? jdbcUrl : jdbcUrl + ";";
+      urlBuilder.append(baseUrl);
+    } else {
+      // Use default BigQuery JDBC URL
+      
urlBuilder.append("jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;");
+    }
+
+    urlBuilder.append("ProjectId=").append(projectId).append(";");
+    urlBuilder.append("OAuthType=0;");
+
+    // Add service account email if provided
+    if (StringUtils.isNotBlank(serviceAccountEmail)) {
+      
urlBuilder.append("OAuthServiceAcctEmail=").append(serviceAccountEmail).append(";");
+    }
+
+    urlBuilder.append("OAuthPvtKeyPath=").append(keyFilePath).append(";");
+
+    String completeJdbcUrl = urlBuilder.toString();
+
+    // Create new config map with complete jdbc-url
+    HashMap<String, String> newConfig = new HashMap<>(config);
+    newConfig.put(JdbcConfig.JDBC_URL.getKey(), completeJdbcUrl);
+
+    // Log the constructed URL (without sensitive info)
+    LOG.info("Built BigQuery JDBC URL for project: {}", projectId);
+    LOG.debug(
+        "Complete JDBC URL: {}",
+        completeJdbcUrl.replaceAll("OAuthPvtKeyPath=[^;]+", 
"OAuthPvtKeyPath=[REDACTED]"));
+
+    return newConfig;
+  }
+
   @Override
   public Capability newCapability() {
     return new BigQueryCatalogCapability();
@@ -75,12 +192,16 @@ public class BigQueryCatalog extends JdbcCatalog {
 
   @Override
   protected JdbcDatabaseOperations createJdbcDatabaseOperations() {
-    return new BigQueryDatabaseOperations();
+    BigQueryDatabaseOperations operations = new BigQueryDatabaseOperations();
+    operations.setClientPool(this.clientPool);
+    return operations;
   }
 
   @Override
   protected JdbcTableOperations createJdbcTableOperations() {
-    return new BigQueryTableOperations();
+    BigQueryTableOperations operations = new BigQueryTableOperations();
+    operations.setClientPool(this.clientPool);
+    return operations;
   }
 
   @Override
diff --git 
a/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/BigQueryCatalogPropertiesMetadata.java
 
b/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/BigQueryCatalogPropertiesMetadata.java
index c1df1b8b42..8d356159be 100644
--- 
a/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/BigQueryCatalogPropertiesMetadata.java
+++ 
b/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/BigQueryCatalogPropertiesMetadata.java
@@ -23,18 +23,23 @@ import java.util.Map;
 import org.apache.gravitino.catalog.jdbc.JdbcCatalogPropertiesMetadata;
 import org.apache.gravitino.connector.PropertyEntry;
 
-/** Properties metadata for BigQuery catalog. */
+/**
+ * Properties metadata for BigQuery catalog.
+ *
+ * <p>Defines BigQuery-specific catalog properties including project ID and 
authentication
+ * credentials.
+ */
 public class BigQueryCatalogPropertiesMetadata extends 
JdbcCatalogPropertiesMetadata {
 
+  // BigQuery catalog property keys
+  public static final String PROJECT_ID = "project-id";
+
   private static final Map<String, PropertyEntry<?>> 
BIGQUERY_CATALOG_PROPERTY_ENTRIES =
       ImmutableMap.<String, PropertyEntry<?>>builder()
           .put(
-              "project-id",
+              PROJECT_ID,
               PropertyEntry.stringRequiredPropertyEntry(
-                  "project-id",
-                  "Google Cloud Project ID",
-                  false /* immutable */,
-                  false /* hidden */))
+                  PROJECT_ID, "Google Cloud Project ID", false /* immutable 
*/, false /* hidden */))
           .build();
 
   @Override
diff --git 
a/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/BigQueryClientPool.java
 
b/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/BigQueryClientPool.java
new file mode 100644
index 0000000000..05683bfb5e
--- /dev/null
+++ 
b/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/BigQueryClientPool.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.bigquery;
+
+import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
+import com.google.api.client.http.HttpTransport;
+import com.google.api.client.json.JsonFactory;
+import com.google.api.client.json.gson.GsonFactory;
+import com.google.api.services.bigquery.Bigquery;
+import com.google.auth.http.HttpCredentialsAdapter;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.auth.oauth2.ServiceAccountCredentials;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.catalog.jdbc.config.JdbcConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * BigQuery client pool for managing BigQuery API clients.
+ *
+ * <p>This class provides a centralized way to create and manage BigQuery API 
clients with proper
+ * authentication using service account credentials. It uses the Google API 
Client Library for Java
+ * (google-api-services-bigquery) which is already included in the Simba JDBC 
driver dependencies.
+ */
+public class BigQueryClientPool {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(BigQueryClientPool.class);
+  private static final JsonFactory JSON_FACTORY = 
GsonFactory.getDefaultInstance();
+  private static final String APPLICATION_NAME = 
"Apache-Gravitino-BigQuery-Catalog";
+
+  private final String projectId;
+  private final String keyFilePath;
+  private volatile Bigquery bigQueryClient;
+
+  /**
+   * Creates a new BigQuery client pool.
+   *
+   * @param config catalog configuration containing project-id and 
jdbc-password (key file path)
+   */
+  public BigQueryClientPool(Map<String, String> config) {
+    this.projectId = config.get(BigQueryCatalogPropertiesMetadata.PROJECT_ID);
+    this.keyFilePath = config.get(JdbcConfig.PASSWORD.getKey()); // Key file 
path
+
+    if (StringUtils.isBlank(projectId)) {
+      throw new IllegalArgumentException("project-id is required for BigQuery 
catalog");
+    }
+    if (StringUtils.isBlank(keyFilePath)) {
+      throw new IllegalArgumentException(
+          "jdbc-password (key file path) is required for BigQuery catalog");
+    }
+
+    LOG.info("Initialized BigQuery client pool for project: {}", projectId);
+  }
+
+  /**
+   * Gets or creates a BigQuery API client.
+   *
+   * <p>This method uses double-checked locking to ensure thread-safe lazy 
initialization of the
+   * BigQuery client.
+   *
+   * @return BigQuery API client
+   * @throws RuntimeException if client creation fails
+   */
+  public Bigquery getClient() {
+    if (bigQueryClient == null) {
+      synchronized (this) {
+        if (bigQueryClient == null) {
+          bigQueryClient = createClient();
+        }
+      }
+    }
+    return bigQueryClient;
+  }
+
+  /**
+   * Creates a new BigQuery API client with service account authentication.
+   *
+   * @return BigQuery API client
+   * @throws RuntimeException if client creation fails
+   */
+  private Bigquery createClient() {
+    try {
+      LOG.debug("Creating BigQuery API client for project: {}", projectId);
+
+      // Load service account credentials from key file
+      GoogleCredentials credentials;
+      try (FileInputStream serviceAccountStream = new 
FileInputStream(keyFilePath)) {
+        credentials = 
ServiceAccountCredentials.fromStream(serviceAccountStream);
+      }
+
+      // Create HTTP transport
+      HttpTransport httpTransport = 
GoogleNetHttpTransport.newTrustedTransport();
+
+      // Build BigQuery client with credentials
+      Bigquery client =
+          new Bigquery.Builder(httpTransport, JSON_FACTORY, new 
HttpCredentialsAdapter(credentials))
+              .setApplicationName(APPLICATION_NAME)
+              .build();
+
+      LOG.info("Successfully created BigQuery API client for project: {}", 
projectId);
+      return client;
+
+    } catch (IOException e) {
+      String errorMsg =
+          String.format(
+              "Failed to create BigQuery API client for project '%s'. "
+                  + "Please verify the service account key file path: %s",
+              projectId, keyFilePath);
+      LOG.error(errorMsg, e);
+      throw new RuntimeException(errorMsg, e);
+    } catch (GeneralSecurityException e) {
+      String errorMsg =
+          String.format("Security error creating BigQuery API client for 
project '%s'", projectId);
+      LOG.error(errorMsg, e);
+      throw new RuntimeException(errorMsg, e);
+    } catch (Exception e) {
+      String errorMsg =
+          String.format(
+              "Unexpected error creating BigQuery API client for project 
'%s'", projectId);
+      LOG.error(errorMsg, e);
+      throw new RuntimeException(errorMsg, e);
+    }
+  }
+
+  /**
+   * Closes the BigQuery client and releases resources.
+   *
+   * <p>This method should be called when the catalog is being shut down.
+   */
+  public void close() {
+    if (bigQueryClient != null) {
+      try {
+        // BigQuery client doesn't have an explicit close method
+        // Resources are managed by the underlying HTTP transport
+        bigQueryClient = null;
+        LOG.info("Closed BigQuery API client for project: {}", projectId);
+      } catch (Exception e) {
+        LOG.warn("Error closing BigQuery API client", e);
+      }
+    }
+  }
+
+  /**
+   * Gets the project ID.
+   *
+   * @return project ID
+   */
+  public String getProjectId() {
+    return projectId;
+  }
+}
diff --git 
a/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/BigQuerySchemaPropertiesMetadata.java
 
b/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/BigQuerySchemaPropertiesMetadata.java
index 291c195d79..cb1b88006d 100644
--- 
a/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/BigQuerySchemaPropertiesMetadata.java
+++ 
b/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/BigQuerySchemaPropertiesMetadata.java
@@ -18,16 +18,245 @@
  */
 package org.apache.gravitino.catalog.bigquery;
 
-import com.google.common.collect.ImmutableMap;
+import static 
org.apache.gravitino.connector.PropertyEntry.stringImmutablePropertyEntry;
+import static 
org.apache.gravitino.connector.PropertyEntry.stringOptionalPropertyEntry;
+
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import org.apache.gravitino.connector.BasePropertiesMetadata;
 import org.apache.gravitino.connector.PropertyEntry;
 
-/** BigQuery schema (dataset) properties metadata. */
+/**
+ * BigQuery schema (dataset) properties metadata.
+ *
+ * <p>Defines BigQuery-specific dataset properties such as location, default 
table expiration, case
+ * sensitivity, and labels. Some properties are immutable after dataset 
creation (location,
+ * default_collation) while others can be modified 
(default_table_expiration_days,
+ * is_case_insensitive, description, labels).
+ */
 public class BigQuerySchemaPropertiesMetadata extends BasePropertiesMetadata {
 
+  // BigQuery Dataset specific property keys - Supported properties
+  public static final String LOCATION = "location";
+  public static final String DEFAULT_TABLE_EXPIRATION_DAYS = 
"default_table_expiration_days";
+  public static final String IS_CASE_INSENSITIVE = "is_case_insensitive";
+  public static final String DESCRIPTION = "description";
+  public static final String LABELS = "labels";
+  public static final String DEFAULT_COLLATION = "default_collation";
+  public static final String DEFAULT_PARTITION_EXPIRATION_DAYS =
+      "default_partition_expiration_days";
+  public static final String STORAGE_BILLING_MODEL = "storage_billing_model";
+  public static final String MAX_TIME_TRAVEL_HOURS = "max_time_travel_hours";
+  public static final String DEFAULT_KMS_KEY_NAME = "default_kms_key_name";
+  public static final String DEFAULT_ROUNDING_MODE = "default_rounding_mode";
+  public static final String FRIENDLY_NAME = "friendly_name";
+  public static final String FAILOVER_RESERVATION = "failover_reservation";
+  public static final String IS_PRIMARY = "is_primary";
+  public static final String PRIMARY_REPLICA = "primary_replica";
+  public static final String TAGS = "tags";
+
+  private static final Map<String, PropertyEntry<?>> PROPERTIES_METADATA =
+      createPropertiesMetadata();
+
+  /**
+   * Creates the properties metadata map for BigQuery datasets.
+   *
+   * @return immutable map of property entries
+   */
+  private static Map<String, PropertyEntry<?>> createPropertiesMetadata() {
+    Map<String, PropertyEntry<?>> map = new HashMap<>();
+
+    // Dataset location (immutable after creation)
+    map.put(
+        LOCATION,
+        stringImmutablePropertyEntry(
+            LOCATION,
+            "Dataset location (e.g., 'us', 'eu', 'asia-northeast1'). "
+                + "Cannot be changed after dataset creation.",
+            false,
+            null,
+            false,
+            false));
+
+    // Default table expiration days (mutable)
+    map.put(
+        DEFAULT_TABLE_EXPIRATION_DAYS,
+        stringOptionalPropertyEntry(
+            DEFAULT_TABLE_EXPIRATION_DAYS,
+            "Default expiration time for tables in days. "
+                + "Tables will be automatically deleted after this period.",
+            false,
+            null,
+            false));
+
+    // Default partition expiration days (mutable)
+    map.put(
+        DEFAULT_PARTITION_EXPIRATION_DAYS,
+        stringOptionalPropertyEntry(
+            DEFAULT_PARTITION_EXPIRATION_DAYS,
+            "Default expiration time for table partitions in days. "
+                + "Partitions will be automatically deleted after this 
period.",
+            false,
+            null,
+            false));
+
+    // Case insensitive setting (mutable)
+    map.put(
+        IS_CASE_INSENSITIVE,
+        stringOptionalPropertyEntry(
+            IS_CASE_INSENSITIVE,
+            "Whether the dataset is case insensitive for table and column 
names (true/false)",
+            false,
+            null,
+            false));
+
+    // Dataset description (mutable)
+    map.put(
+        DESCRIPTION,
+        stringOptionalPropertyEntry(
+            DESCRIPTION, "Dataset description for documentation purposes", 
false, null, false));
+
+    // Labels in JSON format (mutable)
+    map.put(
+        LABELS,
+        stringOptionalPropertyEntry(
+            LABELS,
+            "Dataset labels in JSON array format: [{\"key\":\"value\"}]. "
+                + "Used for organizing and filtering datasets.",
+            false,
+            null,
+            false));
+
+    // Default collation (immutable after creation)
+    map.put(
+        DEFAULT_COLLATION,
+        stringImmutablePropertyEntry(
+            DEFAULT_COLLATION,
+            "Default collation for STRING columns (e.g., 'und:ci' for 
case-insensitive). "
+                + "Cannot be changed after dataset creation.",
+            false,
+            null,
+            false,
+            false));
+
+    // Storage billing model (mutable, with restrictions)
+    map.put(
+        STORAGE_BILLING_MODEL,
+        stringOptionalPropertyEntry(
+            STORAGE_BILLING_MODEL,
+            "Storage billing model: 'LOGICAL' (default) or 'PHYSICAL'. "
+                + "Once changed, must wait 14 days before changing again.",
+            false,
+            null,
+            false));
+
+    // Max time travel hours (mutable)
+    map.put(
+        MAX_TIME_TRAVEL_HOURS,
+        stringOptionalPropertyEntry(
+            MAX_TIME_TRAVEL_HOURS,
+            "Time travel window in hours. Must be multiple of 24, between 
48-168 hours. Default: 168",
+            false,
+            null,
+            false));
+
+    // Default KMS key name (mutable)
+    map.put(
+        DEFAULT_KMS_KEY_NAME,
+        stringOptionalPropertyEntry(
+            DEFAULT_KMS_KEY_NAME,
+            "Default Cloud KMS key for encrypting table data in this dataset. "
+                + "Format: 
projects/PROJECT/locations/LOCATION/keyRings/RING/cryptoKeys/KEY",
+            false,
+            null,
+            false));
+
+    // Default rounding mode (mutable)
+    map.put(
+        DEFAULT_ROUNDING_MODE,
+        stringOptionalPropertyEntry(
+            DEFAULT_ROUNDING_MODE,
+            "Default rounding mode for numeric operations: 'ROUND_HALF_EVEN' 
or 'ROUND_HALF_AWAY_FROM_ZERO'",
+            false,
+            null,
+            false));
+
+    // Friendly name (mutable)
+    map.put(
+        FRIENDLY_NAME,
+        stringOptionalPropertyEntry(
+            FRIENDLY_NAME, "A descriptive name for the dataset", false, null, 
false));
+
+    // Failover reservation (mutable)
+    map.put(
+        FAILOVER_RESERVATION,
+        stringOptionalPropertyEntry(
+            FAILOVER_RESERVATION,
+            "Associates the dataset to a reservation in case of failover 
scenario",
+            false,
+            null,
+            false));
+
+    // Is primary replica (mutable, for cross-region replication)
+    map.put(
+        IS_PRIMARY,
+        stringOptionalPropertyEntry(
+            IS_PRIMARY,
+            "Declares if the dataset is the primary replica (true/false). "
+                + "Only applicable for cross-region replicated datasets.",
+            false,
+            null,
+            false));
+
+    // Primary replica name (mutable, for cross-region replication)
+    map.put(
+        PRIMARY_REPLICA,
+        stringOptionalPropertyEntry(
+            PRIMARY_REPLICA,
+            "The replica name to set as the primary replica. "
+                + "Only applicable for cross-region replicated datasets.",
+            false,
+            null,
+            false));
+
+    // IAM tags (mutable)
+    map.put(
+        TAGS,
+        stringOptionalPropertyEntry(
+            TAGS,
+            "IAM tags in JSON array format: [{\"key\":\"value\"}]. "
+                + "Requires IAM tag functionality to be enabled.",
+            false,
+            null,
+            false));
+
+    return Collections.unmodifiableMap(map);
+  }
+
   @Override
   protected Map<String, PropertyEntry<?>> specificPropertyEntries() {
-    return ImmutableMap.of();
+    return PROPERTIES_METADATA;
+  }
+
+  /**
+   * Checks if a property is supported by Gravitino BigQuery catalog.
+   *
+   * @param propertyName the property name to check
+   * @return true if supported, false otherwise
+   */
+  public static boolean isSupportedProperty(String propertyName) {
+    return PROPERTIES_METADATA.containsKey(propertyName);
+  }
+
+  /**
+   * Checks if a property is mutable (can be changed after dataset creation).
+   *
+   * @param propertyName the property name to check
+   * @return true if the property can be modified after creation
+   */
+  public static boolean isMutableProperty(String propertyName) {
+    PropertyEntry<?> entry = PROPERTIES_METADATA.get(propertyName);
+    return entry != null && !entry.isImmutable();
   }
 }
diff --git 
a/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/operation/BigQueryDatabaseOperations.java
 
b/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/operation/BigQueryDatabaseOperations.java
index 96464acdcf..5df12adec7 100644
--- 
a/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/operation/BigQueryDatabaseOperations.java
+++ 
b/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/operation/BigQueryDatabaseOperations.java
@@ -18,40 +18,808 @@
  */
 package org.apache.gravitino.catalog.bigquery.operation;
 
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.api.services.bigquery.Bigquery;
+import com.google.api.services.bigquery.model.Dataset;
+import com.google.api.services.bigquery.model.DatasetList;
 import com.google.common.collect.ImmutableSet;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import org.apache.commons.lang3.NotImplementedException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.StringIdentifier;
+import org.apache.gravitino.catalog.bigquery.BigQueryClientPool;
+import org.apache.gravitino.catalog.bigquery.BigQuerySchemaPropertiesMetadata;
 import org.apache.gravitino.catalog.jdbc.JdbcSchema;
 import org.apache.gravitino.catalog.jdbc.operation.JdbcDatabaseOperations;
+import org.apache.gravitino.catalog.jdbc.utils.JdbcConnectorUtils;
 import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
+import org.apache.gravitino.meta.AuditInfo;
 
 /** Database operations for BigQuery. In BigQuery, databases are called 
datasets. */
 public class BigQueryDatabaseOperations extends JdbcDatabaseOperations {
 
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+  private BigQueryClientPool clientPool;
+
  /**
   * Sets the BigQuery API client pool.
   *
   * <p>Must be set before any API-backed operation ({@code listDatabases}, {@code exist},
   * {@code load}); those operations throw a RuntimeException when the pool is absent.
   *
   * @param clientPool BigQuery client pool
   */
  public void setClientPool(BigQueryClientPool clientPool) {
    this.clientPool = clientPool;
  }
+
+  /**
+   * Lists all datasets (databases) in the BigQuery project using BigQuery API.
+   *
+   * <p>This method uses the BigQuery API instead of INFORMATION_SCHEMA to 
avoid regional
+   * limitations. INFORMATION_SCHEMA is region-specific and cannot list 
datasets across all regions
+   * in a single query.
+   *
+   * @return list of dataset names
+   */
   @Override
-  public String generateCreateDatabaseSql(
-      String databaseName, String comment, Map<String, String> properties) {
-    throw new NotImplementedException("To be implemented in the future");
+  public List<String> listDatabases() {
+    List<String> databaseNames = new ArrayList<>();
+
+    // Use BigQuery API - required for cross-region dataset listing
+    BigQueryOperationUtils.BigQueryContext context =
+        BigQueryOperationUtils.getBigQueryContext(clientPool, "list datasets");
+
+    try {
+      LOG.debug("Listing BigQuery datasets using API for project: {}", 
context.getProjectId());
+
+      // List all datasets in the project using BigQuery API
+      Bigquery.Datasets.List request =
+          context.getBigquery().datasets().list(context.getProjectId());
+      request.setAll(false); // Only list visible datasets (not hidden or 
temporary)
+
+      DatasetList response = request.execute();
+
+      if (response.getDatasets() != null) {
+        for (DatasetList.Datasets dataset : response.getDatasets()) {
+          String datasetId = dataset.getDatasetReference().getDatasetId();
+          if (!isSystemDatabase(datasetId)) {
+            databaseNames.add(datasetId);
+            LOG.debug("Found BigQuery dataset: {}", datasetId);
+          }
+        }
+      }
+
+      LOG.info(
+          "Listed {} BigQuery datasets using API for project: {}",
+          databaseNames.size(),
+          context.getProjectId());
+      return databaseNames;
+
+    } catch (IOException e) {
+      LOG.error("Failed to list BigQuery datasets using API", e);
+      throw new RuntimeException("Failed to list BigQuery datasets using API", 
e);
+    }
   }
 
+  /**
+   * Checks if a dataset exists using BigQuery API.
+   *
+   * <p>This method uses the BigQuery API instead of JDBC to avoid regional 
limitations. JDBC
+   * connections cannot access datasets in different regions.
+   *
+   * @param databaseName the dataset name to check
+   * @return true if the dataset exists, false otherwise
+   */
   @Override
-  public String generateDropDatabaseSql(String databaseName, boolean cascade) {
-    throw new NotImplementedException("To be implemented in the future");
+  public boolean exist(String databaseName) {
+    // Use BigQuery API - required for cross-region dataset access
+    BigQueryOperationUtils.BigQueryContext context =
+        BigQueryOperationUtils.getBigQueryContext(clientPool, "check dataset 
existence");
+
+    try {
+      LOG.debug("Checking dataset existence using API: {}", databaseName);
+
+      // Try to get the dataset using BigQuery API
+      context.getBigquery().datasets().get(context.getProjectId(), 
databaseName).execute();
+
+      LOG.debug("Dataset exists: {}", databaseName);
+      return true;
+
+    } catch (IOException e) {
+      // 404 means dataset doesn't exist
+      if (e.getMessage() != null && e.getMessage().contains("404")) {
+        LOG.debug("Dataset does not exist: {}", databaseName);
+        return false;
+      }
+      // Other errors should be thrown
+      LOG.error("Failed to check dataset existence using API: {}", 
databaseName, e);
+      throw new RuntimeException(
+          "Failed to check dataset existence using BigQuery API: " + 
databaseName, e);
+    }
   }
 
+  /**
+   * Loads a dataset's metadata using BigQuery API.
+   *
+   * <p>This method uses the BigQuery API to retrieve dataset information 
including all available
+   * properties. All property values are returned as strings for consistency.
+   *
+   * @param databaseName the dataset name to load
+   * @return JdbcSchema object containing dataset metadata
+   * @throws NoSuchSchemaException if the dataset does not exist
+   */
   @Override
   public JdbcSchema load(String databaseName) throws NoSuchSchemaException {
-    throw new NotImplementedException("To be implemented in the future");
+    // Use BigQuery API - required for cross-region dataset access and 
complete metadata
+    BigQueryOperationUtils.BigQueryContext context =
+        BigQueryOperationUtils.getBigQueryContext(clientPool, "load dataset");
+
+    try {
+      LOG.debug("Loading dataset using API: {}", databaseName);
+
+      // Get dataset details using BigQuery API
+      Dataset dataset =
+          context.getBigquery().datasets().get(context.getProjectId(), 
databaseName).execute();
+
+      // Extract all available properties from dataset
+      Map<String, String> properties = extractDatasetProperties(dataset);
+
+      LOG.info(
+          "Loaded dataset using API: {} in location: {} with {} properties",
+          databaseName,
+          dataset.getLocation(),
+          properties.size());
+
+      return JdbcSchema.builder()
+          .withName(databaseName)
+          .withComment(dataset.getDescription())
+          .withProperties(properties)
+          .withAuditInfo(AuditInfo.EMPTY)
+          .build();
+
+    } catch (IOException e) {
+      // 404 means dataset doesn't exist
+      if (e.getMessage() != null && e.getMessage().contains("404")) {
+        LOG.warn("Dataset not found: {}", databaseName);
+        throw new NoSuchSchemaException("Database %s could not be found", 
databaseName);
+      }
+      // Other errors should be thrown
+      LOG.error("Failed to load dataset using API: {}", databaseName, e);
+      throw new RuntimeException("Failed to load dataset using BigQuery API: " 
+ databaseName, e);
+    }
+  }
+
+  /**
+   * Extracts all available properties from a BigQuery dataset and converts 
them to string values.
+   *
+   * @param dataset the BigQuery dataset object
+   * @return map of property names to string values
+   */
+  private Map<String, String> extractDatasetProperties(Dataset dataset) {
+    Map<String, String> properties = new HashMap<>();
+
+    // Location
+    if (dataset.getLocation() != null) {
+      properties.put(BigQuerySchemaPropertiesMetadata.LOCATION, 
dataset.getLocation());
+    }
+
+    // Description
+    if (dataset.getDescription() != null) {
+      properties.put(BigQuerySchemaPropertiesMetadata.DESCRIPTION, 
dataset.getDescription());
+    }
+
+    // Friendly name
+    if (dataset.getFriendlyName() != null) {
+      properties.put(BigQuerySchemaPropertiesMetadata.FRIENDLY_NAME, 
dataset.getFriendlyName());
+    }
+
+    // Default table expiration (convert from milliseconds to days)
+    if (dataset.getDefaultTableExpirationMs() != null) {
+      double expirationDays = dataset.getDefaultTableExpirationMs() / (1000.0 
* 60 * 60 * 24);
+      properties.put(
+          BigQuerySchemaPropertiesMetadata.DEFAULT_TABLE_EXPIRATION_DAYS,
+          String.valueOf(expirationDays));
+    }
+
+    // Default partition expiration (convert from milliseconds to days)
+    if (dataset.getDefaultPartitionExpirationMs() != null) {
+      double expirationDays = dataset.getDefaultPartitionExpirationMs() / 
(1000.0 * 60 * 60 * 24);
+      properties.put(
+          BigQuerySchemaPropertiesMetadata.DEFAULT_PARTITION_EXPIRATION_DAYS,
+          String.valueOf(expirationDays));
+    }
+
+    // Storage billing model
+    if (dataset.getStorageBillingModel() != null) {
+      properties.put(
+          BigQuerySchemaPropertiesMetadata.STORAGE_BILLING_MODEL, 
dataset.getStorageBillingModel());
+    }
+
+    // Max time travel hours (convert from milliseconds to hours)
+    if (dataset.getMaxTimeTravelHours() != null) {
+      properties.put(
+          BigQuerySchemaPropertiesMetadata.MAX_TIME_TRAVEL_HOURS,
+          String.valueOf(dataset.getMaxTimeTravelHours()));
+    }
+
+    // Default KMS key name
+    if (dataset.getDefaultEncryptionConfiguration() != null
+        && dataset.getDefaultEncryptionConfiguration().getKmsKeyName() != 
null) {
+      properties.put(
+          BigQuerySchemaPropertiesMetadata.DEFAULT_KMS_KEY_NAME,
+          dataset.getDefaultEncryptionConfiguration().getKmsKeyName());
+    }
+
+    // Default rounding mode
+    if (dataset.getDefaultRoundingMode() != null) {
+      properties.put(
+          BigQuerySchemaPropertiesMetadata.DEFAULT_ROUNDING_MODE, 
dataset.getDefaultRoundingMode());
+    }
+
+    // Case insensitive setting
+    if (dataset.getIsCaseInsensitive() != null) {
+      properties.put(
+          BigQuerySchemaPropertiesMetadata.IS_CASE_INSENSITIVE,
+          String.valueOf(dataset.getIsCaseInsensitive()));
+    }
+
+    // Labels (convert to JSON-like string representation)
+    if (dataset.getLabels() != null && !dataset.getLabels().isEmpty()) {
+      StringBuilder labelsJson = new StringBuilder("[");
+      boolean first = true;
+      for (Map.Entry<String, String> label : dataset.getLabels().entrySet()) {
+        if (!first) {
+          labelsJson.append(", ");
+        }
+        first = false;
+        labelsJson
+            .append("{\"")
+            .append(BigQueryOperationUtils.escapeString(label.getKey()))
+            .append("\":\"")
+            .append(BigQueryOperationUtils.escapeString(label.getValue()))
+            .append("\"}");
+      }
+      labelsJson.append("]");
+      properties.put(BigQuerySchemaPropertiesMetadata.LABELS, 
labelsJson.toString());
+    }
+
+    // Cross-region replication properties (if applicable)
+    // Note: These properties may not be available in all BigQuery API versions
+    // They are typically managed through separate replication APIs
+
+    // Default collation (if available)
+    // Note: This property may not be directly available through the Dataset 
API
+    // It's typically set during dataset creation and may require separate 
queries
+
+    LOG.debug("Extracted {} properties from dataset: {}", properties.size(), 
properties.keySet());
+    return properties;
+  }
+
+  @Override
+  protected String generateDatabaseExistSql(String databaseName) {
+    // In BigQuery, we need to query INFORMATION_SCHEMA.SCHEMATA without 
setting schema context
+    // to avoid the "my_dataset.information_schema" issue
+    // Note: This method is not used in practice as we override exist() to use 
BigQuery API
+    // However, we escape wildcards to prevent SQL injection if this method is 
ever called
+    String escapedName = databaseName.replace("\\", "\\\\").replace("_", 
"\\_").replace("%", "\\%");
+    return String.format(
+        "SELECT schema_name FROM INFORMATION_SCHEMA.SCHEMATA WHERE schema_name 
= '%s'",
+        escapedName);
+  }
+
+  @Override
+  public String generateDropDatabaseSql(String databaseName, boolean cascade) {
+    // BigQuery uses DROP SCHEMA syntax, not DROP DATABASE
+    if (cascade) {
+      return String.format("DROP SCHEMA `%s` CASCADE", databaseName);
+    } else {
+      return String.format("DROP SCHEMA `%s` RESTRICT", databaseName);
+    }
   }
 
  /**
   * Whether schema (dataset) comments are supported.
   *
   * @return always true — BigQuery stores dataset descriptions via the
   *     {@code OPTIONS(description=...)} clause
   */
  @Override
  protected boolean supportSchemaComment() {
    return true; // BigQuery supports dataset descriptions via OPTIONS
  }
 
  /**
   * Dataset names treated as system databases and hidden from listings.
   *
   * <p>Both casings are included because the set is consulted with the raw dataset id.
   *
   * @return set of system dataset names
   */
  @Override
  protected Set<String> createSysDatabaseNameSet() {
    return ImmutableSet.of("information_schema", "INFORMATION_SCHEMA");
  }
+
+  /**
+   * Generates CREATE SCHEMA (Dataset) SQL for BigQuery.
+   *
+   * <p>BigQuery syntax: CREATE SCHEMA [IF NOT EXISTS] dataset_name [DEFAULT 
COLLATE
+   * collate_specification] [OPTIONS(schema_option_list)]
+   *
+   * @param databaseName the dataset name
+   * @param comment the dataset description
+   * @param properties BigQuery-specific dataset properties
+   * @return the CREATE SCHEMA SQL statement
+   */
+  @Override
+  public String generateCreateDatabaseSql(
+      String databaseName, String comment, Map<String, String> properties) {
+
+    StringBuilder sqlBuilder = new StringBuilder();
+    sqlBuilder.append("CREATE SCHEMA ");
+
+    // Dataset name with backticks for safety
+    sqlBuilder.append("`").append(databaseName).append("`");
+
+    // Default collation (immutable, only at creation)
+    if (properties != null
+        && 
properties.containsKey(BigQuerySchemaPropertiesMetadata.DEFAULT_COLLATION)) {
+      String collation = 
properties.get(BigQuerySchemaPropertiesMetadata.DEFAULT_COLLATION);
+      if (StringUtils.isNotEmpty(collation)) {
+        sqlBuilder.append("\nDEFAULT COLLATE '").append(collation).append("'");
+      }
+    }
+
+    // OPTIONS clause
+    String optionsClause = generateSchemaOptionsClause(comment, properties);
+    if (!optionsClause.isEmpty()) {
+      sqlBuilder.append("\n").append(optionsClause);
+    }
+
+    String result = sqlBuilder.append(";").toString();
+    LOG.info("Generated CREATE SCHEMA SQL for dataset '{}': {}", databaseName, 
result);
+    return result;
+  }
+
+  /**
+   * Generates the OPTIONS clause for CREATE SCHEMA with all supported 
BigQuery properties.
+   *
+   * @param comment the dataset description
+   * @param properties BigQuery-specific dataset properties
+   * @return the OPTIONS clause, or empty string if no options
+   */
+  private String generateSchemaOptionsClause(String comment, Map<String, 
String> properties) {
+    List<String> options = new ArrayList<>();
+
+    // Description from comment parameter
+    if (StringUtils.isNotEmpty(comment)) {
+      options.add(
+          String.format("description=\"%s\"", 
BigQueryOperationUtils.escapeString(comment)));
+    }
+
+    if (properties != null) {
+      // Location (immutable, only at creation)
+      if (properties.containsKey(BigQuerySchemaPropertiesMetadata.LOCATION)) {
+        String location = 
properties.get(BigQuerySchemaPropertiesMetadata.LOCATION);
+        if (StringUtils.isNotEmpty(location)) {
+          options.add(String.format("location=\"%s\"", location));
+        }
+      }
+
+      // Default table expiration days
+      if 
(properties.containsKey(BigQuerySchemaPropertiesMetadata.DEFAULT_TABLE_EXPIRATION_DAYS))
 {
+        String value =
+            
properties.get(BigQuerySchemaPropertiesMetadata.DEFAULT_TABLE_EXPIRATION_DAYS);
+        if (StringUtils.isNotEmpty(value)) {
+          options.add(String.format("default_table_expiration_days=%s", 
value));
+        }
+      }
+
+      // Default partition expiration days
+      if (properties.containsKey(
+          BigQuerySchemaPropertiesMetadata.DEFAULT_PARTITION_EXPIRATION_DAYS)) 
{
+        String value =
+            
properties.get(BigQuerySchemaPropertiesMetadata.DEFAULT_PARTITION_EXPIRATION_DAYS);
+        if (StringUtils.isNotEmpty(value)) {
+          options.add(String.format("default_partition_expiration_days=%s", 
value));
+        }
+      }
+
+      // Case insensitive setting
+      if 
(properties.containsKey(BigQuerySchemaPropertiesMetadata.IS_CASE_INSENSITIVE)) {
+        String value = 
properties.get(BigQuerySchemaPropertiesMetadata.IS_CASE_INSENSITIVE);
+        if (StringUtils.isNotEmpty(value)) {
+          options.add(String.format("is_case_insensitive=%s", value));
+        }
+      }
+
+      // Storage billing model
+      if 
(properties.containsKey(BigQuerySchemaPropertiesMetadata.STORAGE_BILLING_MODEL))
 {
+        String value = 
properties.get(BigQuerySchemaPropertiesMetadata.STORAGE_BILLING_MODEL);
+        if (StringUtils.isNotEmpty(value)) {
+          options.add(String.format("storage_billing_model=\"%s\"", value));
+        }
+      }
+
+      // Max time travel hours
+      if 
(properties.containsKey(BigQuerySchemaPropertiesMetadata.MAX_TIME_TRAVEL_HOURS))
 {
+        String value = 
properties.get(BigQuerySchemaPropertiesMetadata.MAX_TIME_TRAVEL_HOURS);
+        if (StringUtils.isNotEmpty(value)) {
+          options.add(String.format("max_time_travel_hours=%s", value));
+        }
+      }
+
+      // Default KMS key name
+      if 
(properties.containsKey(BigQuerySchemaPropertiesMetadata.DEFAULT_KMS_KEY_NAME)) 
{
+        String value = 
properties.get(BigQuerySchemaPropertiesMetadata.DEFAULT_KMS_KEY_NAME);
+        if (StringUtils.isNotEmpty(value)) {
+          options.add(String.format("default_kms_key_name=\"%s\"", value));
+        }
+      }
+
+      // Default rounding mode
+      if 
(properties.containsKey(BigQuerySchemaPropertiesMetadata.DEFAULT_ROUNDING_MODE))
 {
+        String value = 
properties.get(BigQuerySchemaPropertiesMetadata.DEFAULT_ROUNDING_MODE);
+        if (StringUtils.isNotEmpty(value)) {
+          options.add(String.format("default_rounding_mode=\"%s\"", value));
+        }
+      }
+
+      // Friendly name
+      if 
(properties.containsKey(BigQuerySchemaPropertiesMetadata.FRIENDLY_NAME)) {
+        String value = 
properties.get(BigQuerySchemaPropertiesMetadata.FRIENDLY_NAME);
+        if (StringUtils.isNotEmpty(value)) {
+          options.add(
+              String.format("friendly_name=\"%s\"", 
BigQueryOperationUtils.escapeString(value)));
+        }
+      }
+
+      // Failover reservation
+      if 
(properties.containsKey(BigQuerySchemaPropertiesMetadata.FAILOVER_RESERVATION)) 
{
+        String value = 
properties.get(BigQuerySchemaPropertiesMetadata.FAILOVER_RESERVATION);
+        if (StringUtils.isNotEmpty(value)) {
+          options.add(String.format("failover_reservation=\"%s\"", value));
+        }
+      }
+
+      // Is primary replica
+      if (properties.containsKey(BigQuerySchemaPropertiesMetadata.IS_PRIMARY)) 
{
+        String value = 
properties.get(BigQuerySchemaPropertiesMetadata.IS_PRIMARY);
+        if (StringUtils.isNotEmpty(value)) {
+          options.add(String.format("is_primary=%s", value));
+        }
+      }
+
+      // Primary replica name
+      if 
(properties.containsKey(BigQuerySchemaPropertiesMetadata.PRIMARY_REPLICA)) {
+        String value = 
properties.get(BigQuerySchemaPropertiesMetadata.PRIMARY_REPLICA);
+        if (StringUtils.isNotEmpty(value)) {
+          options.add(String.format("primary_replica=\"%s\"", value));
+        }
+      }
+
+      // Labels (expect JSON array format)
+      if (properties.containsKey(BigQuerySchemaPropertiesMetadata.LABELS)) {
+        String labels = 
properties.get(BigQuerySchemaPropertiesMetadata.LABELS);
+        if (StringUtils.isNotEmpty(labels)) {
+          String labelsClause = convertLabelsToSqlFormat(labels);
+          if (StringUtils.isNotEmpty(labelsClause)) {
+            options.add(String.format("labels=%s", labelsClause));
+          }
+        }
+      }
+
+      // IAM Tags (expect JSON array format)
+      if (properties.containsKey(BigQuerySchemaPropertiesMetadata.TAGS)) {
+        String tags = properties.get(BigQuerySchemaPropertiesMetadata.TAGS);
+        if (StringUtils.isNotEmpty(tags)) {
+          String tagsClause = convertTagsToSqlFormat(tags);
+          if (StringUtils.isNotEmpty(tagsClause)) {
+            options.add(String.format("tags=%s", tagsClause));
+          }
+        }
+      }
+
+      // Description from properties (overrides comment if both present)
+      if 
(properties.containsKey(BigQuerySchemaPropertiesMetadata.DESCRIPTION)) {
+        String description = 
properties.get(BigQuerySchemaPropertiesMetadata.DESCRIPTION);
+        if (StringUtils.isNotEmpty(description)) {
+          // Remove description from comment if it was already added
+          options.removeIf(option -> option.startsWith("description="));
+          options.add(
+              String.format(
+                  "description=\"%s\"", 
BigQueryOperationUtils.escapeString(description)));
+        }
+      }
+    }
+
+    if (options.isEmpty()) {
+      return "";
+    }
+
+    return "OPTIONS(\n  " + String.join(",\n  ", options) + "\n)";
+  }
+
+  /**
+   * Converts JSON array format labels to BigQuery SQL format.
+   *
+   * <p>Input: [{"key1":"value1"},{"key2":"value2"}] Output: 
[("key1","value1"),("key2","value2")]
+   *
+   * <p>Uses Jackson for robust JSON parsing with proper error handling.
+   *
+   * @param labelsJson JSON array string
+   * @return BigQuery SQL format labels string
+   */
+  private String convertLabelsToSqlFormat(String labelsJson) {
+    if (StringUtils.isBlank(labelsJson)) {
+      return "";
+    }
+
+    try {
+      // Parse JSON array of label objects using Jackson
+      List<Map<String, String>> labelsList =
+          OBJECT_MAPPER.readValue(labelsJson, new TypeReference<>() {});
+
+      if (labelsList == null || labelsList.isEmpty()) {
+        return "";
+      }
+
+      List<String> labelPairs = new ArrayList<>();
+      for (Map<String, String> labelMap : labelsList) {
+        // Each map should have exactly one key-value pair
+        for (Map.Entry<String, String> entry : labelMap.entrySet()) {
+          String key = entry.getKey();
+          String value = entry.getValue();
+          if (StringUtils.isNotBlank(key) && value != null) {
+            labelPairs.add(String.format("(\"%s\",\"%s\")", key, value));
+          }
+        }
+      }
+
+      if (labelPairs.isEmpty()) {
+        return "";
+      }
+
+      return "[" + String.join(",", labelPairs) + "]";
+
+    } catch (Exception e) {
+      LOG.warn(
+          "Failed to parse labels JSON using Jackson: {}. Error: {}. Using 
as-is.",
+          labelsJson,
+          e.getMessage());
+      return labelsJson; // Return as-is if parsing fails
+    }
+  }
+
  /**
   * Converts JSON array format tags to BigQuery SQL format.
   *
   * <p>Delegates to {@code convertLabelsToSqlFormat}: IAM tags use the same key/value pair
   * syntax as labels in BigQuery SQL, including the raw-input fallback on parse failure.
   *
   * @param tagsJson JSON array string
   * @return BigQuery SQL format tags string
   */
  private String convertTagsToSqlFormat(String tagsJson) {
    // Tags use the same format as labels in BigQuery SQL
    return convertLabelsToSqlFormat(tagsJson);
  }
+
+  @Override
+  public void create(String databaseName, String comment, Map<String, String> 
properties)
+      throws SchemaAlreadyExistsException {
+    // Validate properties before creation
+    validateProperties(properties, false);
+
+    LOG.info("Beginning to create database {}", databaseName);
+    String originComment = StringIdentifier.removeIdFromComment(comment);
+    // Check if schema comment is supported by the database
+    // For BigQuery, supportSchemaComment() returns true, so this check will 
pass
+    // This is a standard pattern from JdbcDatabaseOperations to ensure 
compatibility
+    if (!supportSchemaComment() && StringUtils.isNotEmpty(originComment)) {
+      throw new UnsupportedOperationException(
+          "Doesn't support setting schema comment: " + originComment);
+    }
+
+    try (final Connection connection = getConnection()) {
+      JdbcConnectorUtils.executeUpdate(
+          connection, generateCreateDatabaseSql(databaseName, comment, 
properties));
+      LOG.info("Finished creating database {}", databaseName);
+    } catch (final SQLException se) {
+      throw this.exceptionMapper.toGravitinoException(se);
+    }
+  }
+
+  /**
+   * Alters a dataset's properties using BigQuery ALTER SCHEMA statement.
+   *
+   * <p>Only mutable properties can be altered. Immutable properties 
(location, default_collation)
+   * will be ignored with a warning.
+   *
+   * @param databaseName the dataset name to alter
+   * @param properties the properties to update (only mutable properties will 
be applied)
+   */
+  @SuppressWarnings("unused")
+  public void alter(String databaseName, Map<String, String> properties) {
+    // Validate properties for ALTER operation
+    validateProperties(properties, true);
+
+    String alterSql = generateAlterDatabaseSql(databaseName, properties);
+    if (StringUtils.isEmpty(alterSql)) {
+      LOG.info("No mutable properties to alter for dataset '{}'", 
databaseName);
+      return;
+    }
+
+    LOG.info("Beginning to alter database {}", databaseName);
+    try (final Connection connection = getConnection()) {
+      JdbcConnectorUtils.executeUpdate(connection, alterSql);
+      LOG.info("Finished altering database {}", databaseName);
+    } catch (final SQLException se) {
+      throw this.exceptionMapper.toGravitinoException(se);
+    }
+  }
+
+  /**
+   * Generates ALTER SCHEMA SET OPTIONS SQL for BigQuery.
+   *
+   * <p>BigQuery syntax: ALTER SCHEMA [IF EXISTS] dataset_name SET 
OPTIONS(schema_option_list)
+   *
+   * <p>Note: Only mutable properties can be altered. Immutable properties 
(location,
+   * default_collation) cannot be changed after dataset creation.
+   *
+   * @param databaseName the dataset name
+   * @param properties the properties to update (only mutable properties)
+   * @return the ALTER SCHEMA SQL statement, or empty string if no mutable 
properties
+   */
+  protected String generateAlterDatabaseSql(String databaseName, Map<String, 
String> properties) {
+
+    if (properties == null || properties.isEmpty()) {
+      return "";
+    }
+
+    List<String> options = new ArrayList<>();
+
+    // Define mutable properties with their SQL format
+    // Format: property key -> SQL option format (with %s placeholder for 
value)
+    Map<String, PropertyFormatter> mutableProperties = new HashMap<>();
+
+    // Numeric properties (no quotes)
+    mutableProperties.put(
+        BigQuerySchemaPropertiesMetadata.DEFAULT_TABLE_EXPIRATION_DAYS,
+        new PropertyFormatter("default_table_expiration_days=%s", false));
+    mutableProperties.put(
+        BigQuerySchemaPropertiesMetadata.DEFAULT_PARTITION_EXPIRATION_DAYS,
+        new PropertyFormatter("default_partition_expiration_days=%s", false));
+    mutableProperties.put(
+        BigQuerySchemaPropertiesMetadata.IS_CASE_INSENSITIVE,
+        new PropertyFormatter("is_case_insensitive=%s", false));
+    mutableProperties.put(
+        BigQuerySchemaPropertiesMetadata.MAX_TIME_TRAVEL_HOURS,
+        new PropertyFormatter("max_time_travel_hours=%s", false));
+    mutableProperties.put(
+        BigQuerySchemaPropertiesMetadata.IS_PRIMARY, new 
PropertyFormatter("is_primary=%s", false));
+
+    // String properties (with quotes)
+    mutableProperties.put(
+        BigQuerySchemaPropertiesMetadata.STORAGE_BILLING_MODEL,
+        new PropertyFormatter("storage_billing_model=\"%s\"", false));
+    mutableProperties.put(
+        BigQuerySchemaPropertiesMetadata.DEFAULT_KMS_KEY_NAME,
+        new PropertyFormatter("default_kms_key_name=\"%s\"", false));
+    mutableProperties.put(
+        BigQuerySchemaPropertiesMetadata.DEFAULT_ROUNDING_MODE,
+        new PropertyFormatter("default_rounding_mode=\"%s\"", false));
+    mutableProperties.put(
+        BigQuerySchemaPropertiesMetadata.FAILOVER_RESERVATION,
+        new PropertyFormatter("failover_reservation=\"%s\"", false));
+    mutableProperties.put(
+        BigQuerySchemaPropertiesMetadata.PRIMARY_REPLICA,
+        new PropertyFormatter("primary_replica=\"%s\"", false));
+
+    // String properties that need escaping
+    mutableProperties.put(
+        BigQuerySchemaPropertiesMetadata.FRIENDLY_NAME,
+        new PropertyFormatter("friendly_name=\"%s\"", true));
+    mutableProperties.put(
+        BigQuerySchemaPropertiesMetadata.DESCRIPTION,
+        new PropertyFormatter("description=\"%s\"", true));
+
+    // Process standard properties
+    for (Map.Entry<String, PropertyFormatter> entry : 
mutableProperties.entrySet()) {
+      String propertyKey = entry.getKey();
+      PropertyFormatter formatter = entry.getValue();
+
+      if (properties.containsKey(propertyKey)) {
+        String value = properties.get(propertyKey);
+        if (StringUtils.isNotEmpty(value)) {
+          String formattedValue =
+              formatter.needsEscaping ? 
BigQueryOperationUtils.escapeString(value) : value;
+          options.add(String.format(formatter.format, formattedValue));
+        }
+      }
+    }
+
+    // Labels (mutable)
+    if (properties.containsKey(BigQuerySchemaPropertiesMetadata.LABELS)) {
+      String labels = properties.get(BigQuerySchemaPropertiesMetadata.LABELS);
+      if (StringUtils.isNotEmpty(labels)) {
+        String labelsClause = convertLabelsToSqlFormat(labels);
+        if (StringUtils.isNotEmpty(labelsClause)) {
+          options.add(String.format("labels=%s", labelsClause));
+        }
+      }
+    }
+
+    // IAM Tags (mutable)
+    if (properties.containsKey(BigQuerySchemaPropertiesMetadata.TAGS)) {
+      String tags = properties.get(BigQuerySchemaPropertiesMetadata.TAGS);
+      if (StringUtils.isNotEmpty(tags)) {
+        String tagsClause = convertTagsToSqlFormat(tags);
+        if (StringUtils.isNotEmpty(tagsClause)) {
+          options.add(String.format("tags=%s", tagsClause));
+        }
+      }
+    }
+
+    if (options.isEmpty()) {
+      LOG.debug("No mutable properties to alter for dataset '{}'", 
databaseName);
+      return "";
+    }
+
+    String result =
+        String.format(
+            "ALTER SCHEMA `%s`\nSET OPTIONS(\n  %s\n);",
+            databaseName, String.join(",\n  ", options));
+
+    LOG.info("Generated ALTER SCHEMA SQL for dataset '{}': {}", databaseName, 
result);
+    return result;
+  }
+
+  /**
+   * Validates dataset properties and logs warnings for unsupported properties.
+   *
+   * @param properties the properties to validate
+   * @param isAlter true if this is for ALTER operation, false for CREATE
+   */
+  private void validateProperties(Map<String, String> properties, boolean 
isAlter) {
+    if (properties == null || properties.isEmpty()) {
+      return;
+    }
+
+    for (String propertyName : properties.keySet()) {
+      // Skip Gravitino internal properties
+      if (propertyName.equals(StringIdentifier.ID_KEY)) {
+        continue;
+      }
+
+      // Check if property is supported
+      if (!BigQuerySchemaPropertiesMetadata.isSupportedProperty(propertyName)) 
{
+        LOG.warn(
+            "Unknown property '{}' for BigQuery dataset. This property will be 
ignored. "
+                + "Supported properties are: location, 
default_table_expiration_days, "
+                + "default_partition_expiration_days, storage_billing_model, 
max_time_travel_hours, "
+                + "default_kms_key_name, default_rounding_mode, 
is_case_insensitive, description, "
+                + "labels, default_collation, friendly_name, 
failover_reservation, is_primary, "
+                + "primary_replica, tags",
+            propertyName);
+      } else if (isAlter) {
+        // For ALTER operations, check if property is mutable
+        if (!BigQuerySchemaPropertiesMetadata.isMutableProperty(propertyName)) 
{
+          LOG.warn(
+              "Property '{}' is immutable and cannot be changed after dataset 
creation. "
+                  + "This property will be ignored in ALTER operation.",
+              propertyName);
+        }
+      }
+    }
+  }
+
  /**
   * Helper class to format property values for BigQuery SQL.
   *
   * <p>Encapsulates the SQL format string and whether the value needs escaping. Immutable;
   * instances are created per SQL-generation call.
   */
  private static class PropertyFormatter {
    // printf-style template with a single %s placeholder for the property value
    final String format;
    // true when the value must be passed through BigQueryOperationUtils.escapeString before
    // substitution (free-form text such as description/friendly_name)
    final boolean needsEscaping;

    PropertyFormatter(String format, boolean needsEscaping) {
      this.format = format;
      this.needsEscaping = needsEscaping;
    }
  }
 }
diff --git 
a/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/operation/BigQueryOperationUtils.java
 
b/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/operation/BigQueryOperationUtils.java
new file mode 100644
index 0000000000..9d828dcbf4
--- /dev/null
+++ 
b/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/operation/BigQueryOperationUtils.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.bigquery.operation;
+
+import com.google.api.services.bigquery.Bigquery;
+import org.apache.gravitino.catalog.bigquery.BigQueryClientPool;
+
+/**
+ * Utility class for BigQuery operations.
+ *
+ * <p>Provides common helper methods for BigQuery API operations to reduce 
code duplication.
+ */
+public class BigQueryOperationUtils {
+
+  /**
+   * Gets BigQuery API client and project ID from the client pool.
+   *
+   * <p>This method validates that the client pool is available and returns 
the necessary components
+   * for BigQuery API operations.
+   *
+   * @param clientPool the BigQuery client pool
+   * @param operationName the name of the operation (for error messages)
+   * @return BigQueryContext containing the client and project ID
+   * @throws RuntimeException if client pool is not available
+   */
+  public static BigQueryContext getBigQueryContext(
+      BigQueryClientPool clientPool, String operationName) {
+    if (clientPool == null) {
+      throw new RuntimeException(
+          String.format(
+              "BigQuery API client pool is not available. Cannot perform %s 
operation.",
+              operationName));
+    }
+
+    Bigquery bigquery = clientPool.getClient();
+    String projectId = clientPool.getProjectId();
+
+    return new BigQueryContext(bigquery, projectId);
+  }
+
+  /**
+   * Context object containing BigQuery API client and project ID.
+   *
+   * <p>This class encapsulates the BigQuery client and project ID to simplify 
method signatures.
+   */
+  public static class BigQueryContext {
+    private final Bigquery bigquery;
+    private final String projectId;
+
+    public BigQueryContext(Bigquery bigquery, String projectId) {
+      this.bigquery = bigquery;
+      this.projectId = projectId;
+    }
+
+    public Bigquery getBigquery() {
+      return bigquery;
+    }
+
+    public String getProjectId() {
+      return projectId;
+    }
+  }
+
+  /**
+   * Escapes special characters in strings for BigQuery SQL.
+   *
+   * @param str the string to escape
+   * @return the escaped string
+   */
+  public static String escapeString(String str) {
+    if (str == null) {
+      return "";
+    }
+    return str.replace("\\", "\\\\")
+        .replace("\"", "\\\"")
+        .replace("\n", "\\n")
+        .replace("\r", "\\r")
+        .replace("\t", "\\t");
+  }
+
+  private BigQueryOperationUtils() {
+    // Utility class, prevent instantiation
+  }
+}
diff --git 
a/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/operation/BigQueryTableOperations.java
 
b/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/operation/BigQueryTableOperations.java
index f8119c57b2..ab7b927087 100644
--- 
a/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/operation/BigQueryTableOperations.java
+++ 
b/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/operation/BigQueryTableOperations.java
@@ -23,6 +23,7 @@ import java.sql.ResultSet;
 import java.util.List;
 import java.util.Map;
 import org.apache.commons.lang3.NotImplementedException;
+import org.apache.gravitino.catalog.bigquery.BigQueryClientPool;
 import org.apache.gravitino.catalog.jdbc.JdbcColumn;
 import org.apache.gravitino.catalog.jdbc.JdbcTable;
 import org.apache.gravitino.catalog.jdbc.operation.JdbcTableOperations;
@@ -34,6 +35,26 @@ import org.apache.gravitino.rel.indexes.Index;
 /** Table operations for BigQuery. */
 public class BigQueryTableOperations extends JdbcTableOperations {
 
+  private BigQueryClientPool clientPool;
+
+  /**
+   * Sets the BigQuery API client pool.
+   *
+   * @param clientPool BigQuery client pool
+   */
+  public void setClientPool(BigQueryClientPool clientPool) {
+    this.clientPool = clientPool;
+  }
+
+  /**
+   * Gets the BigQuery API client pool.
+   *
+   * @return BigQuery client pool, may be null if not set
+   */
+  protected BigQueryClientPool getClientPool() {
+    return clientPool;
+  }
+
   @Override
   protected String generateCreateTableSql(
       String tableName,
diff --git 
a/catalogs/catalog-jdbc-bigquery/src/main/resources/jdbc-bigquery.conf 
b/catalogs/catalog-jdbc-bigquery/src/main/resources/jdbc-bigquery.conf
index cf1befc96f..0557c12d62 100644
--- a/catalogs/catalog-jdbc-bigquery/src/main/resources/jdbc-bigquery.conf
+++ b/catalogs/catalog-jdbc-bigquery/src/main/resources/jdbc-bigquery.conf
@@ -19,11 +19,11 @@
 
 # BigQuery JDBC connection configuration
 # Example configuration with Service Account authentication:
-# 1. jdbc-driver = com.simba.googlebigquery.jdbc42.Driver
-# 2. jdbc-url = jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443
+# jdbc-driver = com.simba.googlebigquery.jdbc42.Driver
+# jdbc-url = jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443
 # Google Cloud Project
-# 3. project-id = <your-project-id>
+# project-id = <your-project-id>
 # Note: jdbc-user and jdbc-password are not typically used with BigQuery
-# 4. jdbc-user = 
<your-service-account>@<your-project-id>.iam.gserviceaccount.com
-#/path/to/key.json
-# 5. jdbc-password = <your-key-json-path>
\ No newline at end of file
+# jdbc-user = <your-service-account>@<your-project-id>.iam.gserviceaccount.com
+# jdbc-password = <your-key-json-path>, e.g. /path/to/key.json
\ No newline at end of file
diff --git 
a/catalogs/catalog-jdbc-bigquery/src/test/java/org/apache/gravitino/catalog/bigquery/TestBigQueryCatalog.java
 
b/catalogs/catalog-jdbc-bigquery/src/test/java/org/apache/gravitino/catalog/bigquery/TestBigQueryCatalog.java
new file mode 100644
index 0000000000..819a07fd94
--- /dev/null
+++ 
b/catalogs/catalog-jdbc-bigquery/src/test/java/org/apache/gravitino/catalog/bigquery/TestBigQueryCatalog.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.bigquery;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.jupiter.api.Test;
+
+/** Unit tests for BigQueryCatalog. */
+public class TestBigQueryCatalog {
+
+  @Test
+  void testShortName() {
+    BigQueryCatalog catalog = new BigQueryCatalog();
+    assertEquals("jdbc-bigquery", catalog.shortName());
+  }
+
+  @Test
+  void testNewCapability() {
+    BigQueryCatalog catalog = new BigQueryCatalog();
+    assertNotNull(catalog.newCapability());
+    assertTrue(catalog.newCapability() instanceof BigQueryCatalogCapability);
+  }
+
+  @Test
+  void testPropertiesMetadata() {
+    BigQueryCatalog catalog = new BigQueryCatalog();
+
+    assertNotNull(catalog.catalogPropertiesMetadata());
+    assertTrue(catalog.catalogPropertiesMetadata() instanceof 
BigQueryCatalogPropertiesMetadata);
+
+    assertNotNull(catalog.schemaPropertiesMetadata());
+    assertTrue(catalog.schemaPropertiesMetadata() instanceof 
BigQuerySchemaPropertiesMetadata);
+
+    assertNotNull(catalog.tablePropertiesMetadata());
+    assertTrue(catalog.tablePropertiesMetadata() instanceof 
BigQueryTablePropertiesMetadata);
+  }
+
+  @Test
+  void testWithCatalogConf() {
+    Map<String, String> config = new HashMap<>();
+    config.put("project-id", "test-project");
+    config.put("jdbc-password", "/path/to/key.json");
+    config.put("jdbc-url", 
"jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;";);
+
+    BigQueryCatalog catalog = new BigQueryCatalog();
+    BigQueryCatalog configuredCatalog = catalog.withCatalogConf(config);
+
+    assertNotNull(configuredCatalog);
+    assertNotNull(configuredCatalog.getClientPool());
+    assertEquals("test-project", 
configuredCatalog.getClientPool().getProjectId());
+  }
+
+  @Test
+  void testBuildJdbcUrlWithExistingAuth() {
+    Map<String, String> config = new HashMap<>();
+    config.put("project-id", "test-project");
+    config.put("jdbc-password", "/path/to/key.json");
+    config.put(
+        "jdbc-url",
+        
"jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;ProjectId=test-project;OAuthType=0;";);
+
+    BigQueryCatalog catalog = new BigQueryCatalog();
+    BigQueryCatalog configuredCatalog = catalog.withCatalogConf(config);
+
+    assertNotNull(configuredCatalog);
+    assertNotNull(configuredCatalog.getClientPool());
+  }
+
+  @Test
+  void testBuildJdbcUrlWithoutAuth() {
+    Map<String, String> config = new HashMap<>();
+    config.put("project-id", "test-project");
+    config.put("jdbc-password", "/path/to/key.json");
+    config.put("jdbc-user", "[email protected]");
+
+    BigQueryCatalog catalog = new BigQueryCatalog();
+    BigQueryCatalog configuredCatalog = catalog.withCatalogConf(config);
+
+    assertNotNull(configuredCatalog);
+    assertNotNull(configuredCatalog.getClientPool());
+  }
+}
diff --git 
a/catalogs/catalog-jdbc-bigquery/src/test/java/org/apache/gravitino/catalog/bigquery/TestBigQueryClientPool.java
 
b/catalogs/catalog-jdbc-bigquery/src/test/java/org/apache/gravitino/catalog/bigquery/TestBigQueryClientPool.java
new file mode 100644
index 0000000000..678c37f138
--- /dev/null
+++ 
b/catalogs/catalog-jdbc-bigquery/src/test/java/org/apache/gravitino/catalog/bigquery/TestBigQueryClientPool.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.bigquery;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.jupiter.api.Test;
+
+/** Unit tests for BigQueryClientPool. */
+public class TestBigQueryClientPool {
+
+  @Test
+  void testConstructorWithValidConfig() {
+    Map<String, String> config = new HashMap<>();
+    config.put("project-id", "test-project");
+    config.put("jdbc-password", "/path/to/key.json");
+
+    BigQueryClientPool pool = new BigQueryClientPool(config);
+    assertNotNull(pool);
+    assertEquals("test-project", pool.getProjectId());
+  }
+
+  @Test
+  void testConstructorWithMissingProjectId() {
+    Map<String, String> config = new HashMap<>();
+    config.put("jdbc-password", "/path/to/key.json");
+
+    assertThrows(IllegalArgumentException.class, () -> new 
BigQueryClientPool(config));
+  }
+
+  @Test
+  void testConstructorWithMissingKeyFile() {
+    Map<String, String> config = new HashMap<>();
+    config.put("project-id", "test-project");
+
+    assertThrows(IllegalArgumentException.class, () -> new 
BigQueryClientPool(config));
+  }
+
+  @Test
+  void testConstructorWithEmptyProjectId() {
+    Map<String, String> config = new HashMap<>();
+    config.put("project-id", "");
+    config.put("jdbc-password", "/path/to/key.json");
+
+    assertThrows(IllegalArgumentException.class, () -> new 
BigQueryClientPool(config));
+  }
+
+  @Test
+  void testConstructorWithEmptyKeyFile() {
+    Map<String, String> config = new HashMap<>();
+    config.put("project-id", "test-project");
+    config.put("jdbc-password", "");
+
+    assertThrows(IllegalArgumentException.class, () -> new 
BigQueryClientPool(config));
+  }
+
+  @Test
+  void testClose() {
+    Map<String, String> config = new HashMap<>();
+    config.put("project-id", "test-project");
+    config.put("jdbc-password", "/path/to/key.json");
+
+    BigQueryClientPool pool = new BigQueryClientPool(config);
+    // Should not throw exception
+    pool.close();
+  }
+}
diff --git 
a/catalogs/catalog-jdbc-bigquery/src/test/java/org/apache/gravitino/catalog/bigquery/TestBigQuerySchemaPropertiesMetadata.java
 
b/catalogs/catalog-jdbc-bigquery/src/test/java/org/apache/gravitino/catalog/bigquery/TestBigQuerySchemaPropertiesMetadata.java
new file mode 100644
index 0000000000..e13e85986d
--- /dev/null
+++ 
b/catalogs/catalog-jdbc-bigquery/src/test/java/org/apache/gravitino/catalog/bigquery/TestBigQuerySchemaPropertiesMetadata.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.bigquery;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Map;
+import org.apache.gravitino.connector.PropertyEntry;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/** Unit tests for BigQuerySchemaPropertiesMetadata. */
+public class TestBigQuerySchemaPropertiesMetadata {
+
+  private BigQuerySchemaPropertiesMetadata metadata;
+
+  @BeforeEach
+  void setUp() {
+    metadata = new BigQuerySchemaPropertiesMetadata();
+  }
+
+  @Test
+  void testPropertyEntries() {
+    Map<String, PropertyEntry<?>> properties = metadata.propertyEntries();
+    assertNotNull(properties);
+
+    // Check that all expected properties are present
+    
assertTrue(properties.containsKey(BigQuerySchemaPropertiesMetadata.LOCATION));
+    assertTrue(
+        
properties.containsKey(BigQuerySchemaPropertiesMetadata.DEFAULT_TABLE_EXPIRATION_DAYS));
+    assertTrue(
+        
properties.containsKey(BigQuerySchemaPropertiesMetadata.DEFAULT_PARTITION_EXPIRATION_DAYS));
+    
assertTrue(properties.containsKey(BigQuerySchemaPropertiesMetadata.IS_CASE_INSENSITIVE));
+    
assertTrue(properties.containsKey(BigQuerySchemaPropertiesMetadata.DESCRIPTION));
+    
assertTrue(properties.containsKey(BigQuerySchemaPropertiesMetadata.LABELS));
+    
assertTrue(properties.containsKey(BigQuerySchemaPropertiesMetadata.DEFAULT_COLLATION));
+    
assertTrue(properties.containsKey(BigQuerySchemaPropertiesMetadata.STORAGE_BILLING_MODEL));
+    
assertTrue(properties.containsKey(BigQuerySchemaPropertiesMetadata.MAX_TIME_TRAVEL_HOURS));
+    
assertTrue(properties.containsKey(BigQuerySchemaPropertiesMetadata.DEFAULT_KMS_KEY_NAME));
+    
assertTrue(properties.containsKey(BigQuerySchemaPropertiesMetadata.DEFAULT_ROUNDING_MODE));
+    
assertTrue(properties.containsKey(BigQuerySchemaPropertiesMetadata.FRIENDLY_NAME));
+    
assertTrue(properties.containsKey(BigQuerySchemaPropertiesMetadata.FAILOVER_RESERVATION));
+    
assertTrue(properties.containsKey(BigQuerySchemaPropertiesMetadata.IS_PRIMARY));
+    
assertTrue(properties.containsKey(BigQuerySchemaPropertiesMetadata.PRIMARY_REPLICA));
+    assertTrue(properties.containsKey(BigQuerySchemaPropertiesMetadata.TAGS));
+  }
+
+  @Test
+  void testLocationProperty() {
+    Map<String, PropertyEntry<?>> properties = metadata.propertyEntries();
+    PropertyEntry<?> locationProperty = 
properties.get(BigQuerySchemaPropertiesMetadata.LOCATION);
+
+    assertNotNull(locationProperty);
+    assertTrue(locationProperty.isImmutable());
+    assertFalse(locationProperty.isRequired());
+  }
+
+  @Test
+  void testDefaultCollationProperty() {
+    Map<String, PropertyEntry<?>> properties = metadata.propertyEntries();
+    PropertyEntry<?> collationProperty =
+        properties.get(BigQuerySchemaPropertiesMetadata.DEFAULT_COLLATION);
+
+    assertNotNull(collationProperty);
+    assertTrue(collationProperty.isImmutable());
+    assertFalse(collationProperty.isRequired());
+  }
+
+  @Test
+  void testMutableProperties() {
+    Map<String, PropertyEntry<?>> properties = metadata.propertyEntries();
+
+    // These properties should be mutable
+    PropertyEntry<?> expirationProperty =
+        
properties.get(BigQuerySchemaPropertiesMetadata.DEFAULT_TABLE_EXPIRATION_DAYS);
+    assertNotNull(expirationProperty);
+    assertFalse(expirationProperty.isImmutable());
+
+    PropertyEntry<?> partitionExpirationProperty =
+        
properties.get(BigQuerySchemaPropertiesMetadata.DEFAULT_PARTITION_EXPIRATION_DAYS);
+    assertNotNull(partitionExpirationProperty);
+    assertFalse(partitionExpirationProperty.isImmutable());
+
+    PropertyEntry<?> caseInsensitiveProperty =
+        properties.get(BigQuerySchemaPropertiesMetadata.IS_CASE_INSENSITIVE);
+    assertNotNull(caseInsensitiveProperty);
+    assertFalse(caseInsensitiveProperty.isImmutable());
+
+    PropertyEntry<?> descriptionProperty =
+        properties.get(BigQuerySchemaPropertiesMetadata.DESCRIPTION);
+    assertNotNull(descriptionProperty);
+    assertFalse(descriptionProperty.isImmutable());
+
+    PropertyEntry<?> labelsProperty = 
properties.get(BigQuerySchemaPropertiesMetadata.LABELS);
+    assertNotNull(labelsProperty);
+    assertFalse(labelsProperty.isImmutable());
+
+    PropertyEntry<?> storageBillingProperty =
+        properties.get(BigQuerySchemaPropertiesMetadata.STORAGE_BILLING_MODEL);
+    assertNotNull(storageBillingProperty);
+    assertFalse(storageBillingProperty.isImmutable());
+
+    PropertyEntry<?> timeTravelProperty =
+        properties.get(BigQuerySchemaPropertiesMetadata.MAX_TIME_TRAVEL_HOURS);
+    assertNotNull(timeTravelProperty);
+    assertFalse(timeTravelProperty.isImmutable());
+
+    PropertyEntry<?> kmsKeyProperty =
+        properties.get(BigQuerySchemaPropertiesMetadata.DEFAULT_KMS_KEY_NAME);
+    assertNotNull(kmsKeyProperty);
+    assertFalse(kmsKeyProperty.isImmutable());
+
+    PropertyEntry<?> roundingModeProperty =
+        properties.get(BigQuerySchemaPropertiesMetadata.DEFAULT_ROUNDING_MODE);
+    assertNotNull(roundingModeProperty);
+    assertFalse(roundingModeProperty.isImmutable());
+
+    PropertyEntry<?> friendlyNameProperty =
+        properties.get(BigQuerySchemaPropertiesMetadata.FRIENDLY_NAME);
+    assertNotNull(friendlyNameProperty);
+    assertFalse(friendlyNameProperty.isImmutable());
+  }
+}
diff --git 
a/catalogs/catalog-jdbc-bigquery/src/test/java/org/apache/gravitino/catalog/bigquery/operation/TestBigQueryDatabaseOperations.java
 
b/catalogs/catalog-jdbc-bigquery/src/test/java/org/apache/gravitino/catalog/bigquery/operation/TestBigQueryDatabaseOperations.java
new file mode 100644
index 0000000000..9b5e1963ed
--- /dev/null
+++ 
b/catalogs/catalog-jdbc-bigquery/src/test/java/org/apache/gravitino/catalog/bigquery/operation/TestBigQueryDatabaseOperations.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.bigquery.operation;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.gravitino.catalog.bigquery.BigQuerySchemaPropertiesMetadata;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/** Unit tests for BigQueryDatabaseOperations. */
+public class TestBigQueryDatabaseOperations {
+
+  private BigQueryDatabaseOperations operations;
+
+  @BeforeEach
+  void setUp() {
+    operations = new BigQueryDatabaseOperations();
+  }
+
+  @Test
+  void testGenerateCreateDatabaseSql() {
+    // Test basic CREATE SCHEMA
+    String sql = operations.generateCreateDatabaseSql("test_dataset", null, 
null);
+    assertEquals("CREATE SCHEMA `test_dataset`;", sql);
+
+    // Test with comment only
+    sql = operations.generateCreateDatabaseSql("test_dataset", "Test 
description", null);
+    assertTrue(sql.contains("CREATE SCHEMA `test_dataset`"));
+    assertTrue(sql.contains("OPTIONS"));
+    assertTrue(sql.contains("description=\"Test description\""));
+
+    // Test with comprehensive properties
+    Map<String, String> properties = new HashMap<>();
+    properties.put(BigQuerySchemaPropertiesMetadata.LOCATION, "us-central1");
+    
properties.put(BigQuerySchemaPropertiesMetadata.DEFAULT_TABLE_EXPIRATION_DAYS, 
"30");
+    
properties.put(BigQuerySchemaPropertiesMetadata.DEFAULT_PARTITION_EXPIRATION_DAYS,
 "365");
+    properties.put(BigQuerySchemaPropertiesMetadata.STORAGE_BILLING_MODEL, 
"LOGICAL");
+    properties.put(BigQuerySchemaPropertiesMetadata.MAX_TIME_TRAVEL_HOURS, 
"168");
+    properties.put(BigQuerySchemaPropertiesMetadata.DEFAULT_ROUNDING_MODE, 
"ROUND_HALF_EVEN");
+    properties.put(BigQuerySchemaPropertiesMetadata.IS_CASE_INSENSITIVE, 
"true");
+    properties.put(BigQuerySchemaPropertiesMetadata.FRIENDLY_NAME, "Test 
Dataset");
+
+    sql = operations.generateCreateDatabaseSql("test_dataset", "Test 
description", properties);
+    assertTrue(sql.contains("CREATE SCHEMA `test_dataset`"));
+    assertTrue(sql.contains("OPTIONS"));
+    assertTrue(sql.contains("location=\"us-central1\""));
+    assertTrue(sql.contains("default_table_expiration_days=30"));
+    assertTrue(sql.contains("default_partition_expiration_days=365"));
+    assertTrue(sql.contains("storage_billing_model=\"LOGICAL\""));
+    assertTrue(sql.contains("max_time_travel_hours=168"));
+    assertTrue(sql.contains("default_rounding_mode=\"ROUND_HALF_EVEN\""));
+    assertTrue(sql.contains("is_case_insensitive=true"));
+    assertTrue(sql.contains("friendly_name=\"Test Dataset\""));
+    assertTrue(sql.contains("description=\"Test description\""));
+
+    // Test with default collation
+    properties.put(BigQuerySchemaPropertiesMetadata.DEFAULT_COLLATION, 
"und:ci");
+    sql = operations.generateCreateDatabaseSql("test_dataset", "Test 
description", properties);
+    assertTrue(sql.contains("DEFAULT COLLATE 'und:ci'"));
+  }
+
+  @Test
+  void testGenerateDropDatabaseSql() {
+    // Test DROP SCHEMA without cascade
+    String sql = operations.generateDropDatabaseSql("test_dataset", false);
+    assertEquals("DROP SCHEMA `test_dataset` RESTRICT", sql);
+
+    // Test DROP SCHEMA with cascade
+    sql = operations.generateDropDatabaseSql("test_dataset", true);
+    assertEquals("DROP SCHEMA `test_dataset` CASCADE", sql);
+  }
+
+  @Test
+  void testGenerateCreateDatabaseSqlWithLabels() {
+    // Test with labels in JSON format
+    Map<String, String> properties = new HashMap<>();
+    properties.put(BigQuerySchemaPropertiesMetadata.LOCATION, "us-central1");
+    properties.put(
+        BigQuerySchemaPropertiesMetadata.LABELS,
+        "[{\"environment\":\"test\"},{\"department\":\"data-engineering\"}]");
+
+    String sql =
+        operations.generateCreateDatabaseSql("test_dataset", "Test with 
labels", properties);
+    assertTrue(sql.contains("CREATE SCHEMA `test_dataset`"));
+    assertTrue(sql.contains("location=\"us-central1\""));
+    assertTrue(
+        
sql.contains("labels=[(\"environment\",\"test\"),(\"department\",\"data-engineering\")]"));
+    assertTrue(sql.contains("description=\"Test with labels\""));
+  }
+
+  @Test
+  void testGenerateCreateDatabaseSqlWithKmsKey() {
+    // Test with KMS key
+    Map<String, String> properties = new HashMap<>();
+    properties.put(
+        BigQuerySchemaPropertiesMetadata.DEFAULT_KMS_KEY_NAME,
+        
"projects/my-project/locations/us-central1/keyRings/my-ring/cryptoKeys/my-key");
+
+    String sql = operations.generateCreateDatabaseSql("test_dataset", null, 
properties);
+    assertTrue(sql.contains("CREATE SCHEMA `test_dataset`"));
+    assertTrue(
+        sql.contains(
+            
"default_kms_key_name=\"projects/my-project/locations/us-central1/keyRings/my-ring/cryptoKeys/my-key\""));
+  }
+
+  @Test
+  void testSupportSchemaComment() {
+    assertTrue(operations.supportSchemaComment());
+  }
+
+  @Test
+  void testCreateSysDatabaseNameSet() {
+    
assertTrue(operations.createSysDatabaseNameSet().contains("information_schema"));
+    
assertTrue(operations.createSysDatabaseNameSet().contains("INFORMATION_SCHEMA"));
+  }
+}
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 0ac9055b79..7456bb4c2d 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -134,6 +134,8 @@ ognl = "3.4.7"
 concurrent-trees = "2.6.0"
 jakarta-validation = "2.0.2"
 aspectj = "1.9.24"
+simba-bigquery-jdbc = "1.6.3.1004"
+commons-compress = "1.27.1"
 
 [libraries]
 aspectj-aspectjrt = { group = "org.aspectj", name = "aspectjrt", version.ref = 
"aspectj" }
@@ -219,6 +221,7 @@ mockserver-client-java = { group = "org.mock-server", name 
= "mockserver-client-
 commons-csv = { group = "org.apache.commons", name = "commons-csv", 
version.ref = "commons-csv" }
 commons-lang = { group = "commons-lang", name = "commons-lang", version.ref = 
"commons-lang" }
 commons-lang3 = { group = "org.apache.commons", name = "commons-lang3", 
version.ref = "commons-lang3" }
+commons-compress = { group = "org.apache.commons", name = "commons-compress", 
version.ref = "commons-compress" }
 commons-logging = { group = "commons-logging", name = "commons-logging", 
version.ref = "commons-logging" }
 commons-io = { group = "commons-io", name = "commons-io", version.ref = 
"commons-io" }
 caffeine = { group = "com.github.ben-manes.caffeine", name = "caffeine", 
version.ref = "caffeine" }


Reply via email to