This is an automated email from the ASF dual-hosted git repository.

yuqi4733 pushed a commit to branch internal-main
in repository https://gitbox.apache.org/repos/asf/gravitino.git

commit 30a35a0bce46e885a8b0c19e1bc60f336739f453
Author: geyanggang <[email protected]>
AuthorDate: Tue Jan 13 16:23:19 2026 +0800

    [#94]feat(bigquery-catalog): Add integration test for BigQuery catalog. 
(#103)
    
    * Add integration test for BQ catalog.
    
    * user document update.
    
    * Add BigQuery user document.
    
    * Fix bugs.
    
    * Fix bugs.
---
 catalogs/catalog-jdbc-bigquery/build.gradle.kts    |   1 +
 .../integration/CatalogBigQueryCrossRegionIT.java  | 584 ++++++++++++++++
 .../bigquery/integration/CatalogBigQueryIT.java    | 768 +++++++++++++++++++++
 .../integration/CatalogBigQueryPerformanceIT.java  | 758 ++++++++++++++++++++
 .../src/test/resources/log4j2.properties           |  57 ++
 docs/jdbc-bigquery-catalog.md                      | 317 +++++++++
 6 files changed, 2485 insertions(+)

diff --git a/catalogs/catalog-jdbc-bigquery/build.gradle.kts 
b/catalogs/catalog-jdbc-bigquery/build.gradle.kts
index 0311d0f5ff..728cf7fafa 100644
--- a/catalogs/catalog-jdbc-bigquery/build.gradle.kts
+++ b/catalogs/catalog-jdbc-bigquery/build.gradle.kts
@@ -123,6 +123,7 @@ dependencies {
   testImplementation(libs.junit.jupiter.params)
   testImplementation(libs.testcontainers)
   testImplementation(libs.mockito.core)
+  testImplementation(libs.awaitility)
 
   val simbaJdbcDriver = files(
     simbaExtractDir.asFileTree.matching {
diff --git 
a/catalogs/catalog-jdbc-bigquery/src/test/java/org/apache/gravitino/catalog/bigquery/integration/CatalogBigQueryCrossRegionIT.java
 
b/catalogs/catalog-jdbc-bigquery/src/test/java/org/apache/gravitino/catalog/bigquery/integration/CatalogBigQueryCrossRegionIT.java
new file mode 100644
index 0000000000..c40dd1890d
--- /dev/null
+++ 
b/catalogs/catalog-jdbc-bigquery/src/test/java/org/apache/gravitino/catalog/bigquery/integration/CatalogBigQueryCrossRegionIT.java
@@ -0,0 +1,584 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.bigquery.integration;
+
+import static 
org.apache.gravitino.catalog.bigquery.BigQueryCatalogPropertiesMetadata.PROJECT_ID;
+import static org.awaitility.Awaitility.await;
+
+import com.google.common.collect.Maps;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.Schema;
+import org.apache.gravitino.catalog.jdbc.config.JdbcConfig;
+import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.integration.test.util.BaseIT;
+import org.apache.gravitino.integration.test.util.GravitinoITUtils;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.Table;
+import org.apache.gravitino.rel.TableCatalog;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.sorts.SortOrder;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.types.Types;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestInstance.Lifecycle;
+import org.junit.jupiter.api.condition.EnabledIf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Cross-region integration tests for BigQuery catalog.
+ *
+ * <p>This test class focuses on testing BigQuery's cross-region capabilities, 
including:
+ *
+ * <ul>
+ *   <li>Cross-region dataset listing and operations
+ *   <li>Multi-region dataset creation and management
+ *   <li>Cross-region table operations
+ *   <li>Performance testing across regions
+ * </ul>
+ *
+ * <p>Requires the same environment variables as CatalogBigQueryIT plus:
+ *
+ * <ul>
+ *   <li>BIGQUERY_MULTI_REGION_TEST - Set to "true" to enable multi-region 
tests
+ * </ul>
+ */
+@TestInstance(Lifecycle.PER_CLASS)
+@EnabledIf("isCrossRegionTestConfigured")
+public class CatalogBigQueryCrossRegionIT extends BaseIT {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(CatalogBigQueryCrossRegionIT.class);
+  private static final String provider = "jdbc-bigquery";
+
+  // Environment variable names
+  private static final String ENV_PROJECT_ID = "BIGQUERY_PROJECT_ID";
+  private static final String ENV_KEY_PATH = 
"BIGQUERY_SERVICE_ACCOUNT_KEY_PATH";
+  private static final String ENV_SERVICE_ACCOUNT_EMAIL = 
"BIGQUERY_SERVICE_ACCOUNT_EMAIL";
+  private static final String ENV_MULTI_REGION_TEST = 
"BIGQUERY_MULTI_REGION_TEST";
+
+  // Test regions
+  private static final List<String> TEST_REGIONS = Arrays.asList("US", "EU", 
"asia-east1");
+
+  // Test configuration
+  private String projectId;
+  private String keyPath;
+  private String serviceAccountEmail;
+
+  // Test identifiers
+  public String metalakeName = 
GravitinoITUtils.genRandomName("bigquery_cross_region_metalake");
+  public String catalogName = 
GravitinoITUtils.genRandomName("bigquery_cross_region_catalog");
+
+  // Track created resources to clean up
+  private final Set<String> createdSchemas = ConcurrentHashMap.newKeySet();
+  private final Set<NameIdentifier> createdTables = 
ConcurrentHashMap.newKeySet();
+
+  private GravitinoMetalake metalake;
+  protected Catalog catalog;
+
+  /**
+   * Check if BigQuery cross-region testing is configured.
+   *
+   * @return true if all required environment variables are set and 
multi-region test is enabled
+   */
+  static boolean isCrossRegionTestConfigured() {
+    return StringUtils.isNotBlank(System.getenv(ENV_PROJECT_ID))
+        && StringUtils.isNotBlank(System.getenv(ENV_KEY_PATH))
+        && "true".equalsIgnoreCase(System.getenv(ENV_MULTI_REGION_TEST));
+  }
+
+  @BeforeAll
+  public void startup() {
+    // Load configuration from environment variables
+    projectId = System.getenv(ENV_PROJECT_ID);
+    keyPath = System.getenv(ENV_KEY_PATH);
+    serviceAccountEmail = System.getenv(ENV_SERVICE_ACCOUNT_EMAIL);
+    boolean multiRegionTestEnabled = 
"true".equalsIgnoreCase(System.getenv(ENV_MULTI_REGION_TEST));
+
+    LOG.info("Starting BigQuery cross-region integration tests with project: 
{}", projectId);
+    LOG.info("Multi-region test enabled: {}", multiRegionTestEnabled);
+
+    createMetalake();
+    catalog = createCatalog(catalogName);
+  }
+
+  @AfterAll
+  public void stop() {
+    // Clean up only resources created by this test
+    cleanupTestResources();
+
+    // Do NOT delete catalog or metalake - they may be shared
+    LOG.info("Test cleanup completed. Catalog and metalake are preserved.");
+  }
+
+  @AfterEach
+  public void cleanup() {
+    // Clean up resources created in this test method
+    cleanupTestResources();
+  }
+
+  private void cleanupTestResources() {
+    LOG.info(
+        "Cleaning up test resources: {} schemas, {} tables",
+        createdSchemas.size(),
+        createdTables.size());
+
+    // First, clean up all tables created by tests
+    Set<NameIdentifier> tablesToDelete = new HashSet<>(createdTables);
+    for (NameIdentifier tableId : tablesToDelete) {
+      try {
+        LOG.debug("Dropping table: {}", tableId);
+        catalog.asTableCatalog().dropTable(tableId);
+        createdTables.remove(tableId);
+      } catch (Exception e) {
+        LOG.warn("Failed to drop table {}: {}", tableId, e.getMessage());
+      }
+    }
+
+    // Then, clean up all schemas created by tests
+    Set<String> schemasToDelete = new HashSet<>(createdSchemas);
+    for (String schemaName : schemasToDelete) {
+      try {
+        LOG.debug("Dropping schema: {}", schemaName);
+        catalog.asSchemas().dropSchema(schemaName, true);
+        createdSchemas.remove(schemaName);
+      } catch (Exception e) {
+        LOG.warn("Failed to drop schema {}: {}", schemaName, e.getMessage());
+      }
+    }
+
+    // Wait for BigQuery operations to complete
+    await()
+        .atMost(Duration.ofSeconds(10))
+        .pollInterval(Duration.ofMillis(500))
+        .until(() -> true); // BigQuery operations are eventually consistent
+  }
+
+  private void createMetalake() {
+    try {
+      // Check if metalake already exists
+      GravitinoMetalake[] existingMetalakes = client.listMetalakes();
+      for (GravitinoMetalake existing : existingMetalakes) {
+        if (existing.name().equals(metalakeName)) {
+          LOG.info("Using existing metalake: {}", metalakeName);
+          metalake = client.loadMetalake(metalakeName);
+          return;
+        }
+      }
+
+      // Create new metalake if it doesn't exist
+      client.createMetalake(metalakeName, "Cross-region test metalake", 
Collections.emptyMap());
+      metalake = client.loadMetalake(metalakeName);
+      Assertions.assertEquals(metalakeName, metalake.name());
+      LOG.info("Created new metalake: {}", metalakeName);
+    } catch (Exception e) {
+      LOG.error("Failed to create or load metalake: {}", e.getMessage());
+      throw new RuntimeException("Failed to setup metalake", e);
+    }
+  }
+
+  private Catalog createCatalog(String catalogName) {
+    try {
+      // Check if catalog already exists in the metalake
+      String[] existingCatalogs = metalake.listCatalogs();
+      for (String existing : existingCatalogs) {
+        if (existing.equals(catalogName)) {
+          LOG.info("Using existing catalog: {}", catalogName);
+          return metalake.loadCatalog(catalogName);
+        }
+      }
+
+      // Create new catalog if it doesn't exist
+      Map<String, String> catalogProperties = Maps.newHashMap();
+
+      // Required properties
+      catalogProperties.put(PROJECT_ID, projectId);
+      catalogProperties.put(JdbcConfig.PASSWORD.getKey(), keyPath); // Key 
file path
+      catalogProperties.put(
+          JdbcConfig.JDBC_DRIVER.getKey(), 
"com.simba.googlebigquery.jdbc42.Driver");
+
+      // Optional properties
+      if (StringUtils.isNotBlank(serviceAccountEmail)) {
+        catalogProperties.put(JdbcConfig.USERNAME.getKey(), 
serviceAccountEmail);
+      }
+
+      Catalog createdCatalog =
+          metalake.createCatalog(
+              catalogName,
+              Catalog.Type.RELATIONAL,
+              provider,
+              "Cross-region test catalog",
+              catalogProperties);
+      Catalog loadCatalog = metalake.loadCatalog(catalogName);
+      Assertions.assertEquals(createdCatalog, loadCatalog);
+
+      LOG.info("Created new catalog: {}", catalogName);
+      return loadCatalog;
+    } catch (Exception e) {
+      LOG.error("Failed to create or load catalog: {}", e.getMessage());
+      throw new RuntimeException("Failed to setup catalog", e);
+    }
+  }
+
+  private Column[] createTestColumns() {
+    return new Column[] {
+      Column.of("id", Types.LongType.get(), "ID column"),
+      Column.of("name", Types.StringType.get(), "Name column"),
+      Column.of("created_at", Types.TimestampType.withoutTimeZone(), "Creation 
timestamp")
+    };
+  }
+
+  @Test
+  void testCrossRegionSchemaListing() {
+    LOG.info("Testing cross-region schema listing");
+
+    // Create schemas in different regions
+    Map<String, String> createdTestSchemas = Maps.newHashMap();
+
+    for (String region : TEST_REGIONS) {
+      String schemaName =
+          GravitinoITUtils.genRandomName("bigquery_cross_region_test_" + 
region.toLowerCase());
+      Map<String, String> properties = Maps.newHashMap();
+      properties.put("location", region);
+      properties.put("description", "Cross-region test schema in " + region);
+
+      try {
+        catalog.asSchemas().createSchema(schemaName, "Test schema in " + 
region, properties);
+        createdTestSchemas.put(region, schemaName);
+        createdSchemas.add(schemaName); // Track for cleanup
+        LOG.info("Created schema {} in region {}", schemaName, region);
+      } catch (Exception e) {
+        LOG.warn("Failed to create schema in region {}: {}", region, 
e.getMessage());
+      }
+    }
+
+    // List all schemas and verify cross-region visibility
+    String[] allSchemas = catalog.asSchemas().listSchemas();
+    Set<String> schemaSet = new HashSet<>(Arrays.asList(allSchemas));
+
+    LOG.info("Total schemas found: {}", allSchemas.length);
+
+    // Verify that schemas from different regions are visible
+    for (Map.Entry<String, String> entry : createdTestSchemas.entrySet()) {
+      String region = entry.getKey();
+      String schemaName = entry.getValue();
+
+      Assertions.assertTrue(
+          schemaSet.contains(schemaName),
+          "Schema " + schemaName + " from region " + region + " should be 
visible");
+
+      // Load schema and verify properties
+      Schema loadedSchema = catalog.asSchemas().loadSchema(schemaName);
+      Assertions.assertEquals(schemaName, loadedSchema.name());
+      Assertions.assertEquals(region, 
loadedSchema.properties().get("location"));
+    }
+
+    LOG.info("Cross-region schema listing test completed successfully");
+  }
+
+  @Test
+  void testCrossRegionTableOperations() {
+    LOG.info("Testing cross-region table operations");
+
+    // Create a schema in US region
+    String usSchemaName = 
GravitinoITUtils.genRandomName("bigquery_cross_region_us_test");
+    Map<String, String> usProperties = Maps.newHashMap();
+    usProperties.put("location", "US");
+    catalog.asSchemas().createSchema(usSchemaName, "US test schema", 
usProperties);
+    createdSchemas.add(usSchemaName); // Track for cleanup
+
+    // Create a schema in EU region (if supported)
+    String euSchemaName = 
GravitinoITUtils.genRandomName("bigquery_cross_region_eu_test");
+    Map<String, String> euProperties = Maps.newHashMap();
+    euProperties.put("location", "EU");
+
+    boolean euSchemaCreated = false;
+    try {
+      catalog.asSchemas().createSchema(euSchemaName, "EU test schema", 
euProperties);
+      createdSchemas.add(euSchemaName); // Track for cleanup
+      euSchemaCreated = true;
+      LOG.info("Successfully created EU schema: {}", euSchemaName);
+    } catch (Exception e) {
+      LOG.warn("Failed to create EU schema, continuing with US only: {}", 
e.getMessage());
+    }
+
+    // Create tables in US schema
+    String usTableName = GravitinoITUtils.genRandomName("us_table");
+    NameIdentifier usTableId = NameIdentifier.of(usSchemaName, usTableName);
+
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    tableCatalog.createTable(
+        usTableId,
+        createTestColumns(),
+        "US table",
+        Collections.emptyMap(),
+        Transforms.EMPTY_TRANSFORM,
+        Distributions.NONE,
+        new SortOrder[0]);
+    createdTables.add(usTableId); // Track for cleanup
+
+    // Create table in EU schema if available
+    if (euSchemaCreated) {
+      String euTableName = GravitinoITUtils.genRandomName("eu_table");
+      NameIdentifier euTableId = NameIdentifier.of(euSchemaName, euTableName);
+
+      tableCatalog.createTable(
+          euTableId,
+          createTestColumns(),
+          "EU table",
+          Collections.emptyMap(),
+          Transforms.EMPTY_TRANSFORM,
+          Distributions.NONE,
+          new SortOrder[0]);
+      createdTables.add(euTableId); // Track for cleanup
+
+      // Verify both tables are accessible
+      Table usTable = tableCatalog.loadTable(usTableId);
+      Table euTable = tableCatalog.loadTable(euTableId);
+
+      Assertions.assertEquals(usTableName, usTable.name());
+      Assertions.assertEquals(euTableName, euTable.name());
+
+      LOG.info("Successfully created and accessed tables in both US and EU 
regions");
+    }
+
+    // List tables across regions
+    NameIdentifier[] usTables = 
tableCatalog.listTables(Namespace.of(usSchemaName));
+    Assertions.assertTrue(usTables.length > 0);
+
+    if (euSchemaCreated) {
+      NameIdentifier[] euTables = 
tableCatalog.listTables(Namespace.of(euSchemaName));
+      Assertions.assertTrue(euTables.length > 0);
+    }
+
+    LOG.info("Cross-region table operations test completed successfully");
+  }
+
+  @Test
+  void testMultiRegionPerformanceComparison() {
+    LOG.info("Testing multi-region performance comparison");
+
+    Map<String, Long> regionPerformance = Maps.newHashMap();
+
+    for (String region : TEST_REGIONS) {
+      try {
+        String schemaName =
+            GravitinoITUtils.genRandomName("bigquery_multi_region_perf_" + 
region.toLowerCase());
+        Map<String, String> properties = Maps.newHashMap();
+        properties.put("location", region);
+
+        // Measure schema creation time
+        long startTime = System.currentTimeMillis();
+        catalog.asSchemas().createSchema(schemaName, "Performance test 
schema", properties);
+        long schemaCreationTime = System.currentTimeMillis() - startTime;
+        createdSchemas.add(schemaName); // Track for cleanup
+
+        // Measure table creation time
+        String tableName = GravitinoITUtils.genRandomName("perf_table");
+        NameIdentifier tableId = NameIdentifier.of(schemaName, tableName);
+
+        startTime = System.currentTimeMillis();
+        catalog
+            .asTableCatalog()
+            .createTable(
+                tableId,
+                createTestColumns(),
+                "Performance test table",
+                Collections.emptyMap(),
+                Transforms.EMPTY_TRANSFORM,
+                Distributions.NONE,
+                new SortOrder[0]);
+        long tableCreationTime = System.currentTimeMillis() - startTime;
+        createdTables.add(tableId); // Track for cleanup
+
+        // Measure table loading time
+        startTime = System.currentTimeMillis();
+        Table loadedTable = catalog.asTableCatalog().loadTable(tableId);
+        long tableLoadTime = System.currentTimeMillis() - startTime;
+
+        Assertions.assertEquals(tableName, loadedTable.name());
+
+        long totalTime = schemaCreationTime + tableCreationTime + 
tableLoadTime;
+        regionPerformance.put(region, totalTime);
+
+        LOG.info(
+            "Region {} performance: Schema creation: {}ms, Table creation: 
{}ms, Table load: {}ms, Total: {}ms",
+            region,
+            schemaCreationTime,
+            tableCreationTime,
+            tableLoadTime,
+            totalTime);
+
+      } catch (Exception e) {
+        LOG.warn("Performance test failed for region {}: {}", region, 
e.getMessage());
+        regionPerformance.put(region, -1L); // Mark as failed
+      }
+    }
+
+    // Report performance comparison
+    LOG.info("Multi-region performance comparison:");
+    regionPerformance.forEach(
+        (region, time) -> {
+          if (time > 0) {
+            LOG.info("  {}: {}ms", region, time);
+          } else {
+            LOG.info("  {}: FAILED", region);
+          }
+        });
+
+    // Verify at least one region worked
+    boolean anyRegionWorked = 
regionPerformance.values().stream().anyMatch(time -> time > 0);
+    Assertions.assertTrue(anyRegionWorked, "At least one region should work");
+
+    LOG.info("Multi-region performance comparison completed");
+  }
+
+  @Test
+  void testCrossRegionDatasetProperties() {
+    LOG.info("Testing cross-region dataset properties");
+
+    Map<String, String> testRegions = Maps.newHashMap();
+    testRegions.put("US", "United States");
+    testRegions.put("EU", "Europe");
+
+    for (Map.Entry<String, String> entry : testRegions.entrySet()) {
+      String region = entry.getKey();
+      String description = entry.getValue();
+
+      try {
+        String schemaName =
+            GravitinoITUtils.genRandomName("bigquery_cross_region_props_" + 
region.toLowerCase());
+
+        Map<String, String> properties = Maps.newHashMap();
+        properties.put("location", region);
+        properties.put("description", "Test dataset in " + description);
+        properties.put("friendly_name", "Cross-region test dataset");
+
+        // Create schema with properties
+        Schema createdSchema =
+            catalog
+                .asSchemas()
+                .createSchema(schemaName, "Cross-region properties test", 
properties);
+        createdSchemas.add(schemaName); // Track for cleanup
+
+        Assertions.assertEquals(schemaName, createdSchema.name());
+
+        // Load schema and verify properties
+        Schema loadedSchema = catalog.asSchemas().loadSchema(schemaName);
+        Assertions.assertEquals(schemaName, loadedSchema.name());
+        Assertions.assertEquals(region, 
loadedSchema.properties().get("location"));
+
+        LOG.info("Successfully tested properties for region: {}", region);
+
+      } catch (Exception e) {
+        LOG.warn("Properties test failed for region {}: {}", region, 
e.getMessage());
+      }
+    }
+
+    LOG.info("Cross-region dataset properties test completed");
+  }
+
+  @Test
+  void testCrossRegionConcurrentOperations() throws InterruptedException {
+    LOG.info("Testing cross-region concurrent operations");
+
+    int numThreads = Math.min(3, TEST_REGIONS.size()); // Limit to 3 threads 
max
+    Thread[] threads = new Thread[numThreads];
+
+    for (int i = 0; i < numThreads; i++) {
+      final String region = TEST_REGIONS.get(i);
+      final int threadId = i;
+
+      threads[i] =
+          new Thread(
+              () -> {
+                try {
+                  String schemaName =
+                      GravitinoITUtils.genRandomName(
+                          "bigquery_cross_region_concurrent_" + 
region.toLowerCase());
+                  String tableName = 
GravitinoITUtils.genRandomName("concurrent_table");
+
+                  Map<String, String> properties = Maps.newHashMap();
+                  properties.put("location", region);
+
+                  // Create schema
+                  catalog
+                      .asSchemas()
+                      .createSchema(schemaName, "Concurrent test schema", 
properties);
+                  createdSchemas.add(schemaName); // Track for cleanup
+
+                  // Create table
+                  NameIdentifier tableId = NameIdentifier.of(schemaName, 
tableName);
+                  catalog
+                      .asTableCatalog()
+                      .createTable(
+                          tableId,
+                          createTestColumns(),
+                          "Concurrent test table",
+                          Collections.emptyMap(),
+                          Transforms.EMPTY_TRANSFORM,
+                          Distributions.NONE,
+                          new SortOrder[0]);
+                  createdTables.add(tableId); // Track for cleanup
+
+                  // Load table
+                  Table loadedTable = 
catalog.asTableCatalog().loadTable(tableId);
+                  Assertions.assertEquals(tableName, loadedTable.name());
+
+                  LOG.info("Thread {} completed operations for region {}", 
threadId, region);
+
+                } catch (Exception e) {
+                  LOG.error(
+                      "Concurrent operation failed for region {} in thread {}",
+                      region,
+                      threadId,
+                      e);
+                  // Don't throw exception to avoid failing other threads
+                }
+              });
+    }
+
+    // Start all threads
+    for (Thread thread : threads) {
+      thread.start();
+    }
+
+    // Wait for all threads to complete
+    for (Thread thread : threads) {
+      thread.join(60000); // 60 second timeout per thread
+    }
+
+    LOG.info("Cross-region concurrent operations test completed");
+  }
+}
diff --git 
a/catalogs/catalog-jdbc-bigquery/src/test/java/org/apache/gravitino/catalog/bigquery/integration/CatalogBigQueryIT.java
 
b/catalogs/catalog-jdbc-bigquery/src/test/java/org/apache/gravitino/catalog/bigquery/integration/CatalogBigQueryIT.java
new file mode 100644
index 0000000000..2cc1b920ac
--- /dev/null
+++ 
b/catalogs/catalog-jdbc-bigquery/src/test/java/org/apache/gravitino/catalog/bigquery/integration/CatalogBigQueryIT.java
@@ -0,0 +1,768 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.bigquery.integration;
+
+import static 
org.apache.gravitino.catalog.bigquery.BigQueryCatalogPropertiesMetadata.PROJECT_ID;
+import static 
org.apache.gravitino.rel.Column.DEFAULT_VALUE_OF_CURRENT_TIMESTAMP;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.Schema;
+import org.apache.gravitino.SupportsSchemas;
+import org.apache.gravitino.catalog.jdbc.config.JdbcConfig;
+import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
+import org.apache.gravitino.integration.test.util.BaseIT;
+import org.apache.gravitino.integration.test.util.GravitinoITUtils;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.Table;
+import org.apache.gravitino.rel.TableCatalog;
+import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.gravitino.rel.expressions.sorts.SortOrder;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.types.Types;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestInstance.Lifecycle;
+import org.junit.jupiter.api.condition.EnabledIf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@TestInstance(Lifecycle.PER_CLASS)
+@EnabledIf("isBigQueryConfigured")
+public class CatalogBigQueryIT extends BaseIT {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(CatalogBigQueryIT.class);
+  private static final String provider = "jdbc-bigquery";
+
+  private static final String ENV_PROJECT_ID = "BIGQUERY_PROJECT_ID";
+  private static final String ENV_KEY_PATH = 
"BIGQUERY_SERVICE_ACCOUNT_KEY_PATH";
+  private static final String ENV_SERVICE_ACCOUNT_EMAIL = 
"BIGQUERY_SERVICE_ACCOUNT_EMAIL";
+  private static final String ENV_DATASET_LOCATION = 
"BIGQUERY_DATASET_LOCATION";
+
+  private String projectId;
+  private String keyPath;
+  private String serviceAccountEmail;
+  private String datasetLocation;
+
+  public String metalakeName = 
GravitinoITUtils.genRandomName("bigquery_it_metalake");
+  public String catalogName = 
GravitinoITUtils.genRandomName("bigquery_it_catalog");
+  public String schemaName = 
GravitinoITUtils.genRandomName("bigquery_it_schema");
+  public String tableName = 
GravitinoITUtils.genRandomName("bigquery_it_table");
+  public String alertTableName = "alert_table_name";
+  public String table_comment = "table_comment";
+  public String schema_comment = "schema_comment";
+
+  public String BIGQUERY_COL_NAME1 = "bigquery_col_name1";
+  public String BIGQUERY_COL_NAME2 = "bigquery_col_name2";
+  public String BIGQUERY_COL_NAME3 = "bigquery_col_name3";
+  public String BIGQUERY_COL_NAME4 = "bigquery_col_name4";
+  public String BIGQUERY_COL_NAME5 = "bigquery_col_name5";
+
+  private GravitinoMetalake metalake;
+  protected Catalog catalog;
+
+  static boolean isBigQueryConfigured() {
+    return StringUtils.isNotBlank(System.getenv(ENV_PROJECT_ID))
+        && StringUtils.isNotBlank(System.getenv(ENV_KEY_PATH));
+  }
+
  @BeforeAll
  public void startup() {
    // Read connection settings from the environment; the @EnabledIf guard guarantees
    // that project id and key path are non-blank when this method runs.
    projectId = System.getenv(ENV_PROJECT_ID);
    keyPath = System.getenv(ENV_KEY_PATH);
    serviceAccountEmail = System.getenv(ENV_SERVICE_ACCOUNT_EMAIL);
    datasetLocation = System.getenv(ENV_DATASET_LOCATION);

    // Fall back to the US multi-region when no explicit dataset location is configured.
    if (StringUtils.isBlank(datasetLocation)) {
      datasetLocation = "US";
    }

    LOG.info("Starting BigQuery integration tests with project: {}", projectId);
    LOG.info("Dataset location: {}", datasetLocation);

    // Shared fixtures for every test in this class (Lifecycle.PER_CLASS).
    createMetalake();
    catalog = createCatalog(catalogName);
    createSchema(catalog, schemaName);
  }
+
  @AfterAll
  public void stop() {
    // Drop only the tables; the catalog and metalake are intentionally left in place
    // (see the log message below).
    clearTableAndSchema();
    LOG.info(
        "BigQuery integration tests completed. Catalog and metalake retained for other tests.");
  }
+
  @AfterEach
  public void resetSchema() {
    // Give the next test a clean slate: drop leftover tables, then re-ensure the schema
    // exists (createSchema is a no-op when the schema is already present).
    clearTableAndSchema();
    createSchema(catalog, schemaName);
  }
+
+  private void clearTableAndSchema() {
+    try {
+      NameIdentifier[] nameIdentifiers =
+          catalog.asTableCatalog().listTables(Namespace.of(schemaName));
+      for (NameIdentifier nameIdentifier : nameIdentifiers) {
+        catalog.asTableCatalog().dropTable(nameIdentifier);
+      }
+    } catch (Exception e) {
+      LOG.warn("Error clearing tables", e);
+    }
+  }
+
+  private void createMetalake() {
+    client.createMetalake(metalakeName, "comment", Collections.emptyMap());
+    GravitinoMetalake loadMetalake = client.loadMetalake(metalakeName);
+    Assertions.assertEquals(metalakeName, loadMetalake.name());
+    metalake = loadMetalake;
+  }
+
  /**
   * Creates a jdbc-bigquery catalog in the test metalake and verifies the created and
   * re-loaded catalog objects are equal.
   */
  private Catalog createCatalog(String catalogName) {
    Map<String, String> catalogProperties = Maps.newHashMap();

    // The Simba JDBC driver authenticates with a service-account key file; the key path
    // travels through the generic JDBC "password" property.
    catalogProperties.put(PROJECT_ID, projectId);
    catalogProperties.put(JdbcConfig.PASSWORD.getKey(), keyPath);
    catalogProperties.put(
        JdbcConfig.JDBC_DRIVER.getKey(), "com.simba.googlebigquery.jdbc42.Driver");

    // The service-account email (JDBC "username") is optional.
    if (StringUtils.isNotBlank(serviceAccountEmail)) {
      catalogProperties.put(JdbcConfig.USERNAME.getKey(), serviceAccountEmail);
    }

    Catalog createdCatalog =
        metalake.createCatalog(
            catalogName, Catalog.Type.RELATIONAL, provider, "comment", catalogProperties);
    Catalog loadCatalog = metalake.loadCatalog(catalogName);
    Assertions.assertEquals(createdCatalog, loadCatalog);

    return loadCatalog;
  }
+
  /**
   * Ensures {@code schemaName} exists: returns early if it already loads, otherwise creates
   * it and verifies the name and the "location" property round-trip.
   *
   * NOTE(review): creation happens only when loadSchema throws NoSuchSchemaException; any
   * other exception from loadSchema propagates and the schema is NOT created.
   */
  private void createSchema(Catalog catalog, String schemaName) {
    Map<String, String> prop = Maps.newHashMap();
    prop.put("location", datasetLocation);

    try {
      Schema loadSchema = catalog.asSchemas().loadSchema(schemaName);
      if (loadSchema != null) {
        LOG.info("Schema {} already exists, skipping creation", schemaName);
        return;
      }
    } catch (NoSuchSchemaException e) {
      // Schema is missing: create it, then check it loads back with matching properties.
      Schema createdSchema = catalog.asSchemas().createSchema(schemaName, schema_comment, prop);
      Schema loadSchema = catalog.asSchemas().loadSchema(schemaName);
      Assertions.assertEquals(createdSchema.name(), loadSchema.name());
      prop.forEach(
          (key, value) -> Assertions.assertEquals(loadSchema.properties().get(key), value));
    }
  }
+
+  private Column[] createColumns() {
+    Column col1 = Column.of(BIGQUERY_COL_NAME1, Types.LongType.get(), 
"col_1_comment");
+    Column col2 = Column.of(BIGQUERY_COL_NAME2, Types.DateType.get(), 
"col_2_comment");
+    Column col3 = Column.of(BIGQUERY_COL_NAME3, Types.StringType.get(), 
"col_3_comment");
+
+    return new Column[] {col1, col2, col3};
+  }
+
  /**
   * Five columns exercising default-value handling: literal double/long/decimal defaults,
   * a CURRENT_TIMESTAMP default, and an explicit NULL default on a nullable string.
   *
   * NOTE(review): the two boolean arguments appear to be (nullable, autoIncrement) —
   * confirm against the Column.of signature.
   */
  private Column[] createColumnsWithDefaultValue() {
    return new Column[] {
      Column.of(
          BIGQUERY_COL_NAME1,
          Types.DoubleType.get(),
          "col_1_comment",
          false,
          false,
          Literals.of("1.23", Types.DoubleType.get())),
      Column.of(
          BIGQUERY_COL_NAME2,
          Types.TimestampType.withoutTimeZone(),
          "col_2_comment",
          false,
          false,
          DEFAULT_VALUE_OF_CURRENT_TIMESTAMP),
      Column.of(
          BIGQUERY_COL_NAME3, Types.StringType.get(), "col_3_comment", true, false, Literals.NULL),
      Column.of(
          BIGQUERY_COL_NAME4,
          Types.LongType.get(),
          "col_4_comment",
          false,
          false,
          Literals.of("1000", Types.LongType.get())),
      Column.of(
          BIGQUERY_COL_NAME5,
          Types.DecimalType.of(10, 2),
          "col_5_comment",
          true,
          false,
          Literals.of("1.23", Types.DecimalType.of(10, 2)))
    };
  }
+
+  private Map<String, String> createProperties() {
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put("description", "Test table for BigQuery integration");
+    return properties;
+  }
+
  /**
   * Verifies that testConnection with a deliberately invalid key path fails fast with one of
   * the expected connection/datasource error messages, instead of silently succeeding.
   */
  @Test
  void testTestConnection() {
    Map<String, String> catalogProperties = Maps.newHashMap();
    catalogProperties.put(PROJECT_ID, projectId);
    // Invalid key path on purpose: the connection attempt must fail.
    catalogProperties.put(JdbcConfig.PASSWORD.getKey(), "/invalid/path/to/key.json");
    catalogProperties.put(JdbcConfig.USERNAME.getKey(), "[email protected]");
    catalogProperties.put(
        JdbcConfig.JDBC_DRIVER.getKey(), "com.simba.googlebigquery.jdbc42.Driver");

    Exception exception =
        assertThrows(
            RuntimeException.class,
            () ->
                metalake.testConnection(
                    GravitinoITUtils.genRandomName("bigquery_it_catalog"),
                    Catalog.Type.RELATIONAL,
                    provider,
                    "comment",
                    catalogProperties));
    // The exact message depends on which layer rejects the bad key, so accept any of the
    // known failure phrasings.
    Assertions.assertTrue(
        exception.getMessage().contains("Error creating datasource")
            || exception.getMessage().contains("Failed to create BigQuery API client")
            || exception.getMessage().contains("Connection failed"));
  }
+
  /**
   * End-to-end schema lifecycle: list, create, duplicate-create rejection, drop, and the
   * behavior of table operations against a dropped schema.
   */
  @Test
  void testOperationBigQuerySchema() {
    SupportsSchemas schemas = catalog.asSchemas();

    // The fixture schema must already be visible (BigQuery reports it lower-cased).
    String[] nameIdentifiers = schemas.listSchemas();
    Set<String> schemaNames = Sets.newHashSet(nameIdentifiers);
    Assertions.assertTrue(schemaNames.contains(schemaName.toLowerCase()));

    String testSchemaName = GravitinoITUtils.genRandomName("test_schema_1").toLowerCase();
    Map<String, String> properties = Maps.newHashMap();
    properties.put("location", datasetLocation);

    Schema createdSchema = schemas.createSchema(testSchemaName, schema_comment, properties);
    Assertions.assertNotNull(createdSchema);
    Assertions.assertEquals(testSchemaName, createdSchema.name());

    nameIdentifiers = schemas.listSchemas();
    schemaNames = Sets.newHashSet(nameIdentifiers);
    Assertions.assertTrue(schemaNames.contains(testSchemaName));

    // Creating the same schema again must be rejected.
    Map<String, String> emptyMap = Collections.emptyMap();
    Assertions.assertThrows(
        SchemaAlreadyExistsException.class,
        () -> schemas.createSchema(testSchemaName, schema_comment, emptyMap));

    schemas.dropSchema(testSchemaName, false);
    Assertions.assertThrows(NoSuchSchemaException.class, () -> schemas.loadSchema(testSchemaName));

    nameIdentifiers = schemas.listSchemas();
    schemaNames = Sets.newHashSet(nameIdentifiers);
    Assertions.assertFalse(schemaNames.contains(testSchemaName));
    // Dropping a schema that never existed returns false rather than throwing.
    Assertions.assertFalse(schemas.dropSchema("no_exits", false));

    // Table creation inside the (now dropped) schema must fail with NoSuchSchemaException.
    TableCatalog tableCatalog = catalog.asTableCatalog();
    NameIdentifier table = NameIdentifier.of(testSchemaName, "test_table");
    Assertions.assertThrows(
        NoSuchSchemaException.class,
        () ->
            tableCatalog.createTable(
                table,
                createColumns(),
                table_comment,
                createProperties(),
                null,
                Distributions.NONE,
                null));

    // Repeated drops (cascade or not) and dropping a table in a missing schema all return false.
    Assertions.assertFalse(schemas.dropSchema(testSchemaName, true));
    Assertions.assertFalse(schemas.dropSchema(testSchemaName, false));
    Assertions.assertFalse(tableCatalog.dropTable(table));
  }
+
+  @Test
+  void testCreateAndLoadBigQueryTable() {
+    Column[] columns = createColumns();
+    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName);
+
+    final SortOrder[] sortOrders = new SortOrder[0];
+    Transform[] partitioning = Transforms.EMPTY_TRANSFORM;
+    Map<String, String> properties = createProperties();
+
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    Table createdTable =
+        tableCatalog.createTable(
+            tableIdentifier,
+            columns,
+            table_comment,
+            properties,
+            partitioning,
+            Distributions.NONE,
+            sortOrders);
+
+    Assertions.assertNotNull(createdTable);
+    Assertions.assertEquals(tableName, createdTable.name());
+
+    Table loadTable = tableCatalog.loadTable(tableIdentifier);
+    Assertions.assertEquals(tableName, loadTable.name());
+
+    Map<String, String> resultProp = loadTable.properties();
+    Assertions.assertNotNull(resultProp);
+
+    Assertions.assertEquals(loadTable.columns().length, columns.length);
+    for (int i = 0; i < columns.length; i++) {
+      Assertions.assertEquals(columns[i].name(), 
loadTable.columns()[i].name());
+      Assertions.assertEquals(columns[i].dataType(), 
loadTable.columns()[i].dataType());
+    }
+  }
+
  /** Creates a table day-partitioned on its date column and verifies it loads back intact. */
  @Test
  void testCreateTableWithPartitioning() {
    Column[] columns = {
      Column.of("id", Types.LongType.get(), "ID column"),
      Column.of("created_date", Types.DateType.get(), "Creation date"),
      Column.of("name", Types.StringType.get(), "Name column")
    };

    String partitionedTableName = GravitinoITUtils.genRandomName("partitioned_table");
    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, partitionedTableName);

    // Day-granularity partitioning on the DATE column.
    Transform[] partitioning = new Transform[] {Transforms.day(new String[] {"created_date"})};

    Map<String, String> properties = Maps.newHashMap();
    properties.put("description", "Partitioned table test");

    TableCatalog tableCatalog = catalog.asTableCatalog();
    Table createdTable =
        tableCatalog.createTable(
            tableIdentifier,
            columns,
            "Partitioned table",
            properties,
            partitioning,
            Distributions.NONE,
            new SortOrder[0]);

    Assertions.assertEquals(partitionedTableName, createdTable.name());

    // NOTE(review): only name and column count are verified; the partitioning spec itself is
    // not asserted to round-trip.
    Table loadedTable = tableCatalog.loadTable(tableIdentifier);
    Assertions.assertEquals(partitionedTableName, loadedTable.name());
    Assertions.assertEquals(3, loadedTable.columns().length);
  }
+
+  @Test
+  void testCreateTableWithClustering() {
+    Column[] columns = {
+      Column.of("id", Types.LongType.get(), "ID column"),
+      Column.of("category", Types.StringType.get(), "Category column"),
+      Column.of("subcategory", Types.StringType.get(), "Subcategory column"),
+      Column.of("amount", Types.DecimalType.of(10, 2), "Amount column")
+    };
+
+    String clusteredTableName = 
GravitinoITUtils.genRandomName("clustered_table");
+    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, 
clusteredTableName);
+
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put("description", "Clustered table test");
+    properties.put("clustering_fields", "category,subcategory");
+
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    Table createdTable =
+        tableCatalog.createTable(
+            tableIdentifier,
+            columns,
+            "Clustered table",
+            properties,
+            Transforms.EMPTY_TRANSFORM,
+            Distributions.NONE,
+            new SortOrder[0]);
+
+    Assertions.assertEquals(clusteredTableName, createdTable.name());
+
+    Table loadedTable = tableCatalog.loadTable(tableIdentifier);
+    Assertions.assertEquals(clusteredTableName, loadedTable.name());
+    Assertions.assertEquals(4, loadedTable.columns().length);
+  }
+
  /**
   * Applies a chain of alterations (rename table, update comment, add column, rename column)
   * and verifies the final column layout, then drops the table. The alteration order matters:
   * after the first rename every subsequent change targets {@link #alertTableName}.
   */
  @Test
  void testAlterAndDropBigQueryTable() {
    Column[] columns = createColumns();
    catalog
        .asTableCatalog()
        .createTable(
            NameIdentifier.of(schemaName, tableName), columns, table_comment, createProperties());

    // Rename the table; all following alterations use the new name.
    catalog
        .asTableCatalog()
        .alterTable(NameIdentifier.of(schemaName, tableName), TableChange.rename(alertTableName));

    catalog
        .asTableCatalog()
        .alterTable(
            NameIdentifier.of(schemaName, alertTableName),
            TableChange.updateComment(table_comment + "_new"));

    catalog
        .asTableCatalog()
        .alterTable(
            NameIdentifier.of(schemaName, alertTableName),
            TableChange.addColumn(new String[] {"col_4"}, Types.StringType.get()));

    catalog
        .asTableCatalog()
        .alterTable(
            NameIdentifier.of(schemaName, alertTableName),
            TableChange.renameColumn(new String[] {BIGQUERY_COL_NAME2}, "col_2_new"));

    Table table = catalog.asTableCatalog().loadTable(NameIdentifier.of(schemaName, alertTableName));
    Assertions.assertEquals(alertTableName, table.name());

    // Expected final layout: col1 unchanged, col2 renamed, col3 unchanged, col_4 appended.
    Assertions.assertEquals(BIGQUERY_COL_NAME1, table.columns()[0].name());
    Assertions.assertEquals(Types.LongType.get(), table.columns()[0].dataType());

    Assertions.assertEquals("col_2_new", table.columns()[1].name());
    Assertions.assertEquals(Types.DateType.get(), table.columns()[1].dataType());

    Assertions.assertEquals(BIGQUERY_COL_NAME3, table.columns()[2].name());
    Assertions.assertEquals(Types.StringType.get(), table.columns()[2].dataType());

    Assertions.assertEquals("col_4", table.columns()[3].name());
    Assertions.assertEquals(Types.StringType.get(), table.columns()[3].dataType());

    Assertions.assertNotNull(table.auditInfo());

    Assertions.assertDoesNotThrow(
        () -> {
          catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, alertTableName));
        });
  }
+
  /**
   * Creates a table whose columns carry default values and verifies names and data types
   * round-trip.
   *
   * NOTE(review): despite the test name, the columns' defaultValue() is never asserted here —
   * only name and dataType are compared. Consider adding a defaultValue round-trip check once
   * the BigQuery driver's normalization of defaults is confirmed.
   */
  @Test
  void testColumnDefaultValue() {
    Column[] columns = createColumnsWithDefaultValue();
    catalog
        .asTableCatalog()
        .createTable(NameIdentifier.of(schemaName, tableName), columns, null, ImmutableMap.of());

    Table loadedTable =
        catalog.asTableCatalog().loadTable(NameIdentifier.of(schemaName, tableName));

    Assertions.assertEquals(5, loadedTable.columns().length);

    for (int i = 0; i < loadedTable.columns().length; i++) {
      Assertions.assertNotNull(loadedTable.columns()[i]);
      Assertions.assertEquals(columns[i].name(), loadedTable.columns()[i].name());
      Assertions.assertEquals(columns[i].dataType(), loadedTable.columns()[i].dataType());
    }
  }
+
  /**
   * Verifies BigQuery dataset-drop semantics: a non-cascade drop of a non-empty dataset must
   * fail and leave the dataset intact, while a cascade drop succeeds.
   */
  @Test
  void testDropBigQueryDataset() {
    String testSchemaName = GravitinoITUtils.genRandomName("bigquery_schema").toLowerCase();
    String testTableName = GravitinoITUtils.genRandomName("bigquery_table").toLowerCase();

    Map<String, String> properties = Maps.newHashMap();
    properties.put("location", datasetLocation);

    catalog.asSchemas().createSchema(testSchemaName, null, properties);

    // Put one table in the dataset so the non-cascade drop below must fail.
    catalog
        .asTableCatalog()
        .createTable(
            NameIdentifier.of(testSchemaName, testTableName),
            createColumns(),
            "Created by Gravitino client",
            ImmutableMap.<String, String>builder().build());

    Throwable excep =
        Assertions.assertThrows(
            RuntimeException.class, () -> catalog.asSchemas().dropSchema(testSchemaName, false));

    // The exact phrasing varies by driver/backend version; accept any known variant.
    String errorMessage = excep.getMessage().toLowerCase();
    Assertions.assertTrue(
        errorMessage.contains("cascade")
            || errorMessage.contains("delete")
            || errorMessage.contains("dataset")
            || errorMessage.contains("non-empty")
            || errorMessage.contains("not empty"));

    // The failed drop must not have removed the dataset.
    catalog.asSchemas().loadSchema(testSchemaName);

    // Cascade drop removes the dataset together with its table.
    catalog.asSchemas().dropSchema(testSchemaName, true);

    SupportsSchemas schemas = catalog.asSchemas();
    Assertions.assertThrows(NoSuchSchemaException.class, () -> schemas.loadSchema(testSchemaName));
  }
+
+  @Test
+  void testBigQuerySpecialTableName() {
+    Map<String, String> properties = createProperties();
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+
+    String t1_name = "t112";
+    Column t1_col = Column.of(t1_name, Types.LongType.get(), "id", false, 
false, null);
+    Column[] columns = {t1_col};
+
+    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, t1_name);
+    tableCatalog.createTable(
+        tableIdentifier,
+        columns,
+        table_comment,
+        properties,
+        Transforms.EMPTY_TRANSFORM,
+        Distributions.NONE,
+        new SortOrder[0]);
+
+    String t2_name = "t212";
+    Column t2_col = Column.of(t2_name, Types.LongType.get(), "id", false, 
false, null);
+    columns = new Column[] {t2_col};
+    tableIdentifier = NameIdentifier.of(schemaName, t2_name);
+    tableCatalog.createTable(
+        tableIdentifier,
+        columns,
+        table_comment,
+        properties,
+        Transforms.EMPTY_TRANSFORM,
+        Distributions.NONE,
+        new SortOrder[0]);
+
+    String t3_name = "t_12";
+    Column t3_col = Column.of(t3_name, Types.LongType.get(), "id", false, 
false, null);
+    columns = new Column[] {t3_col};
+    tableIdentifier = NameIdentifier.of(schemaName, t3_name);
+    tableCatalog.createTable(
+        tableIdentifier,
+        columns,
+        table_comment,
+        properties,
+        Transforms.EMPTY_TRANSFORM,
+        Distributions.NONE,
+        new SortOrder[0]);
+
+    String t4_name = "_1__";
+    Column t4_col = Column.of(t4_name, Types.LongType.get(), "id", false, 
false, null);
+    columns = new Column[] {t4_col};
+    tableIdentifier = NameIdentifier.of(schemaName, t4_name);
+    tableCatalog.createTable(
+        tableIdentifier,
+        columns,
+        table_comment,
+        properties,
+        Transforms.EMPTY_TRANSFORM,
+        Distributions.NONE,
+        new SortOrder[0]);
+
+    Table t1 = tableCatalog.loadTable(NameIdentifier.of(schemaName, t1_name));
+    Assertions.assertTrue(
+        Arrays.stream(t1.columns()).anyMatch(c -> Objects.equals(c.name(), 
"t112")));
+
+    Table t2 = tableCatalog.loadTable(NameIdentifier.of(schemaName, t2_name));
+    Assertions.assertTrue(
+        Arrays.stream(t2.columns()).anyMatch(c -> Objects.equals(c.name(), 
"t212")));
+
+    Table t3 = tableCatalog.loadTable(NameIdentifier.of(schemaName, t3_name));
+    Assertions.assertTrue(
+        Arrays.stream(t3.columns()).anyMatch(c -> Objects.equals(c.name(), 
"t_12")));
+
+    Table t4 = tableCatalog.loadTable(NameIdentifier.of(schemaName, t4_name));
+    Assertions.assertTrue(
+        Arrays.stream(t4.columns()).anyMatch(c -> Objects.equals(c.name(), 
"_1__")));
+  }
+
  /**
   * Verifies that SQL reserved words ("create", "delete", "show", "int", "varchar", ...) work
   * as table and column names — i.e. the connector quotes identifiers correctly — across
   * create, load, add/delete/rename column, and drop.
   */
  @Test
  void testBackQuoteTable() {
    Column col1 = Column.of("create", Types.LongType.get(), "id", false, false, null);
    Column col2 = Column.of("delete", Types.LongType.get(), "yes", false, false, null);
    Column col3 = Column.of("show", Types.DateType.get(), "comment", false, false, null);
    Column col4 = Column.of("status", Types.StringType.get(), "code", false, false, null);
    Column[] newColumns = new Column[] {col1, col2, col3, col4};

    TableCatalog tableCatalog = catalog.asTableCatalog();
    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, "test_table_with_keywords");

    Assertions.assertDoesNotThrow(
        () ->
            tableCatalog.createTable(
                tableIdentifier,
                newColumns,
                table_comment,
                Collections.emptyMap(),
                Transforms.EMPTY_TRANSFORM,
                Distributions.NONE,
                new SortOrder[0]));

    Assertions.assertDoesNotThrow(() -> tableCatalog.loadTable(tableIdentifier));

    // Add a column whose name is itself a reserved word, positioned after "status".
    Assertions.assertDoesNotThrow(
        () ->
            tableCatalog.alterTable(
                tableIdentifier,
                TableChange.addColumn(
                    new String[] {"int"},
                    Types.StringType.get(),
                    TableChange.ColumnPosition.after("status"))));

    Assertions.assertDoesNotThrow(
        () ->
            tableCatalog.alterTable(
                tableIdentifier, TableChange.deleteColumn(new String[] {"create"}, true)));

    Assertions.assertDoesNotThrow(
        () ->
            tableCatalog.alterTable(
                tableIdentifier, TableChange.renameColumn(new String[] {"delete"}, "varchar")));

    Assertions.assertDoesNotThrow(() -> tableCatalog.dropTable(tableIdentifier));
  }
+
  /**
   * Best-effort check that a schema can be created in a different region ("EU") than the
   * default dataset location. Failures are deliberately tolerated (logged, not rethrown)
   * because cross-region availability depends on the test project's configuration.
   */
  @Test
  void testCrossRegionOperations() {
    SupportsSchemas schemas = catalog.asSchemas();

    String[] schemaNames = schemas.listSchemas();
    Assertions.assertTrue(schemaNames.length > 0);

    String testSchemaName = GravitinoITUtils.genRandomName("cross_region_test");
    Map<String, String> properties = Maps.newHashMap();
    properties.put("location", "EU");

    try {
      schemas.createSchema(testSchemaName, "Cross-region test schema", properties);

      Schema loadedSchema = schemas.loadSchema(testSchemaName);
      Assertions.assertEquals(testSchemaName, loadedSchema.name());

      schemas.dropSchema(testSchemaName, false);
    } catch (Exception e) {
      // Intentional best-effort: do not fail the suite when the project cannot use EU.
      LOG.warn("Cross-region test failed, this might be expected: {}", e.getMessage());
    }
  }
+
+  @Test
+  void testConcurrentOperations() throws InterruptedException {
+    int numThreads = 3;
+    Thread[] threads = new Thread[numThreads];
+
+    for (int i = 0; i < numThreads; i++) {
+      final int threadId = i;
+      threads[i] =
+          new Thread(
+              () -> {
+                try {
+                  String threadTableName =
+                      GravitinoITUtils.genRandomName("concurrent_table_" + 
threadId);
+                  NameIdentifier tableIdentifier = 
NameIdentifier.of(schemaName, threadTableName);
+
+                  catalog
+                      .asTableCatalog()
+                      .createTable(
+                          tableIdentifier,
+                          createColumns(),
+                          "Concurrent test table " + threadId,
+                          createProperties(),
+                          Transforms.EMPTY_TRANSFORM,
+                          Distributions.NONE,
+                          new SortOrder[0]);
+
+                  Table loadedTable = 
catalog.asTableCatalog().loadTable(tableIdentifier);
+                  Assertions.assertEquals(threadTableName, loadedTable.name());
+
+                  catalog.asTableCatalog().dropTable(tableIdentifier);
+
+                } catch (Exception e) {
+                  LOG.error("Concurrent operation failed in thread {}", 
threadId, e);
+                  throw new RuntimeException(e);
+                }
+              });
+    }
+
+    for (Thread thread : threads) {
+      thread.start();
+    }
+
+    for (Thread thread : threads) {
+      thread.join(30000);
+    }
+
+    LOG.info("Concurrent operations test completed successfully");
+  }
+
+  @Test
+  void testPerformanceOperations() {
+    long startTime, endTime;
+
+    startTime = System.currentTimeMillis();
+    String[] schemas = catalog.asSchemas().listSchemas();
+    endTime = System.currentTimeMillis();
+    LOG.info("Listed {} schemas in {} ms", schemas.length, endTime - 
startTime);
+
+    startTime = System.currentTimeMillis();
+    NameIdentifier[] tables = 
catalog.asTableCatalog().listTables(Namespace.of(schemaName));
+    endTime = System.currentTimeMillis();
+    LOG.info("Listed {} tables in {} ms", tables.length, endTime - startTime);
+
+    String perfTableName = GravitinoITUtils.genRandomName("perf_table");
+    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, 
perfTableName);
+
+    startTime = System.currentTimeMillis();
+    catalog
+        .asTableCatalog()
+        .createTable(
+            tableIdentifier,
+            createColumns(),
+            "Performance test table",
+            createProperties(),
+            Transforms.EMPTY_TRANSFORM,
+            Distributions.NONE,
+            new SortOrder[0]);
+    endTime = System.currentTimeMillis();
+    LOG.info("Created table in {} ms", endTime - startTime);
+
+    startTime = System.currentTimeMillis();
+    Table loadedTable = catalog.asTableCatalog().loadTable(tableIdentifier);
+    endTime = System.currentTimeMillis();
+    LOG.info("Loaded table in {} ms", endTime - startTime);
+    Assertions.assertEquals(perfTableName, loadedTable.name());
+
+    catalog.asTableCatalog().dropTable(tableIdentifier);
+  }
+}
diff --git 
a/catalogs/catalog-jdbc-bigquery/src/test/java/org/apache/gravitino/catalog/bigquery/integration/CatalogBigQueryPerformanceIT.java
 
b/catalogs/catalog-jdbc-bigquery/src/test/java/org/apache/gravitino/catalog/bigquery/integration/CatalogBigQueryPerformanceIT.java
new file mode 100644
index 0000000000..74d177ee58
--- /dev/null
+++ 
b/catalogs/catalog-jdbc-bigquery/src/test/java/org/apache/gravitino/catalog/bigquery/integration/CatalogBigQueryPerformanceIT.java
@@ -0,0 +1,758 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.bigquery.integration;
+
+import static 
org.apache.gravitino.catalog.bigquery.BigQueryCatalogPropertiesMetadata.PROJECT_ID;
+import static org.awaitility.Awaitility.await;
+
+import com.google.common.collect.Maps;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.Schema;
+import org.apache.gravitino.catalog.jdbc.config.JdbcConfig;
+import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.integration.test.util.BaseIT;
+import org.apache.gravitino.integration.test.util.GravitinoITUtils;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.Table;
+import org.apache.gravitino.rel.TableCatalog;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.sorts.SortOrder;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.types.Types;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestInstance.Lifecycle;
+import org.junit.jupiter.api.condition.EnabledIf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Performance integration tests for BigQuery catalog.
+ *
+ * <p>This test class focuses on performance testing of BigQuery catalog 
operations, including:
+ *
+ * <ul>
+ *   <li>Bulk operations performance
+ *   <li>Concurrent operations throughput
+ *   <li>Large-scale metadata operations
+ *   <li>Connection pooling efficiency
+ * </ul>
+ *
+ * <p>Requires the same environment variables as CatalogBigQueryIT plus:
+ *
+ * <ul>
+ *   <li>BIGQUERY_PERFORMANCE_TEST - Set to "true" to enable performance tests
+ * </ul>
+ */
+@TestInstance(Lifecycle.PER_CLASS)
+@EnabledIf("isPerformanceTestConfigured")
+public class CatalogBigQueryPerformanceIT extends BaseIT {
+
  private static final Logger LOG = LoggerFactory.getLogger(CatalogBigQueryPerformanceIT.class);
  // Gravitino provider name for the BigQuery JDBC catalog.
  private static final String provider = "jdbc-bigquery";

  // Environment variable names that configure the test run.
  private static final String ENV_PROJECT_ID = "BIGQUERY_PROJECT_ID";
  private static final String ENV_KEY_PATH = "BIGQUERY_SERVICE_ACCOUNT_KEY_PATH";
  private static final String ENV_SERVICE_ACCOUNT_EMAIL = "BIGQUERY_SERVICE_ACCOUNT_EMAIL";
  private static final String ENV_PERFORMANCE_TEST = "BIGQUERY_PERFORMANCE_TEST";

  // Performance test sizing: how many entities each scenario attempts to create.
  private static final int BULK_OPERATION_COUNT = 20;
  private static final int CONCURRENT_THREAD_COUNT = 5;
  private static final int LARGE_SCALE_COUNT = 30;

  // Soft thresholds (ms): exceeding one only emits LOG.warn, it never fails a test.
  private static final long TABLE_LOADING_THRESHOLD_MS = 3000;
  private static final long LISTING_THRESHOLD_MS = 10000;
  private static final long TOTAL_OPERATION_THRESHOLD_MS = 300000;

  // Connection settings populated from the environment in startup().
  private String projectId;
  private String keyPath;
  private String serviceAccountEmail; // optional — may be null

  // Randomized names so concurrent CI runs against the same project do not collide.
  public String metalakeName = GravitinoITUtils.genRandomName("bigquery_perf_metalake");
  public String catalogName = GravitinoITUtils.genRandomName("bigquery_perf_catalog");
  public String schemaName = GravitinoITUtils.genRandomName("bigquery_perf_schema");

  private GravitinoMetalake metalake;
  protected Catalog catalog;
+
+  /**
+   * Check if BigQuery performance testing is configured.
+   *
+   * @return true if all required environment variables are set and 
performance test is enabled
+   */
+  static boolean isPerformanceTestConfigured() {
+    return StringUtils.isNotBlank(System.getenv(ENV_PROJECT_ID))
+        && StringUtils.isNotBlank(System.getenv(ENV_KEY_PATH))
+        && "true".equalsIgnoreCase(System.getenv(ENV_PERFORMANCE_TEST));
+  }
+
  /**
   * Suite setup: reads connection settings from the environment, then provisions the
   * metalake, catalog, and schema shared by every test method.
   */
  @BeforeAll
  public void startup() {
    // Load configuration from environment variables
    projectId = System.getenv(ENV_PROJECT_ID);
    keyPath = System.getenv(ENV_KEY_PATH);
    serviceAccountEmail = System.getenv(ENV_SERVICE_ACCOUNT_EMAIL); // may be null (optional)

    LOG.info("Starting BigQuery performance integration tests with project: {}", projectId);

    // Order matters: the catalog needs the metalake, the schema needs the catalog.
    createMetalake();
    catalog = createCatalog(catalogName);
    createSchema(catalog, schemaName);
  }
+
+  @AfterAll
+  public void stop() {
+    try {
+      cleanupAllTestData();
+
+      if (metalake != null) {
+        try {
+          client.disableMetalake(metalakeName);
+          client.dropMetalake(metalakeName);
+          LOG.info("Dropped test metalake: {}", metalakeName);
+        } catch (Exception e) {
+          LOG.warn("Failed to drop metalake: {}, error: {}", metalakeName, 
e.getMessage());
+        }
+      }
+    } catch (Exception e) {
+      LOG.warn("Error during stop cleanup: {}", e.getMessage());
+    }
+  }
+
  /** After each test: drop leftover test tables and auxiliary schemas (keeps the main schema). */
  @AfterEach
  public void cleanup() {
    cleanupTestData();
  }
+
  /**
   * Suite-level cleanup: removes every schema and table this class may have created.
   *
   * <p>NOTE(review): cleanupTestSchemas() already matches the "bigquery_perf_" prefix and
   * therefore also drops the main schema; the two follow-up calls on schemaName then act
   * as redundant no-ops that only log warnings — confirm before simplifying.
   */
  private void cleanupAllTestData() {
    LOG.info("Starting comprehensive test data cleanup");

    try {
      if (catalog != null) {
        // Drop every schema matching a test prefix (their tables first, then the schema).
        cleanupTestSchemas();

        // Defensive repeat for the suite's primary schema.
        cleanupTestTablesInSchema(schemaName);

        dropSchemaIfExists(schemaName);
      }
    } catch (Exception e) {
      LOG.warn("Error during comprehensive cleanup: {}", e.getMessage());
    }

    LOG.info("Completed test data cleanup");
  }
+
+  private void cleanupTestData() {
+    try {
+      if (catalog != null) {
+        cleanupTestTablesInSchema(schemaName);
+
+        cleanupTestSchemasExcludingMain();
+      }
+    } catch (Exception e) {
+      LOG.warn("Error during test data cleanup: {}", e.getMessage());
+    }
+  }
+
+  private void cleanupTestTablesInSchema(String schemaName) {
+    try {
+      NameIdentifier[] tables = 
catalog.asTableCatalog().listTables(Namespace.of(schemaName));
+      int droppedCount = 0;
+      for (NameIdentifier table : tables) {
+        if (isTestTable(table.name())) {
+          try {
+            catalog.asTableCatalog().dropTable(table);
+            droppedCount++;
+          } catch (Exception e) {
+            LOG.warn("Failed to drop test table {}.{}", schemaName, 
table.name());
+          }
+        }
+      }
+      if (droppedCount > 0) {
+        LOG.info("Dropped {} test tables from schema: {}", droppedCount, 
schemaName);
+      }
+    } catch (Exception e) {
+      LOG.warn("Failed to list tables in schema {}: {}", schemaName, 
e.getMessage());
+    }
+  }
+
+  private void cleanupTestSchemas() {
+    try {
+      String[] schemas = catalog.asSchemas().listSchemas();
+      for (String schema : schemas) {
+        if (isTestSchema(schema)) {
+          cleanupTestTablesInSchema(schema);
+
+          dropSchemaIfExists(schema);
+        }
+      }
+    } catch (Exception e) {
+      LOG.warn("Failed to list schemas during cleanup: {}", e.getMessage());
+    }
+  }
+
+  private void cleanupTestSchemasExcludingMain() {
+    try {
+      String[] schemas = catalog.asSchemas().listSchemas();
+      for (String schema : schemas) {
+        if (isTestSchema(schema) && !schema.equals(schemaName)) {
+          cleanupTestTablesInSchema(schema);
+
+          dropSchemaIfExists(schema);
+        }
+      }
+    } catch (Exception e) {
+      LOG.warn("Failed to list schemas during cleanup: {}", e.getMessage());
+    }
+  }
+
  /**
   * Best-effort cascading drop of a schema; failures (e.g. schema absent) are only logged.
   *
   * @param schemaName schema to drop together with its remaining tables (cascade = true)
   */
  private void dropSchemaIfExists(String schemaName) {
    try {
      catalog.asSchemas().dropSchema(schemaName, true);
      LOG.info("Dropped schema: {}", schemaName);
    } catch (Exception e) {
      LOG.warn("Failed to drop schema {}: {}", schemaName, e.getMessage());
    }
  }
+
+  private boolean isTestTable(String tableName) {
+    return tableName.startsWith("perf_test_")
+        || tableName.startsWith("bulk_test_")
+        || tableName.startsWith("test_concurrent_");
+  }
+
+  private boolean isTestSchema(String schemaName) {
+    return schemaName.startsWith("perf_test_")
+        || schemaName.startsWith("bulk_test_")
+        || schemaName.startsWith("bigquery_perf_");
+  }
+
+  private void createMetalake() {
+    try {
+      GravitinoMetalake existingMetalake = client.loadMetalake(metalakeName);
+      if (existingMetalake != null) {
+        client.disableMetalake(metalakeName);
+        client.dropMetalake(metalakeName);
+        LOG.info("Deleted existing metalake: {}", metalakeName);
+      }
+    } catch (Exception e) {
+      // Metalake doesn't exist, continue with creation
+    }
+
+    client.createMetalake(metalakeName, "Performance test metalake", 
Collections.emptyMap());
+    GravitinoMetalake loadMetalake = client.loadMetalake(metalakeName);
+    Assertions.assertEquals(metalakeName, loadMetalake.name());
+
+    metalake = loadMetalake;
+    LOG.info("Created metalake: {}", metalakeName);
+  }
+
+  private Catalog createCatalog(String catalogName) {
+    Map<String, String> catalogProperties = Maps.newHashMap();
+
+    // Required properties
+    catalogProperties.put(PROJECT_ID, projectId);
+    catalogProperties.put(JdbcConfig.PASSWORD.getKey(), keyPath); // Key file 
path
+    catalogProperties.put(
+        JdbcConfig.JDBC_DRIVER.getKey(), 
"com.simba.googlebigquery.jdbc42.Driver");
+
+    // Optional properties
+    if (StringUtils.isNotBlank(serviceAccountEmail)) {
+      catalogProperties.put(JdbcConfig.USERNAME.getKey(), serviceAccountEmail);
+    }
+
+    catalogProperties.put(JdbcConfig.POOL_MIN_SIZE.getKey(), "2");
+    catalogProperties.put(JdbcConfig.POOL_MAX_SIZE.getKey(), "10");
+
+    Catalog createdCatalog =
+        metalake.createCatalog(
+            catalogName,
+            Catalog.Type.RELATIONAL,
+            provider,
+            "Performance test catalog",
+            catalogProperties);
+    Catalog loadCatalog = metalake.loadCatalog(catalogName);
+    Assertions.assertEquals(createdCatalog, loadCatalog);
+
+    LOG.info("Created catalog: {}", catalogName);
+    return loadCatalog;
+  }
+
+  private void createSchema(Catalog catalog, String schemaName) {
+    Map<String, String> prop = Maps.newHashMap();
+    prop.put("location", "US");
+    prop.put("description", "Performance test schema");
+
+    Schema createdSchema = catalog.asSchemas().createSchema(schemaName, 
"Performance test", prop);
+    Schema loadSchema = catalog.asSchemas().loadSchema(schemaName);
+    Assertions.assertEquals(createdSchema.name(), loadSchema.name());
+
+    LOG.info("Created schema: {}", schemaName);
+  }
+
+  private Column[] createTestColumns() {
+    return new Column[] {
+      Column.of("id", Types.LongType.get(), "ID column"),
+      Column.of("name", Types.StringType.get(), "Name column"),
+      Column.of("value", Types.DoubleType.get(), "Value column"),
+      Column.of("created_at", Types.TimestampType.withoutTimeZone(), "Creation 
timestamp"),
+      Column.of("is_active", Types.BooleanType.get(), "Active flag")
+    };
+  }
+
+  @Test
+  void testBulkTableCreationPerformance() {
+    LOG.info("Testing bulk table creation performance with {} tables", 
BULK_OPERATION_COUNT);
+
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    long startTime = System.currentTimeMillis();
+
+    // Create multiple tables
+    for (int i = 0; i < BULK_OPERATION_COUNT; i++) {
+      String tableName = String.format("bulk_test_table_%03d", i);
+      NameIdentifier tableId = NameIdentifier.of(schemaName, tableName);
+
+      try {
+        tableCatalog.createTable(
+            tableId,
+            createTestColumns(),
+            "Bulk test table " + i,
+            Collections.emptyMap(),
+            Transforms.EMPTY_TRANSFORM,
+            Distributions.NONE,
+            new SortOrder[0]);
+
+        if ((i + 1) % 5 == 0) {
+          LOG.info("Created {} tables", i + 1);
+        }
+      } catch (Exception e) {
+        LOG.warn("Failed to create table {}: {}", tableName, e.getMessage());
+      }
+    }
+
+    long endTime = System.currentTimeMillis();
+    long totalTime = endTime - startTime;
+    double avgTime = totalTime > 0 ? (double) totalTime / BULK_OPERATION_COUNT 
: 0;
+
+    LOG.info(
+        "Bulk table creation completed: attempted {} tables in {}ms (avg: 
{:.2f}ms per table)",
+        BULK_OPERATION_COUNT,
+        totalTime,
+        avgTime);
+
+    // Verify tables were created
+    try {
+      NameIdentifier[] tables = 
tableCatalog.listTables(Namespace.of(schemaName));
+      long bulkTestTables =
+          java.util.Arrays.stream(tables)
+              .filter(table -> table.name().startsWith("bulk_test_table_"))
+              .count();
+
+      LOG.info("Successfully created {} bulk test tables", bulkTestTables);
+
+      LOG.info(
+          "Performance metrics - Average creation time: {:.2f}ms, Total time: 
{}ms",
+          avgTime,
+          totalTime);
+
+      if (totalTime > TOTAL_OPERATION_THRESHOLD_MS) {
+        LOG.warn(
+            "Total bulk creation time {}ms exceeds threshold {}ms",
+            totalTime,
+            TOTAL_OPERATION_THRESHOLD_MS);
+      }
+    } catch (Exception e) {
+      LOG.warn("Failed to verify created tables: {}", e.getMessage());
+    }
+  }
+
+  @Test
+  void testBulkTableLoadingPerformance() {
+    LOG.info("Testing bulk table loading performance");
+
+    // First create some tables
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    int tableCount = Math.min(BULK_OPERATION_COUNT, 10);
+
+    cleanupTestTablesInSchema(schemaName);
+
+    for (int i = 0; i < tableCount; i++) {
+      String tableName = String.format("perf_test_load_table_%03d", i);
+      NameIdentifier tableId = NameIdentifier.of(schemaName, tableName);
+
+      try {
+        tableCatalog.createTable(
+            tableId,
+            createTestColumns(),
+            "Performance load test table " + i,
+            Collections.emptyMap(),
+            Transforms.EMPTY_TRANSFORM,
+            Distributions.NONE,
+            new SortOrder[0]);
+      } catch (Exception e) {
+        LOG.warn("Failed to create table {} for loading test: {}", tableName, 
e.getMessage());
+      }
+    }
+
+    LOG.info("Created tables for loading test");
+
+    // Wait for BigQuery operations to complete
+    await()
+        .atMost(Duration.ofSeconds(10))
+        .pollInterval(Duration.ofMillis(500))
+        .until(() -> true); // BigQuery operations are eventually consistent
+
+    // Now test loading performance
+    long startTime = System.currentTimeMillis();
+    int successfulLoads = 0;
+
+    for (int i = 0; i < tableCount; i++) {
+      String tableName = String.format("perf_test_load_table_%03d", i);
+      NameIdentifier tableId = NameIdentifier.of(schemaName, tableName);
+
+      try {
+        Table loadedTable = tableCatalog.loadTable(tableId);
+        Assertions.assertEquals(tableName, loadedTable.name());
+        successfulLoads++;
+      } catch (Exception e) {
+        LOG.warn("Failed to load table {}: {}", tableName, e.getMessage());
+      }
+    }
+
+    long endTime = System.currentTimeMillis();
+    long totalTime = endTime - startTime;
+    double avgTime = successfulLoads > 0 ? (double) totalTime / 
successfulLoads : 0;
+
+    LOG.info(
+        "Bulk table loading completed: {} successful loads out of {} attempts 
in {}ms (avg: {:.2f}ms per table)",
+        successfulLoads,
+        tableCount,
+        totalTime,
+        avgTime);
+
+    if (successfulLoads > 0) {
+      LOG.info(
+          "Performance metrics - Average loading time: {:.2f}ms, Total time: 
{}ms",
+          avgTime,
+          totalTime);
+
+      if (avgTime > TABLE_LOADING_THRESHOLD_MS) {
+        LOG.warn(
+            "Average table loading time {:.2f}ms exceeds typical threshold 
{}ms",
+            avgTime,
+            TABLE_LOADING_THRESHOLD_MS);
+      }
+    }
+
+    Assertions.assertTrue(successfulLoads > 0, "Should successfully load at 
least one table");
+  }
+
+  @Test
+  void testConcurrentTableOperationsPerformance() throws InterruptedException {
+    LOG.info(
+        "Testing concurrent table operations performance with {} threads", 
CONCURRENT_THREAD_COUNT);
+
+    ExecutorService executor = 
Executors.newFixedThreadPool(CONCURRENT_THREAD_COUNT);
+    CountDownLatch latch = new CountDownLatch(CONCURRENT_THREAD_COUNT);
+    AtomicInteger successCount = new AtomicInteger(0);
+    AtomicInteger errorCount = new AtomicInteger(0);
+    AtomicLong totalTime = new AtomicLong(0);
+
+    long startTime = System.currentTimeMillis();
+
+    for (int i = 0; i < CONCURRENT_THREAD_COUNT; i++) {
+      final int threadId = i;
+      executor.submit(
+          () -> {
+            try {
+              long threadStartTime = System.currentTimeMillis();
+
+              String tableName = String.format("perf_test_concurrent_%03d", 
threadId);
+              NameIdentifier tableId = NameIdentifier.of(schemaName, 
tableName);
+
+              try {
+                catalog.asTableCatalog().loadTable(tableId);
+                catalog.asTableCatalog().dropTable(tableId);
+              } catch (Exception e) {
+                // Table doesn't exist, that's expected
+              }
+
+              // Create table
+              catalog
+                  .asTableCatalog()
+                  .createTable(
+                      tableId,
+                      createTestColumns(),
+                      "Concurrent test table " + threadId,
+                      Collections.emptyMap(),
+                      Transforms.EMPTY_TRANSFORM,
+                      Distributions.NONE,
+                      new SortOrder[0]);
+
+              // Load table
+              Table loadedTable = catalog.asTableCatalog().loadTable(tableId);
+              Assertions.assertEquals(tableName, loadedTable.name());
+
+              long threadEndTime = System.currentTimeMillis();
+              totalTime.addAndGet(threadEndTime - threadStartTime);
+              successCount.incrementAndGet();
+
+            } catch (Exception e) {
+              LOG.error("Concurrent operation failed in thread {}: {}", 
threadId, e.getMessage());
+              errorCount.incrementAndGet();
+            } finally {
+              latch.countDown();
+            }
+          });
+    }
+
+    // Wait for all threads to complete
+    boolean completed = latch.await(10, TimeUnit.MINUTES);
+    executor.shutdown();
+
+    long endTime = System.currentTimeMillis();
+    long wallClockTime = endTime - startTime;
+
+    LOG.info(
+        "Concurrent operations completed: {} successful, {} errors in {}ms 
wall-clock time",
+        successCount.get(),
+        errorCount.get(),
+        wallClockTime);
+
+    if (CONCURRENT_THREAD_COUNT > 0) {
+      double avgThreadTime = totalTime.get() / (double) 
CONCURRENT_THREAD_COUNT;
+      LOG.info("Average thread execution time: {:.2f}ms", avgThreadTime);
+    }
+
+    Assertions.assertTrue(completed, "All concurrent operations should 
complete within timeout");
+    Assertions.assertTrue(
+        successCount.get() > 0, "At least some concurrent operations should 
succeed");
+  }
+
+  @Test
+  void testLargeScaleSchemaListingPerformance() {
+    LOG.info("Testing large-scale schema listing performance");
+
+    // Create multiple schemas
+    int schemaCount = Math.min(LARGE_SCALE_COUNT, 10);
+    int createdCount = 0;
+
+    for (int i = 0; i < schemaCount; i++) {
+      String testSchemaName = String.format("perf_test_schema_%03d", i);
+
+      try {
+        catalog.asSchemas().loadSchema(testSchemaName);
+        continue;
+      } catch (Exception e) {
+        // Schema doesn't exist, continue to create
+      }
+
+      Map<String, String> properties = Maps.newHashMap();
+      properties.put("location", "US");
+      properties.put("description", "Performance test schema " + i);
+
+      try {
+        catalog.asSchemas().createSchema(testSchemaName, "Perf test schema", 
properties);
+        createdCount++;
+        LOG.info("Created schema {}", testSchemaName);
+      } catch (Exception e) {
+        LOG.warn("Failed to create schema {}: {}", testSchemaName, 
e.getMessage());
+      }
+    }
+
+    LOG.info("Created {} schemas for listing test", createdCount);
+
+    // Wait for BigQuery operations to complete
+    await()
+        .atMost(Duration.ofSeconds(15))
+        .pollInterval(Duration.ofMillis(500))
+        .until(() -> true); // BigQuery operations are eventually consistent
+
+    // Test listing performance multiple times
+    int iterations = 3;
+    long totalTime = 0;
+
+    for (int i = 0; i < iterations; i++) {
+      long startTime = System.currentTimeMillis();
+      String[] schemas = catalog.asSchemas().listSchemas();
+      long endTime = System.currentTimeMillis();
+
+      totalTime += (endTime - startTime);
+
+      if (i == 0) {
+        LOG.info("First schema listing returned {} schemas", schemas.length);
+      }
+    }
+
+    double avgTime = (double) totalTime / iterations;
+    LOG.info("Schema listing performance: {} iterations, avg time: {:.2f}ms", 
iterations, avgTime);
+
+    if (avgTime > LISTING_THRESHOLD_MS) {
+      LOG.warn(
+          "Average schema listing time {:.2f}ms exceeds typical threshold 
{}ms",
+          avgTime,
+          LISTING_THRESHOLD_MS);
+    }
+  }
+
+  @Test
+  void testLargeScaleTableListingPerformance() {
+    LOG.info("Testing large-scale table listing performance");
+
+    // Create multiple tables
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    int tableCount = Math.min(LARGE_SCALE_COUNT, 20);
+
+    cleanupTestTablesInSchema(schemaName);
+
+    long creationStartTime = System.currentTimeMillis();
+    int createdCount = 0;
+
+    for (int i = 0; i < tableCount; i++) {
+      String tableName = String.format("perf_test_list_table_%03d", i);
+      NameIdentifier tableId = NameIdentifier.of(schemaName, tableName);
+
+      try {
+        tableCatalog.createTable(
+            tableId,
+            createTestColumns(),
+            "Performance list test table " + i,
+            Collections.emptyMap(),
+            Transforms.EMPTY_TRANSFORM,
+            Distributions.NONE,
+            new SortOrder[0]);
+        createdCount++;
+
+        if ((i + 1) % 5 == 0) {
+          LOG.info("Created {} tables for listing test", i + 1);
+        }
+      } catch (Exception e) {
+        LOG.warn("Failed to create table {}: {}", tableName, e.getMessage());
+      }
+    }
+
+    long creationEndTime = System.currentTimeMillis();
+    LOG.info(
+        "Created {} tables for listing test in {}ms",
+        createdCount,
+        creationEndTime - creationStartTime);
+
+    // Wait for BigQuery operations to complete
+    await()
+        .atMost(Duration.ofSeconds(15))
+        .pollInterval(Duration.ofMillis(500))
+        .until(() -> true); // BigQuery operations are eventually consistent
+
+    // Test listing performance multiple times
+    int iterations = 3;
+    long totalTime = 0;
+
+    for (int i = 0; i < iterations; i++) {
+      long startTime = System.currentTimeMillis();
+      NameIdentifier[] tables = 
tableCatalog.listTables(Namespace.of(schemaName));
+      long endTime = System.currentTimeMillis();
+
+      totalTime += (endTime - startTime);
+
+      if (i == 0) {
+        LOG.info("First table listing returned {} tables", tables.length);
+      }
+    }
+
+    double avgTime = (double) totalTime / iterations;
+    LOG.info("Table listing performance: {} iterations, avg time: {:.2f}ms", 
iterations, avgTime);
+
+    if (avgTime > LISTING_THRESHOLD_MS) {
+      LOG.warn(
+          "Average table listing time {:.2f}ms exceeds typical threshold {}ms",
+          avgTime,
+          LISTING_THRESHOLD_MS);
+    }
+  }
+
+  @Test
+  void testConnectionPoolingEfficiency() {
+    LOG.info("Testing connection pooling efficiency");
+
+    // Perform rapid sequential operations to test connection reuse
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    int operationCount = 10;
+
+    long startTime = System.currentTimeMillis();
+
+    for (int i = 0; i < operationCount; i++) {
+      // Mix of different operations to test connection pooling
+      if (i % 3 == 0) {
+        // Schema listing
+        try {
+          catalog.asSchemas().listSchemas();
+        } catch (Exception e) {
+          LOG.warn("Schema listing operation failed: {}", e.getMessage());
+        }
+      } else if (i % 3 == 1) {
+        // Table listing
+        try {
+          tableCatalog.listTables(Namespace.of(schemaName));
+        } catch (Exception e) {
+          LOG.warn("Table listing operation failed: {}", e.getMessage());
+        }
+      } else {
+        // Schema loading
+        try {
+          catalog.asSchemas().loadSchema(schemaName);
+        } catch (Exception e) {
+          LOG.warn("Schema loading operation failed: {}", e.getMessage());
+        }
+      }
+    }
+
+    long endTime = System.currentTimeMillis();
+    long totalTime = endTime - startTime;
+    double avgTime = (double) totalTime / operationCount;
+
+    LOG.info(
+        "Connection pooling test: {} operations in {}ms (avg: {:.2f}ms per 
operation)",
+        operationCount,
+        totalTime,
+        avgTime);
+
+    LOG.info("Connection pooling performance - Total: {}ms, Average: 
{:.2f}ms", totalTime, avgTime);
+  }
+}
diff --git 
a/catalogs/catalog-jdbc-bigquery/src/test/resources/log4j2.properties 
b/catalogs/catalog-jdbc-bigquery/src/test/resources/log4j2.properties
new file mode 100644
index 0000000000..82dc473e7e
--- /dev/null
+++ b/catalogs/catalog-jdbc-bigquery/src/test/resources/log4j2.properties
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Log4j2 configuration for BigQuery catalog tests
+
+# Root logger level
+rootLogger.level = INFO
+rootLogger.appenderRefs = stdout
+rootLogger.appenderRef.stdout.ref = STDOUT
+
+# Console appender
+appender.console.type = Console
+appender.console.name = STDOUT
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} [%t] %-5level 
%logger{36} - %msg%n
+
+# BigQuery catalog specific logging
+logger.bigquery.name = org.apache.gravitino.catalog.bigquery
+logger.bigquery.level = DEBUG
+logger.bigquery.additivity = false
+logger.bigquery.appenderRefs = stdout
+logger.bigquery.appenderRef.stdout.ref = STDOUT
+
+# Integration test logging
+# Matches the actual test package (the classes live in
+# org.apache.gravitino.catalog.bigquery.integration, not ".integration.test").
+logger.integration.name = org.apache.gravitino.catalog.bigquery.integration
+logger.integration.level = INFO
+logger.integration.additivity = false
+logger.integration.appenderRefs = stdout
+logger.integration.appenderRef.stdout.ref = STDOUT
+
+# Google API client logging (reduce noise)
+logger.google.name = com.google
+logger.google.level = WARN
+
+# HTTP transport logging (reduce noise)
+logger.http.name = com.google.api.client.http
+logger.http.level = WARN
+
+# BigQuery API logging
+logger.bigqueryapi.name = com.google.api.services.bigquery
+logger.bigqueryapi.level = INFO
\ No newline at end of file
diff --git a/docs/jdbc-bigquery-catalog.md b/docs/jdbc-bigquery-catalog.md
new file mode 100644
index 0000000000..c2553b5234
--- /dev/null
+++ b/docs/jdbc-bigquery-catalog.md
@@ -0,0 +1,317 @@
+---
+title: "BigQuery catalog"
+slug: /jdbc-bigquery-catalog
+keywords:
+- jdbc
+- BigQuery
+- metadata
+license: "This software is licensed under the Apache License version 2."
+---
+
+import Tabs from '@theme/Tabs';
+import TabItem from '@theme/TabItem';
+
+## Introduction
+
+Apache Gravitino provides the ability to manage Google BigQuery metadata through a JDBC connection.
+
+:::caution
+Gravitino saves some system information in schema and table comments, such as `(From Gravitino, DO NOT EDIT: gravitino.v1.uid1078334182909406185)`. Please don't change or remove this message.
+:::
+
+## Catalog
+
+### Catalog capabilities
+
+- Gravitino catalog corresponds to the BigQuery project.
+- Supports metadata management of Google BigQuery.
+- Supports DDL operations for BigQuery datasets and tables.
+- Supports table partitioning and clustering.
+- Supports [column default 
value](./manage-relational-metadata-using-gravitino.md#table-column-default-value).
+
+### Catalog properties
+
+You can pass to a BigQuery data source any property that isn't defined by 
Gravitino by adding `gravitino.bypass.` prefix as a catalog property. For 
example, catalog property `gravitino.bypass.maxWaitMillis` will pass 
`maxWaitMillis` to the data source property.
+
+Check the relevant data source configuration in [data source 
properties](https://commons.apache.org/proper/commons-dbcp/configuration.html)
+
+If you use a JDBC catalog, you must provide `jdbc-url`, `jdbc-driver`, `jdbc-user`, and `jdbc-password` in the catalog properties.
+Besides the [common catalog 
properties](./gravitino-server-config.md#apache-gravitino-catalog-properties-configuration),
 the BigQuery catalog has the following properties:
+
+| Configuration item      | Description                                        
                                               | Default value                  
                              | Required | Since version |
+|-------------------------|---------------------------------------------------------------------------------------------------|--------------------------------------------------------------|----------|---------------|
+| `project-id`            | Google Cloud Project ID                            
                                               | (none)                         
                              | Yes      | 1.2.0         |
+| `jdbc-url`              | JDBC connection URL, can be auto-generated or 
manually specified                                  | 
`jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443` | Yes      | 1.2.0 
        |
+| `jdbc-driver`           | JDBC driver class name                             
                                               | 
`com.simba.googlebigquery.jdbc42.Driver`                     | Yes      | 1.2.0 
        |
+| `jdbc-user`             | Service account email                              
                                               | (none)                         
                              | Yes      | 1.2.0         |
+| `jdbc-password`         | Service account key file path                      
                                               | (none)                         
                              | Yes      | 1.2.0         |
+| `jdbc.pool.min-size`    | The minimum number of connections in the pool. `2` 
by default.                                    | `2`                            
                              | No       | 1.2.0         |
+| `jdbc.pool.max-size`    | The maximum number of connections in the pool. 
`10` by default.                                   | `10`                       
                                  | No       | 1.2.0         |
+| `jdbc.pool.max-wait-ms` | The maximum time in milliseconds that the pool will wait for a connection to be returned. `30000` by default. | `30000`                                                      | No       | 1.2.0         |
+| `proxy-host`            | Proxy server hostname or IP address                
                                               | (none)                         
                              | No       | 1.2.0         |
+| `proxy-port`            | Proxy server port number                           
                                               | (none)                         
                              | No       | 1.2.0         |
+| `proxy-username`        | Proxy authentication username                      
                                               | (none)                         
                              | No       | 1.2.0         |
+| `proxy-password`        | Proxy authentication password                      
                                               | (none)                         
                              | No       | 1.2.0         |
+
+:::note
+When using a proxy that requires credentials (`proxy-username` and 
`proxy-password`), the following JVM arguments must be used:
+```
+-Djdk.http.auth.tunneling.disabledSchemes=
+-Djdk.http.auth.proxying.disabledSchemes=
+```
+These arguments disable the default security restrictions on HTTP 
authentication schemes for tunneling and proxying, allowing authentication to 
work properly through the proxy.
+:::
+
+
+
+### Catalog operations
+
+Refer to [Manage Relational Metadata Using 
Gravitino](./manage-relational-metadata-using-gravitino.md#catalog-operations) 
for more details.
+
+## Schema
+
+### Schema capabilities
+
+- Gravitino's schema concept corresponds to the BigQuery dataset.
+- Supports creating schema with properties like location, description, labels, 
etc.
+- Supports dropping schema.
+- Supports cascade dropping schema.
+
+### Schema properties
+
+| Property Name                        | Type                          | 
Description                                                             | 
Supported |
+|--------------------------------------|-------------------------------|-------------------------------------------------------------------------|-----------|
+| `default_kms_key_name`               | STRING                        | 
Default Cloud KMS encryption key (not verified)                         | Yes   
    |
+| `default_partition_expiration_days`  | FLOAT64                       | 
Default partition expiration days                                       | Yes   
    |
+| `default_rounding_mode`              | STRING                        | 
Default rounding mode: `ROUND_HALF_AWAY_FROM_ZERO` or `ROUND_HALF_EVEN` | Yes   
    |
+| `default_table_expiration_days`      | FLOAT64                       | 
Default table expiration days                                           | Yes   
    |
+| `failover_reservation`               | STRING                        | 
Failover reservation                                                    | No    
    |
+| `friendly_name`                      | STRING                        | 
Friendly name                                                           | Yes   
    |
+| `is_case_insensitive`                | BOOL                          | Case 
insensitive                                                        | Yes       |
+| `is_primary`                         | BOOL                          | Whether it's the primary replica                                        | No        |
+| `primary_replica`                    | STRING                        | 
Primary replica name                                                    | No    
    |
+| `storage_billing_model`              | STRING                        | 
Storage billing model: `PHYSICAL` or `LOGICAL`                          | Yes   
    |
+| `tags`                               | `ARRAY<STRUCT<STRING, STRING>>` | IAM tags array                                                        | Yes       |
+| `collate`                            | STRING                        | 
Collation schema                                                        | No    
    |
+
+### Schema operations
+
+Refer to [Manage Relational Metadata Using 
Gravitino](./manage-relational-metadata-using-gravitino.md#schema-operations) 
for more details.
+
+## Table
+
+### Table capabilities
+
+- Gravitino's table concept corresponds to the BigQuery table.
+- Supports DDL operations for BigQuery tables.
+- Supports table partitioning and clustering.
+- Supports [column default 
value](./manage-relational-metadata-using-gravitino.md#table-column-default-value).
+
+### Table column types
+
+**Basic Types Supported by Web UI**:
+
+| Gravitino Type | BigQuery Type |
+|----------------|---------------|
+| `Binary`       | `BYTES`       |
+| `Boolean`      | `BOOL`        |
+| `Byte`         | `INT64`       |
+| `Char`         | `STRING`      |
+| `Date`         | `DATE`        |
+| `Decimal`      | `NUMERIC`     |
+| `Double`       | `FLOAT64`     |
+| `Float`        | `FLOAT64`     |
+| `Integer`      | `INT64`       |
+| `Long`         | `INT64`       |
+| `Short`        | `INT64`       |
+| `String`       | `STRING`      |
+| `Time`         | `TIME`        |
+| `Timestamp`    | `DATETIME`    |
+| `Timestamp_tz` | `TIMESTAMP`   |
+| `VarChar`      | `STRING`      |
+
+**Complex Types Supported by API** (Not supported in Web UI):
+
+| Gravitino Type| BigQuery Type |
+|---------------|---------------|
+| `ARRAY<T>`    | `ARRAY<T>`    |
+| `STRUCT<...>` | `STRUCT<...>` |
+| `Geography`   | `GEOGRAPHY`   |
+| `Json`        | `JSON`        |
+| `Range<T>`    | `RANGE<T>`    |
+
+:::info
+BigQuery doesn't support the Gravitino `Fixed`, `IntervalDay`, `IntervalYear`, `Union`, and `UUID` types.
+Meanwhile, the data types other than listed above are mapped to Gravitino 
**[External 
Type](./manage-relational-metadata-using-gravitino.md#external-type)** that 
represents an unresolvable data type.
+
+Support for these types will be improved in future versions. For the following types, using the `string` type is recommended as a workaround:
+- INTERVAL
+- BIGNUMERIC (Note: BIGNUMERIC precision above 38 is truncated to fit 
Gravitino's DecimalType limits)
+:::
+
+### Table properties
+
+| Property Name                   | Description                               
| Type    | Example Value                                      | Supported |
+|---------------------------------|-------------------------------------------|---------|----------------------------------------------------|-----------|
+| `description`                   | Table description                         
| String  | `"Customer data table"`                            | Yes       |
+| `friendly_name`                 | Friendly name                             
| String  | `"Customer Data"`                                  | Yes       |
+| `expiration_timestamp`          | Table expiration timestamp                
| String  | `"2025-12-31T23:59:59Z"`                           | Yes       |
+| `partition_expiration_days`     | Partition expiration days                 
| Float64 | `"365.5"`                                          | Yes       |
+| `require_partition_filter`      | Whether partition filter is required      
| Boolean | `"true"`                                           | Yes       |
+| `clustering_fields`             | Clustering fields, comma-separated, max 4 
| String  | `"country,city,category"`                          | Yes       |
+| `labels`                        | Table labels, JSON array format           
| String  | `"[{\"env\":\"prod\"}, {\"team\":\"data\"}]"`      | Yes       |
+| `kms_key_name`                  | Cloud KMS encryption key                  
| String  | `"projects/p/locations/l/keyRings/r/cryptoKeys/k"` | Yes       |
+| `default_rounding_mode`         | Default rounding mode                     
| String  | `"ROUND_HALF_EVEN"`                                | Yes       |
+| `enable_change_history`         | Enable change history                     
| Boolean | `"true"`                                           | Yes       |
+| `enable_fine_grained_mutations` | Enable fine-grained mutations             
| Boolean | `"true"`                                           | Yes       |
+| `tags`                          | IAM tags                                  
| String  | `"[{\"key\":\"value\"}]"`                          | Yes       |
+| `max_staleness`                 | Maximum staleness                         
| String  | `"INTERVAL 4 HOUR"`                                | No        |
+| `storage_uri`                   | Storage URI for managed tables            
| String  | `"gs://my-bucket/tables/"`                         | No        |
+| `file_format`                   | File format for managed tables            
| String  | `"PARQUET"`                                        | No        |
+| `table_format`                  | Table format for managed tables           
| String  | `"ICEBERG"`                                        | No        |
+
+### Table partitioning
+
+The BigQuery catalog supports partitioned tables. Users can create partitioned 
tables with specific partitioning attributes through the API.
+
+| Gravitino Partition Transform   | BigQuery Partition Strategy                
                                                        |
+|---------------------------------|----------------------------------------------------------------------------------------------------|
+| `Transforms.day("column")`      | Daily partitioning based on 
DATE/TIMESTAMP/DATETIME column                                         |
+| `Transforms.hour("column")`     | Hourly partitioning based on 
TIMESTAMP/DATETIME column                                             |
+| `Transforms.month("column")`    | Monthly partitioning based on 
DATE/TIMESTAMP/DATETIME column                                       |
+| `Transforms.year("column")`     | Yearly partitioning based on 
DATE/TIMESTAMP/DATETIME column                                        |
+| `Transforms.identity("column")` | Direct partitioning (only 
DATE/TIMESTAMP/DATETIME, integer range partitioning not yet implemented) |
+
+:::note
+Integer range partitioning is not implemented in the current version.
+:::
+
+### Table operations
+
+Refer to [Manage Relational Metadata Using 
Gravitino](./manage-relational-metadata-using-gravitino.md#table-operations) 
for more details.
+
+#### Create table examples
+
+<Tabs groupId='language' queryString>
+<TabItem value="json" label="Json">
+
+```json
+{
+  "name": "user_events",
+  "comment": "User events table with partitioning and clustering",
+  "columns": [
+    {
+      "name": "user_id",
+      "type": "string",
+      "nullable": false,
+      "comment": "User ID"
+    },
+    {
+      "name": "event_timestamp",
+      "type": "timestamp_tz",
+      "nullable": false,
+      "comment": "Event timestamp"
+    },
+    {
+      "name": "event_type",
+      "type": "string",
+      "nullable": true,
+      "comment": "Event type"
+    },
+    {
+      "name": "event_data",
+      "type": "json",
+      "nullable": true,
+      "comment": "Event data in JSON format"
+    }
+  ],
+  "partitioning": [
+    {
+      "strategy": "day",
+      "fieldName": ["event_timestamp"]
+    }
+  ],
+  "properties": {
+    "clustering_fields": "user_id,event_type",
+    "partition_expiration_days": "365",
+    "require_partition_filter": "true"
+  }
+}
+```
+
+</TabItem>
+<TabItem value="java" label="Java">
+
+```java
+Column[] columns = new Column[] {
+    Column.of("user_id", Types.StringType.get(), "User ID", false, false, 
null),
+    Column.of("event_timestamp", Types.TimestampType.withTimeZone(), "Event 
timestamp", false, false, null),
+    Column.of("event_type", Types.StringType.get(), "Event type", true, false, 
null),
+    Column.of("event_data", Types.ExternalType.of("JSON"), "Event data in JSON 
format", true, false, null)
+};
+
+Transform[] partitioning = new Transform[] {
+    Transforms.day("event_timestamp")
+};
+
+Map<String, String> properties = ImmutableMap.of(
+    "clustering_fields", "user_id,event_type",
+    "partition_expiration_days", "365",
+    "require_partition_filter", "true"
+);
+
+Table table = catalog.asTableCatalog()
+    .createTable(
+        NameIdentifier.of("schema_name", "user_events"),
+        columns,
+        "User events table with partitioning and clustering",
+        properties,
+        partitioning
+    );
+```
+
+</TabItem>
+</Tabs>
+
+#### Alter table operations
+
+Gravitino supports these table alteration operations:
+
+- `RenameTable`
+- `UpdateComment`
+- `AddColumn`
+- `DeleteColumn`
+- `RenameColumn`
+- `UpdateColumnType`
+- `UpdateColumnPosition`
+- `UpdateColumnNullability`
+- `UpdateColumnComment`
+- `UpdateColumnDefaultValue`
+- `SetProperty`
+
+:::info
+- You cannot submit the `RenameTable` operation at the same time as other 
operations.
+- If you change a nullable column to non-nullable, there may be compatibility issues.
+:::
+
+## Limitations and Considerations
+
+### Current Limitations
+
+1. **Feature Limitations**
+   - Integer range partitioning not implemented
+   - External tables, views, table clones not supported
+   - Dataset properties like `failover_reservation`, `is_primary`, 
`primary_replica`, `collate` not supported
+   - Web UI does not support table partitioning and clustering (API only)
+   - Web UI does not support complex data types (ARRAY, STRUCT, GEOGRAPHY, 
JSON) (API only)
+   - BIGNUMERIC precision above (38,9) has precision loss; INTERVAL type 
currently not supported
+   - Unsupported data types display as "external" in Web UI
+
+2. **Performance Considerations**
+   - Table metadata loading uses JDBC which may be slower than native API calls
+
+### Recommended Usage
+
+- **Simple Scenarios**: Use Web UI to create basic tables and schemas
+- **Advanced Scenarios**: Use API for complex configurations (supports 
BigQuery native data types for table creation)
+- **Production Environment**: Recommend using API for batch operations
\ No newline at end of file

Reply via email to