This is an automated email from the ASF dual-hosted git repository.

yuqi4733 pushed a commit to branch internal-main
in repository https://gitbox.apache.org/repos/asf/gravitino.git

commit 0605097d9910de45c27186d574756d94663bd25b
Author: geyanggang <[email protected]>
AuthorDate: Tue Jan 6 19:04:33 2026 +0800

    [#93]feat(bigquery-catalog): Add table operators for BigQuery catalog. 
(#102)
    
    * Add table operators for BQ catalog.
    
    * Add table operators for BQ catalog.
    
    * Fix and improve.
    
    * Fix and improve.
    
    * Fix and improve.
    
    * fix bugs.
---
 catalogs/catalog-jdbc-bigquery/build.gradle.kts    |    8 +-
 .../bigquery/BigQueryTablePropertiesMetadata.java  |  221 +++-
 .../BigQueryColumnDefaultValueConverter.java       |  107 +-
 .../converter/BigQueryExceptionConverter.java      |   81 +-
 .../bigquery/converter/BigQueryTypeConverter.java  |  175 +++-
 .../operation/BigQueryTableOperations.java         | 1057 +++++++++++++++++++-
 .../catalog/bigquery/utils/BigQueryUtils.java      |   73 ++
 .../TestBigQueryTablePropertiesMetadata.java       |  180 ++++
 .../TestBigQueryColumnDefaultValueConverter.java   |  192 ++++
 .../converter/TestBigQueryTypeConverter.java       |  284 ++++++
 .../operation/TestBigQuerySqlGeneration.java       |  192 ++++
 .../operation/TestBigQueryTableOperations.java     |  598 +++++++++++
 12 files changed, 3136 insertions(+), 32 deletions(-)

diff --git a/catalogs/catalog-jdbc-bigquery/build.gradle.kts 
b/catalogs/catalog-jdbc-bigquery/build.gradle.kts
index efdaae56cd..0311d0f5ff 100644
--- a/catalogs/catalog-jdbc-bigquery/build.gradle.kts
+++ b/catalogs/catalog-jdbc-bigquery/build.gradle.kts
@@ -55,6 +55,7 @@ val extractSimbaDriver by tasks.registering(Copy::class) {
   from(zipTree(simbaZipFile))
   into(simbaExtractDir)
   include("**/*.jar")
+  exclude("**/jackson-*.jar")
   exclude("**/src/", "**/doc/", "**/samples/", "**/legal/")
 
   rename("GoogleBigQueryJDBC42.jar", "GoogleBigQueryJDBC42-simba.jar")
@@ -110,6 +111,7 @@ dependencies {
   implementation(libs.commons.lang3)
   implementation(libs.guava)
   implementation(libs.commons.compress)
+  implementation(libs.jackson.databind)
 
   testImplementation(project(":catalogs:catalog-jdbc-common", "testArtifacts"))
   testImplementation(project(":clients:client-java"))
@@ -122,7 +124,11 @@ dependencies {
   testImplementation(libs.testcontainers)
   testImplementation(libs.mockito.core)
 
-  val simbaJdbcDriver = files(simbaExtractDir.asFileTree.matching { 
include("*.jar") })
+  val simbaJdbcDriver = files(
+    simbaExtractDir.asFileTree.matching {
+      include("*.jar")
+    }
+  )
   implementation(simbaJdbcDriver)
 
   testRuntimeOnly(libs.junit.jupiter.engine)
diff --git 
a/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/BigQueryTablePropertiesMetadata.java
 
b/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/BigQueryTablePropertiesMetadata.java
index 2222ec1cbd..24684d017f 100644
--- 
a/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/BigQueryTablePropertiesMetadata.java
+++ 
b/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/BigQueryTablePropertiesMetadata.java
@@ -18,16 +18,231 @@
  */
 package org.apache.gravitino.catalog.bigquery;
 
-import com.google.common.collect.ImmutableMap;
+import static 
org.apache.gravitino.connector.PropertyEntry.stringOptionalPropertyEntry;
+
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import org.apache.gravitino.catalog.jdbc.JdbcTablePropertiesMetadata;
 import org.apache.gravitino.connector.PropertyEntry;
 
-/** BigQuery table properties metadata. */
+/**
+ * BigQuery table properties metadata.
+ *
+ * <p>Defines BigQuery-specific table properties such as partition expiration, 
clustering fields,
+ * table expiration, and require partition filter. All properties are mutable 
and can be modified
+ * after table creation using ALTER TABLE SET OPTIONS.
+ */
 public class BigQueryTablePropertiesMetadata extends 
JdbcTablePropertiesMetadata {
 
+  // BigQuery table specific property keys
+  public static final String PARTITION_EXPIRATION_DAYS = 
"partition_expiration_days";
+  public static final String REQUIRE_PARTITION_FILTER = 
"require_partition_filter";
+  public static final String CLUSTERING_FIELDS = "clustering_fields";
+  public static final String EXPIRATION_TIMESTAMP = "expiration_timestamp";
+  public static final String FRIENDLY_NAME = "friendly_name";
+  public static final String DESCRIPTION = "description";
+  public static final String LABELS = "labels";
+
+  // Additional BigQuery table properties
+  public static final String KMS_KEY_NAME = "kms_key_name";
+  public static final String DEFAULT_ROUNDING_MODE = "default_rounding_mode";
+  public static final String ENABLE_CHANGE_HISTORY = "enable_change_history";
+  public static final String MAX_STALENESS = "max_staleness";
+  public static final String ENABLE_FINE_GRAINED_MUTATIONS = 
"enable_fine_grained_mutations";
+
+  // Managed table properties (Preview)
+  public static final String STORAGE_URI = "storage_uri";
+  public static final String FILE_FORMAT = "file_format";
+  public static final String TABLE_FORMAT = "table_format";
+
+  // IAM tags
+  public static final String TAGS = "tags";
+
+  private static final Map<String, PropertyEntry<?>> PROPERTIES_METADATA =
+      createPropertiesMetadata();
+
+  /**
+   * Creates the properties metadata map for BigQuery tables.
+   *
+   * @return immutable map of property entries
+   */
+  private static Map<String, PropertyEntry<?>> createPropertiesMetadata() {
+    Map<String, PropertyEntry<?>> map = new HashMap<>();
+
+    // Table description
+    map.put(
+        DESCRIPTION,
+        stringOptionalPropertyEntry(
+            DESCRIPTION, "Table description for documentation purposes", 
false, null, false));
+
+    // Friendly name
+    map.put(
+        FRIENDLY_NAME,
+        stringOptionalPropertyEntry(
+            FRIENDLY_NAME,
+            "Friendly name for the table displayed in BigQuery UI",
+            false,
+            null,
+            false));
+
+    // Partition expiration days
+    map.put(
+        PARTITION_EXPIRATION_DAYS,
+        stringOptionalPropertyEntry(
+            PARTITION_EXPIRATION_DAYS,
+            "Number of days after which partitions expire and are 
automatically deleted. "
+                + "Only applies to partitioned tables. Supports fractional 
days (e.g., 30.5).",
+            false,
+            null,
+            false));
+
+    // Require partition filter
+    map.put(
+        REQUIRE_PARTITION_FILTER,
+        stringOptionalPropertyEntry(
+            REQUIRE_PARTITION_FILTER,
+            "Whether queries on this table must include a partition filter 
(true/false). "
+                + "Helps prevent accidentally querying large amounts of data. 
Only applies to partitioned tables.",
+            false,
+            null,
+            false));
+
+    // Clustering fields (comma-separated, max 4)
+    map.put(
+        CLUSTERING_FIELDS,
+        stringOptionalPropertyEntry(
+            CLUSTERING_FIELDS,
+            "Comma-separated list of clustering field names (maximum 4 
fields). "
+                + "Clustering improves query performance for filtered and 
aggregated queries.",
+            false,
+            null,
+            false));
+
+    // Table expiration timestamp
+    map.put(
+        EXPIRATION_TIMESTAMP,
+        stringOptionalPropertyEntry(
+            EXPIRATION_TIMESTAMP,
+            "Timestamp when the table expires and is automatically deleted 
(ISO 8601 format). "
+                + "Example: '2024-12-31T23:59:59Z'",
+            false,
+            null,
+            false));
+
+    // Labels in JSON format
+    map.put(
+        LABELS,
+        stringOptionalPropertyEntry(
+            LABELS,
+            "Table labels in JSON array format: [{\"key\":\"value\"}]. "
+                + "Used for organizing and filtering tables.",
+            false,
+            null,
+            false));
+
+    // KMS key name for encryption
+    map.put(
+        KMS_KEY_NAME,
+        stringOptionalPropertyEntry(
+            KMS_KEY_NAME,
+            "Cloud KMS key name for table encryption. "
+                + "Format: 
projects/PROJECT_ID/locations/LOCATION/keyRings/KEYRING/cryptoKeys/KEY",
+            false,
+            null,
+            false));
+
+    // Default rounding mode for NUMERIC/BIGNUMERIC columns
+    map.put(
+        DEFAULT_ROUNDING_MODE,
+        stringOptionalPropertyEntry(
+            DEFAULT_ROUNDING_MODE,
+            "Default rounding mode for NUMERIC and BIGNUMERIC columns. "
+                + "Supported values: ROUND_HALF_AWAY_FROM_ZERO, 
ROUND_HALF_EVEN",
+            false,
+            null,
+            false));
+
+    // Enable change history (Preview)
+    map.put(
+        ENABLE_CHANGE_HISTORY,
+        stringOptionalPropertyEntry(
+            ENABLE_CHANGE_HISTORY,
+            "Enable change history capture on the table for use with CHANGES 
function (true/false). "
+                + "This is a preview feature and may incur additional costs.",
+            false,
+            null,
+            false));
+
+    // Max staleness for change data capture
+    map.put(
+        MAX_STALENESS,
+        stringOptionalPropertyEntry(
+            MAX_STALENESS,
+            "Maximum interval behind current time for reading stale data. "
+                + "Format: INTERVAL \"4:0:0\" HOUR TO SECOND",
+            false,
+            null,
+            false));
+
+    // Enable fine-grained mutations (Preview)
+    map.put(
+        ENABLE_FINE_GRAINED_MUTATIONS,
+        stringOptionalPropertyEntry(
+            ENABLE_FINE_GRAINED_MUTATIONS,
+            "Enable fine-grained DML optimization on the table (true/false). "
+                + "This is a preview feature.",
+            false,
+            null,
+            false));
+
+    // Managed table properties (Preview)
+    map.put(
+        STORAGE_URI,
+        stringOptionalPropertyEntry(
+            STORAGE_URI,
+            "Fully qualified location prefix for external folder where data is 
stored. "
+                + "Required for managed tables. Format: 
gs://BUCKET_DIRECTORY/TABLE_DIRECTORY/",
+            false,
+            null,
+            false));
+
+    map.put(
+        FILE_FORMAT,
+        stringOptionalPropertyEntry(
+            FILE_FORMAT,
+            "Open-source file format for table data storage. "
+                + "Only PARQUET is supported for managed tables.",
+            false,
+            "PARQUET",
+            false));
+
+    map.put(
+        TABLE_FORMAT,
+        stringOptionalPropertyEntry(
+            TABLE_FORMAT,
+            "Open table format for metadata-only snapshots. "
+                + "Only ICEBERG is supported for managed tables.",
+            false,
+            "ICEBERG",
+            false));
+
+    // IAM tags
+    map.put(
+        TAGS,
+        stringOptionalPropertyEntry(
+            TAGS,
+            "IAM tags for the table in JSON array format: 
[{\"key\":\"value\"}]. "
+                + "Key should be namespaced key name, value should be short 
name.",
+            false,
+            null,
+            false));
+
+    return Collections.unmodifiableMap(map);
+  }
+
   @Override
   protected Map<String, PropertyEntry<?>> specificPropertyEntries() {
-    return ImmutableMap.of();
+    return PROPERTIES_METADATA;
   }
 }
diff --git 
a/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/converter/BigQueryColumnDefaultValueConverter.java
 
b/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/converter/BigQueryColumnDefaultValueConverter.java
index 8e1da4f4a7..070bf2cd94 100644
--- 
a/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/converter/BigQueryColumnDefaultValueConverter.java
+++ 
b/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/converter/BigQueryColumnDefaultValueConverter.java
@@ -18,10 +18,14 @@
  */
 package org.apache.gravitino.catalog.bigquery.converter;
 
-import org.apache.commons.lang3.NotImplementedException;
+import static org.apache.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET;
+
 import 
org.apache.gravitino.catalog.jdbc.converter.JdbcColumnDefaultValueConverter;
 import org.apache.gravitino.catalog.jdbc.converter.JdbcTypeConverter;
 import org.apache.gravitino.rel.expressions.Expression;
+import org.apache.gravitino.rel.expressions.UnparsedExpression;
+import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.gravitino.rel.types.Decimal;
 
 /** Column default value converter for BigQuery. */
 public class BigQueryColumnDefaultValueConverter extends 
JdbcColumnDefaultValueConverter {
@@ -32,6 +36,105 @@ public class BigQueryColumnDefaultValueConverter extends 
JdbcColumnDefaultValueC
       String columnDefaultValue,
       boolean isExpression,
       boolean nullable) {
-    throw new NotImplementedException("To be implemented in the future");
+    if (columnDefaultValue == null) {
+      return nullable ? Literals.NULL : DEFAULT_VALUE_NOT_SET;
+    }
+
+    if (columnDefaultValue.equalsIgnoreCase(NULL)) {
+      return Literals.NULL;
+    }
+
+    // Handle BigQuery expressions
+    if (isExpression) {
+      // BigQuery functions like CURRENT_TIMESTAMP(), CURRENT_DATE(), etc.
+      // are not supported as literals in Gravitino, return as unparsed 
expression
+      return UnparsedExpression.of(columnDefaultValue);
+    }
+
+    // Handle BigQuery specific default value formats
+    String trimmedValue = columnDefaultValue.trim();
+
+    // Handle quoted strings
+    if (trimmedValue.startsWith("'") && trimmedValue.endsWith("'") && 
trimmedValue.length() >= 2) {
+      String unquoted = trimmedValue.substring(1, trimmedValue.length() - 1);
+      return Literals.stringLiteral(unquoted);
+    }
+
+    // Handle boolean values (extract to method to avoid duplication)
+    Boolean boolValue = parseBooleanValue(trimmedValue);
+    if (boolValue != null) {
+      return Literals.booleanLiteral(boolValue);
+    }
+
+    // Handle numeric values based on BigQuery type
+    String typeName = type.getTypeName().toLowerCase();
+    switch (typeName) {
+      case BigQueryTypeConverter.INT64:
+        try {
+          return Literals.longLiteral(Long.parseLong(trimmedValue));
+        } catch (NumberFormatException e) {
+          return UnparsedExpression.of(columnDefaultValue);
+        }
+
+      case BigQueryTypeConverter.FLOAT64:
+        try {
+          return Literals.doubleLiteral(Double.parseDouble(trimmedValue));
+        } catch (NumberFormatException e) {
+          return UnparsedExpression.of(columnDefaultValue);
+        }
+
+      case BigQueryTypeConverter.NUMERIC:
+      case BigQueryTypeConverter.BIGNUMERIC:
+        try {
+          Integer precision = type.getColumnSize();
+          Integer scale = type.getScale();
+          // Gravitino Decimal supports precision up to 38, but BigQuery 
BIGNUMERIC supports up to
+          // 76.76
+          // For BIGNUMERIC with precision > 38, we cap it at 38 for Gravitino 
compatibility
+          if (precision != null && precision > 38) {
+            precision = 38;
+            // Adjust scale proportionally if needed
+            if (scale != null && scale > 38) {
+              scale = 38;
+            }
+          }
+          if (precision != null && scale != null) {
+            return Literals.decimalLiteral(Decimal.of(trimmedValue, precision, 
scale));
+          } else {
+            return Literals.decimalLiteral(Decimal.of(trimmedValue));
+          }
+        } catch (Exception e) {
+          return UnparsedExpression.of(columnDefaultValue);
+        }
+
+      case BigQueryTypeConverter.BOOL:
+        // Boolean already handled above, but keep for completeness
+        Boolean boolVal = parseBooleanValue(trimmedValue);
+        return boolVal != null
+            ? Literals.booleanLiteral(boolVal)
+            : UnparsedExpression.of(columnDefaultValue);
+
+      case BigQueryTypeConverter.STRING:
+        return Literals.stringLiteral(trimmedValue);
+
+      default:
+        // For other types, return as unparsed expression
+        return UnparsedExpression.of(columnDefaultValue);
+    }
+  }
+
+  /**
+   * Parses a boolean value from a string.
+   *
+   * @param value the string value to parse
+   * @return Boolean.TRUE, Boolean.FALSE, or null if not a boolean
+   */
+  private Boolean parseBooleanValue(String value) {
+    if (value.equalsIgnoreCase("true")) {
+      return Boolean.TRUE;
+    } else if (value.equalsIgnoreCase("false")) {
+      return Boolean.FALSE;
+    }
+    return null;
   }
 }
diff --git 
a/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/converter/BigQueryExceptionConverter.java
 
b/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/converter/BigQueryExceptionConverter.java
index f71b49672c..e70e6c2e45 100644
--- 
a/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/converter/BigQueryExceptionConverter.java
+++ 
b/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/converter/BigQueryExceptionConverter.java
@@ -18,17 +18,92 @@
  */
 package org.apache.gravitino.catalog.bigquery.converter;
 
+import java.net.HttpURLConnection;
 import java.sql.SQLException;
 import org.apache.gravitino.catalog.jdbc.converter.JdbcExceptionConverter;
 import org.apache.gravitino.exceptions.GravitinoRuntimeException;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.exceptions.NoSuchTableException;
+import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
+import org.apache.gravitino.exceptions.TableAlreadyExistsException;
 
-/** Exception converter for BigQuery. */
+/**
+ * Exception converter for BigQuery.
+ *
+ * <p>Converts BigQuery JDBC SQLException to appropriate Gravitino exceptions 
based on error
+ * messages and error codes.
+ */
 public class BigQueryExceptionConverter extends JdbcExceptionConverter {
 
+  // HTTP status code for Too Many Requests (not defined in HttpURLConnection)
+  private static final int HTTP_TOO_MANY_REQUESTS = 429;
+
   @Override
   public GravitinoRuntimeException toGravitinoException(SQLException se) {
-    // For Phase 1, delegate to parent class implementation
-    // BigQuery-specific exception handling can be added in later phases
+    String errorMessage = se.getMessage();
+    if (errorMessage == null) {
+      return super.toGravitinoException(se);
+    }
+
+    String lowerMessage = errorMessage.toLowerCase();
+
+    // Table not found (404)
+    if (lowerMessage.contains("not found: table")
+        || lowerMessage.contains("table not found")
+        || 
(lowerMessage.contains(String.valueOf(HttpURLConnection.HTTP_NOT_FOUND))
+            && lowerMessage.contains("table"))) {
+      return new NoSuchTableException(se, "BigQuery table not found: %s", 
errorMessage);
+    }
+
+    // Dataset not found (404)
+    if (lowerMessage.contains("not found: dataset")
+        || lowerMessage.contains("dataset not found")
+        || 
(lowerMessage.contains(String.valueOf(HttpURLConnection.HTTP_NOT_FOUND))
+            && lowerMessage.contains("dataset"))) {
+      return new NoSuchSchemaException(se, "BigQuery dataset not found: %s", 
errorMessage);
+    }
+
+    // Table already exists (409)
+    if (lowerMessage.contains("already exists: table")
+        || lowerMessage.contains("table already exists")
+        || 
(lowerMessage.contains(String.valueOf(HttpURLConnection.HTTP_CONFLICT))
+            && lowerMessage.contains("table"))) {
+      return new TableAlreadyExistsException(se, "BigQuery table already 
exists: %s", errorMessage);
+    }
+
+    // Dataset already exists (409)
+    if (lowerMessage.contains("already exists: dataset")
+        || lowerMessage.contains("dataset already exists")
+        || 
(lowerMessage.contains(String.valueOf(HttpURLConnection.HTTP_CONFLICT))
+            && lowerMessage.contains("dataset"))) {
+      return new SchemaAlreadyExistsException(
+          se, "BigQuery dataset already exists: %s", errorMessage);
+    }
+
+    // Permission denied (403)
+    if (lowerMessage.contains("permission denied")
+        || lowerMessage.contains("access denied")
+        || 
lowerMessage.contains(String.valueOf(HttpURLConnection.HTTP_FORBIDDEN))) {
+      return new GravitinoRuntimeException(
+          se, "Permission denied: %s. Please check your BigQuery IAM 
permissions.", errorMessage);
+    }
+
+    // Invalid argument (400)
+    if (lowerMessage.contains("invalid")
+        || 
lowerMessage.contains(String.valueOf(HttpURLConnection.HTTP_BAD_REQUEST))
+        || lowerMessage.contains("bad request")) {
+      return new GravitinoRuntimeException(se, "Invalid BigQuery operation: 
%s", errorMessage);
+    }
+
+    // Quota exceeded (429)
+    if (lowerMessage.contains("quota exceeded")
+        || lowerMessage.contains("rate limit")
+        || lowerMessage.contains(String.valueOf(HTTP_TOO_MANY_REQUESTS))) {
+      return new GravitinoRuntimeException(
+          se, "BigQuery quota exceeded: %s. Please try again later.", 
errorMessage);
+    }
+
+    // Default to parent class handling
     return super.toGravitinoException(se);
   }
 }
diff --git 
a/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/converter/BigQueryTypeConverter.java
 
b/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/converter/BigQueryTypeConverter.java
index b6b930c207..d92f075bb0 100644
--- 
a/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/converter/BigQueryTypeConverter.java
+++ 
b/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/converter/BigQueryTypeConverter.java
@@ -18,20 +18,189 @@
  */
 package org.apache.gravitino.catalog.bigquery.converter;
 
-import org.apache.commons.lang3.NotImplementedException;
+import java.util.Optional;
 import org.apache.gravitino.catalog.jdbc.converter.JdbcTypeConverter;
 import org.apache.gravitino.rel.types.Type;
+import org.apache.gravitino.rel.types.Types;
 
 /** Type converter for BigQuery. */
 public class BigQueryTypeConverter extends JdbcTypeConverter {
 
+  // BigQuery type names
+  static final String BOOL = "bool";
+  static final String BOOLEAN = "boolean";
+  static final String INT64 = "int64";
+  static final String FLOAT64 = "float64";
+  static final String NUMERIC = "numeric";
+  static final String BIGNUMERIC = "bignumeric";
+  static final String STRING = "string";
+  static final String BYTES = "bytes";
+  static final String DATE = "date";
+  static final String DATETIME = "datetime";
+  static final String TIME = "time";
+  static final String TIMESTAMP = "timestamp";
+  static final String GEOGRAPHY = "geography";
+  static final String JSON = "json";
+
   @Override
   public Type toGravitino(JdbcTypeBean typeBean) {
-    throw new NotImplementedException("To be implemented in the future");
+    String typeName = typeBean.getTypeName().toLowerCase();
+
+    switch (typeName) {
+      case BOOL:
+      case BOOLEAN: // BigQuery may return "BOOLEAN" which becomes "boolean" 
in lowercase
+        return Types.BooleanType.get();
+      case INT64:
+        return Types.LongType.get();
+      case FLOAT64:
+        return Types.DoubleType.get();
+      case NUMERIC:
+        // BigQuery NUMERIC has precision 38, scale 9 by default
+        Integer precision = typeBean.getColumnSize();
+        Integer scale = typeBean.getScale();
+        if (precision != null && scale != null) {
+          return Types.DecimalType.of(precision, scale);
+        } else {
+          return Types.DecimalType.of(38, 9);
+        }
+      case BIGNUMERIC:
+        // BigQuery BIGNUMERIC has precision up to 76.76, scale up to 38 by 
default
+        // But Gravitino DecimalType is limited to precision 38, so we limit 
it to fit Gravitino's
+        // constraints
+        Integer bigNumericPrecision = typeBean.getColumnSize();
+        Integer bigNumericScale = typeBean.getScale();
+        if (bigNumericPrecision != null && bigNumericScale != null) {
+          // Limit precision to Gravitino's maximum (38 digits)
+          int limitedPrecision = Math.min(bigNumericPrecision, 38);
+          int limitedScale = Math.min(bigNumericScale, limitedPrecision);
+          return Types.DecimalType.of(limitedPrecision, limitedScale);
+        } else {
+          // Use Gravitino's maximum precision
+          return Types.DecimalType.of(38, 38);
+        }
+      case STRING:
+        return Types.StringType.get();
+      case BYTES:
+        return Types.BinaryType.get();
+      case DATE:
+        return Types.DateType.get();
+      case TIME:
+        return Optional.ofNullable(typeBean.getDatetimePrecision())
+            .map(Types.TimeType::of)
+            .orElseGet(Types.TimeType::get);
+      case DATETIME:
+        return Optional.ofNullable(typeBean.getDatetimePrecision())
+            .map(Types.TimestampType::withoutTimeZone)
+            .orElseGet(Types.TimestampType::withoutTimeZone);
+      case TIMESTAMP:
+        return Optional.ofNullable(typeBean.getDatetimePrecision())
+            .map(Types.TimestampType::withTimeZone)
+            .orElseGet(Types.TimestampType::withTimeZone);
+      case GEOGRAPHY:
+      case JSON:
+        // Handle GEOGRAPHY and JSON as external types with uppercase type name
+        return Types.ExternalType.of(typeBean.getTypeName().toUpperCase());
+      default:
+        // For complex types like ARRAY, STRUCT, RANGE, preserve the full type 
definition
+        // The typeName from JDBC should contain the complete type like 
"ARRAY<STRING>"
+        // We need to preserve this for proper SQL generation
+        return Types.ExternalType.of(typeBean.getTypeName().toUpperCase());
+    }
   }
 
   @Override
   public String fromGravitino(Type type) {
-    throw new NotImplementedException("To be implemented in the future");
+    if (type instanceof Types.BooleanType) {
+      return BOOL;
+    } else if (type instanceof Types.ByteType) {
+      // BigQuery doesn't have a direct byte type, map to INT64
+      return INT64;
+    } else if (type instanceof Types.ShortType) {
+      // BigQuery doesn't have a direct short type, map to INT64
+      return INT64;
+    } else if (type instanceof Types.IntegerType) {
+      // BigQuery doesn't have a direct int type, map to INT64
+      return INT64;
+    } else if (type instanceof Types.LongType) {
+      return INT64;
+    } else if (type instanceof Types.FloatType) {
+      // BigQuery doesn't have a direct float type, map to FLOAT64
+      return FLOAT64;
+    } else if (type instanceof Types.DoubleType) {
+      return FLOAT64;
+    } else if (type instanceof Types.StringType) {
+      return STRING;
+    } else if (type instanceof Types.VarCharType) {
+      // BigQuery STRING type is variable length
+      return STRING;
+    } else if (type instanceof Types.FixedCharType) {
+      // BigQuery doesn't have fixed char, use STRING
+      return STRING;
+    } else if (type instanceof Types.BinaryType) {
+      return BYTES;
+    } else if (type instanceof Types.DateType) {
+      return DATE;
+    } else if (type instanceof Types.TimeType timeType) {
+      return timeType.hasPrecisionSet()
+          ? String.format("%s(%d)", TIME, timeType.precision())
+          : TIME;
+    } else if (type instanceof Types.TimestampType timestampType) {
+      String baseType = timestampType.hasTimeZone() ? TIMESTAMP : DATETIME;
+      return timestampType.hasPrecisionSet()
+          ? String.format("%s(%d)", baseType, timestampType.precision())
+          : baseType;
+    } else if (type instanceof Types.DecimalType decimalType) {
+      // BigQuery NUMERIC: precision 1-38, scale 0-9 (or 0-precision)
+      // BigQuery BIGNUMERIC: precision 1-76, scale 0-38 (or 0-precision)
+      // For Gravitino DecimalType (max precision 38), always use NUMERIC
+      return String.format("NUMERIC(%d, %d)", decimalType.precision(), 
decimalType.scale());
+    } else if (type instanceof Types.ExternalType) {
+      return ((Types.ExternalType) type).catalogString();
+    } else if (type instanceof Types.UnparsedType unparsedType) {
+      // Handle unparsed types from Web UI
+      String unparsedStr = unparsedType.unparsedType().toLowerCase();
+
+      // Map common unparsed type names to BigQuery types
+      // This handles cases where Web UI sends lowercase type names or
+      // when users specify types through API calls
+      switch (unparsedStr) {
+        case "bool": // From Web UI type selection or API calls
+          return BOOL;
+        case "int64": // From Web UI type selection or API calls
+          return INT64;
+        case "float64": // From Web UI type selection or API calls
+          return FLOAT64;
+        case "string": // From Web UI type selection or API calls
+          return STRING;
+        case "bytes": // From Web UI type selection or API calls
+          return BYTES;
+        case "date": // From Web UI type selection or API calls
+          return DATE;
+        case "time": // From Web UI type selection or API calls
+          return TIME;
+        case "datetime": // From Web UI type selection or API calls
+          return DATETIME;
+        case "timestamp": // From Web UI type selection or API calls
+          return TIMESTAMP;
+        case "numeric": // From Web UI type selection or API calls
+          return NUMERIC;
+        case "bignumeric": // From Web UI type selection or API calls
+          return BIGNUMERIC;
+        default:
+          // For complete complex types like ARRAY<INT64>, STRUCT<...>, 
RANGE<DATE>, return as-is
+          // These come from API calls or direct type specifications, not from 
Web UI
+          if (unparsedStr.startsWith("array<")
+              || unparsedStr.startsWith("struct<")
+              || unparsedStr.startsWith("range<")) {
+            return unparsedStr;
+          }
+
+          // Return as-is for other unparsed types
+          return unparsedStr;
+      }
+    }
+
+    throw new IllegalArgumentException(
+        String.format("Couldn't convert Gravitino type %s to BigQuery type", 
type.simpleString()));
   }
 }
diff --git 
a/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/operation/BigQueryTableOperations.java
 
b/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/operation/BigQueryTableOperations.java
index ab7b927087..0bc0f9193e 100644
--- 
a/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/operation/BigQueryTableOperations.java
+++ 
b/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/operation/BigQueryTableOperations.java
@@ -18,41 +18,233 @@
  */
 package org.apache.gravitino.catalog.bigquery.operation;
 
+import com.google.api.services.bigquery.Bigquery;
+import com.google.api.services.bigquery.model.TableList;
+import java.io.IOException;
 import java.sql.Connection;
 import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import org.apache.commons.lang3.NotImplementedException;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.catalog.bigquery.BigQueryClientPool;
+import org.apache.gravitino.catalog.bigquery.BigQueryTablePropertiesMetadata;
+import org.apache.gravitino.catalog.bigquery.utils.BigQueryUtils;
 import org.apache.gravitino.catalog.jdbc.JdbcColumn;
 import org.apache.gravitino.catalog.jdbc.JdbcTable;
 import org.apache.gravitino.catalog.jdbc.operation.JdbcTableOperations;
+import org.apache.gravitino.catalog.jdbc.utils.JdbcConnectorUtils;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.exceptions.NoSuchTableException;
+import org.apache.gravitino.exceptions.TableAlreadyExistsException;
 import org.apache.gravitino.rel.TableChange;
 import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
 import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
 import org.apache.gravitino.rel.indexes.Index;
 
-/** Table operations for BigQuery. */
+/**
+ * Table operations for BigQuery.
+ *
+ * <p>Implements BigQuery-specific DDL operations for tables, including CREATE 
TABLE with
+ * partitioning and clustering, and ALTER TABLE with BigQuery's specific 
limitations.
+ *
+ * <p>This class uses BigQuery API for cross-region table listing to avoid 
JDBC connection
+ * limitations with region-specific datasets.
+ */
 public class BigQueryTableOperations extends JdbcTableOperations {
 
+  private static final String BACKTICK = "`";
+
   private BigQueryClientPool clientPool;
 
  /**
   * Sets the BigQuery API client pool.
   *
   * <p>When set, API-based paths (see {@code listTables} and {@code tableExistsViaApi}) are
   * preferred over JDBC metadata, which is region-bound. When left {@code null}, those paths fall
   * back to JDBC.
   *
   * @param clientPool BigQuery client pool for API operations; may be {@code null} to disable the
   *     API-based code paths
   */
  public void setClientPool(BigQueryClientPool clientPool) {
    this.clientPool = clientPool;
  }
 
+  @Override
+  protected Connection getConnection(String databaseName) throws SQLException {
+    // In BigQuery, databaseName is actually the dataset name (schema)
+    // We need to get a connection and set the schema, not the catalog
+    Connection connection = dataSource.getConnection();
+
+    LOG.debug("BigQuery connection established for dataset: {}", databaseName);
+
+    // Set schema to the dataset name
+    connection.setSchema(databaseName);
+
+    return connection;
+  }
+
+  /**
+   * Checks if a table exists using BigQuery API.
+   *
+   * <p>This method uses BigQuery API instead of JDBC for better performance 
and cross-region
+   * support.
+   *
+   * @param databaseName the dataset name
+   * @param tableName the table name
+   * @return true if the table exists, false otherwise
+   * @throws RuntimeException if API call fails for reasons other than table 
not found
+   */
+  @SuppressWarnings("unused")
+  protected boolean tableExistsViaApi(String databaseName, String tableName) {
+    if (clientPool == null) {
+      return false;
+    }
+
+    try {
+      Bigquery bigquery = clientPool.getClient();
+      String projectId = clientPool.getProjectId();
+
+      // Try to get the table
+      bigquery.tables().get(projectId, databaseName, tableName).execute();
+      return true;
+    } catch (IOException e) {
+      if (e.getMessage() != null && e.getMessage().contains("404")) {
+        return false; // Table not found
+      }
+      // For other errors (permission denied, network issues, etc.), throw 
exception
+      LOG.error(
+          "Failed to check table existence via API for {}.{}: {}",
+          databaseName,
+          tableName,
+          e.getMessage());
+      throw new RuntimeException(
+          String.format(
+              "Failed to check table existence for %s.%s: %s",
+              databaseName, tableName, e.getMessage()),
+          e);
+    }
+  }
+
   /**
-   * Gets the BigQuery API client pool.
+   * Lists all tables in the specified dataset using BigQuery API.
+   *
+   * <p>This method uses the BigQuery API instead of JDBC metadata to avoid 
regional limitations.
+   * JDBC connections are region-specific and cannot access datasets in 
different regions, which
+   * would cause "Database not found" errors.
    *
-   * @return BigQuery client pool, may be null if not set
+   * @param databaseName the dataset name
+   * @return list of table names
+   * @throws NoSuchSchemaException if the dataset does not exist
    */
-  protected BigQueryClientPool getClientPool() {
-    return clientPool;
+  @Override
+  public List<String> listTables(String databaseName) throws 
NoSuchSchemaException {
+    final List<String> names = new ArrayList<>();
+
+    // Use BigQuery API if client pool is available
+    if (clientPool != null) {
+      try {
+        Bigquery bigquery = clientPool.getClient();
+        String projectId = clientPool.getProjectId();
+
+        LOG.debug("Listing tables using BigQuery API for dataset: {}", 
databaseName);
+
+        // List all tables in the dataset using BigQuery API
+        Bigquery.Tables.List request = bigquery.tables().list(projectId, 
databaseName);
+
+        TableList response = request.execute();
+
+        if (response.getTables() != null) {
+          for (TableList.Tables table : response.getTables()) {
+            String tableId = table.getTableReference().getTableId();
+            names.add(tableId);
+            LOG.debug("Found table: {} in dataset: {}", tableId, databaseName);
+          }
+        }
+
+        LOG.info(
+            "Listed {} tables using API for dataset: {} in project: {}",
+            names.size(),
+            databaseName,
+            projectId);
+        return names;
+
+      } catch (IOException e) {
+        // Check if it's a "not found" error
+        if (e.getMessage() != null && e.getMessage().contains("404")) {
+          LOG.warn("Dataset not found: {}", databaseName);
+          throw new NoSuchSchemaException(
+              "Dataset '%s' does not exist in project '%s'",
+              databaseName, clientPool.getProjectId());
+        }
+        LOG.error("Failed to list tables using API for dataset: {}", 
databaseName, e);
+        throw new RuntimeException(
+            "Failed to list tables using BigQuery API for dataset: " + 
databaseName, e);
+      }
+    }
+
+    // Fallback to JDBC if API client is not available
+    LOG.warn(
+        "BigQuery API client not available, falling back to JDBC for dataset: 
{} "
+            + "(may fail for cross-region datasets)",
+        databaseName);
+
+    try (Connection connection = getConnection(databaseName);
+        ResultSet tables = getTables(connection)) {
+      while (tables.next()) {
+        String tableSchem = tables.getString("TABLE_SCHEM");
+        String tableName = tables.getString("TABLE_NAME");
+
+        // In BigQuery, TABLE_SCHEM contains the dataset name, not TABLE_CAT
+        if (Objects.equals(tableSchem, databaseName)) {
+          names.add(tableName);
+          LOG.debug("Found table: {} in dataset: {}", tableName, databaseName);
+        }
+      }
+      LOG.info("Finished listing tables size {} for dataset {} ", 
names.size(), databaseName);
+      return names;
+    } catch (final SQLException se) {
+      LOG.error("Failed to list tables for dataset: {}", databaseName, se);
+      throw this.exceptionMapper.toGravitinoException(se);
+    }
+  }
+
+  @Override
+  public void create(
+      String databaseName,
+      String tableName,
+      JdbcColumn[] columns,
+      String comment,
+      Map<String, String> properties,
+      Transform[] partitioning,
+      Distribution distribution,
+      Index[] indexes)
+      throws TableAlreadyExistsException {
+
+    // In BigQuery, table name must be fully qualified with dataset
+    String qualifiedTableName = databaseName + "." + tableName;
+
+    try (Connection connection = getConnection(databaseName)) {
+
+      String createTableSql =
+          generateCreateTableSql(
+              qualifiedTableName,
+              columns,
+              comment,
+              properties,
+              partitioning,
+              distribution,
+              indexes);
+
+      JdbcConnectorUtils.executeUpdate(connection, createTableSql);
+
+    } catch (final SQLException se) {
+      throw this.exceptionMapper.toGravitinoException(se);
+    }
   }
 
   @Override
@@ -64,28 +256,461 @@ public class BigQueryTableOperations extends 
JdbcTableOperations {
       Transform[] partitioning,
       Distribution distribution,
       Index[] indexes) {
-    throw new NotImplementedException("To be implemented in the future");
+
+    // BigQuery does not support traditional indexes or constraints
+    if (ArrayUtils.isNotEmpty(indexes)) {
+      throw new UnsupportedOperationException(
+          "BigQuery does not support indexes or constraints. "
+              + "Use clustering instead for query optimization.");
+    }
+
+    StringBuilder sqlBuilder = new StringBuilder();
+    sqlBuilder.append("CREATE TABLE ");
+
+    // In BigQuery, table name must be fully qualified with dataset
+    // Since we don't have databaseName in this method, we need to override 
the create method
+    // instead
+    // For now, assume tableName might already be qualified, or we'll fix this 
in the create method
+    if (!tableName.contains(".")) {
+      // This shouldn't happen in BigQuery, but let's handle it gracefully
+      LOG.warn("Table name {} is not fully qualified for BigQuery", tableName);
+    }
+
+    // Table name with backticks
+    sqlBuilder.append(BACKTICK).append(tableName).append(BACKTICK);
+    sqlBuilder.append(" (\n");
+
+    // Add column definitions
+    for (int i = 0; i < columns.length; i++) {
+      JdbcColumn column = columns[i];
+      sqlBuilder.append("  
").append(BACKTICK).append(column.name()).append(BACKTICK).append(" ");
+
+      // Data type
+      sqlBuilder.append(typeConverter.fromGravitino(column.dataType()));
+
+      // NOT NULL constraint
+      if (!column.nullable()) {
+        sqlBuilder.append(" NOT NULL");
+      }
+
+      // Column description via OPTIONS
+      if (StringUtils.isNotEmpty(column.comment())) {
+        sqlBuilder
+            .append(" OPTIONS(description=\"")
+            .append(escapeString(column.comment()))
+            .append("\")");
+      }
+
+      if (i < columns.length - 1) {
+        sqlBuilder.append(",\n");
+      }
+    }
+
+    sqlBuilder.append("\n)");
+
+    // Add PARTITION BY clause
+    if (ArrayUtils.isNotEmpty(partitioning)) {
+      sqlBuilder.append("\n").append(generatePartitionClause(partitioning, 
columns));
+    }
+
+    // Add CLUSTER BY clause (from properties, not distribution)
+    if (properties != null
+        && 
properties.containsKey(BigQueryTablePropertiesMetadata.CLUSTERING_FIELDS)) {
+      String clusteringFields = 
properties.get(BigQueryTablePropertiesMetadata.CLUSTERING_FIELDS);
+      BigQueryUtils.validateClusteringFields(clusteringFields);
+      
sqlBuilder.append("\n").append(generateClusterByClause(clusteringFields));
+    }
+
+    // Validate distribution is NONE (clustering should be via properties)
+    if (!Distributions.NONE.equals(distribution)) {
+      throw new UnsupportedOperationException(
+          "BigQuery clustering should be specified via table property 
'clustering_fields', "
+              + "not through Distribution parameter.");
+    }
+
+    // Add OPTIONS clause
+    String optionsClause = generateTableOptionsClause(comment, properties, 
partitioning);
+    if (!optionsClause.isEmpty()) {
+      sqlBuilder.append("\n").append(optionsClause);
+    }
+
+    String result = sqlBuilder.append(";").toString();
+    LOG.info("Generated CREATE TABLE SQL for {}: {}", tableName, result);
+    return result;
+  }
+
+  @Override
+  protected void purgeTable(String databaseName, String tableName) {
+    LOG.info("Attempting to purge table {} from dataset {}", tableName, 
databaseName);
+
+    // In BigQuery, table name must be fully qualified with dataset
+    String qualifiedTableName = databaseName + "." + tableName;
+
+    try (Connection connection = getConnection(databaseName)) {
+      String dropTableSql = generatePurgeTableSql(qualifiedTableName);
+      JdbcConnectorUtils.executeUpdate(connection, dropTableSql);
+      LOG.info("Purged table {} from dataset {}", tableName, databaseName);
+    } catch (final SQLException se) {
+      throw this.exceptionMapper.toGravitinoException(se);
+    }
+  }
+
+  @Override
+  protected void dropTable(String databaseName, String tableName) {
+    LOG.info("Attempting to drop table {} from dataset {}", tableName, 
databaseName);
+
+    // In BigQuery, table name must be fully qualified with dataset
+    String qualifiedTableName = databaseName + "." + tableName;
+
+    try (Connection connection = getConnection(databaseName)) {
+      String dropTableSql = generateDropTableSql(qualifiedTableName);
+      JdbcConnectorUtils.executeUpdate(connection, dropTableSql);
+      LOG.info("Dropped table {} from dataset {}", tableName, databaseName);
+    } catch (final SQLException se) {
+      throw this.exceptionMapper.toGravitinoException(se);
+    }
+  }
+
+  @Override
+  protected String generatePurgeTableSql(String tableName) {
+    // BigQuery DROP TABLE immediately deletes the table
+    return "DROP TABLE " + BACKTICK + tableName + BACKTICK;
+  }
+
  /**
   * Builds the DROP TABLE statement.
   *
   * @param tableName the dataset-qualified table name
   * @return the DROP TABLE statement
   */
  protected String generateDropTableSql(String tableName) {
    // In BigQuery, DROP TABLE is the same as purge
    return generatePurgeTableSql(tableName);
  }
+
+  /**
+   * Generates PARTITION BY clause for CREATE TABLE.
+   *
+   * <p>BigQuery supports partitioning by: - DATE columns (direct partitioning 
for identity,
+   * DATE_TRUNC for month/year) - TIMESTAMP columns (using DATE() function for 
day partitioning or
+   * TIMESTAMP_TRUNC for hour) - DATETIME columns (using DATE() function for 
day partitioning or
+   * DATETIME_TRUNC) - INTEGER columns (using RANGE_BUCKET)
+   *
+   * @param partitioning the partitioning transforms
+   * @param columns the table columns to determine column types
+   * @return the PARTITION BY clause
+   */
+  private String generatePartitionClause(Transform[] partitioning, 
JdbcColumn[] columns) {
+    if (partitioning.length > 1) {
+      throw new UnsupportedOperationException(
+          "BigQuery supports only single-column partitioning, but got " + 
partitioning.length);
+    }
+
+    Transform transform = partitioning[0];
+    String partitionFieldName = null;
+
+    // Get the partition field name
+    if (transform instanceof Transforms.DayTransform) {
+      partitionFieldName = ((Transforms.DayTransform) 
transform).fieldName()[0];
+    } else if (transform instanceof Transforms.MonthTransform) {
+      partitionFieldName = ((Transforms.MonthTransform) 
transform).fieldName()[0];
+    } else if (transform instanceof Transforms.YearTransform) {
+      partitionFieldName = ((Transforms.YearTransform) 
transform).fieldName()[0];
+    } else if (transform instanceof Transforms.HourTransform) {
+      partitionFieldName = ((Transforms.HourTransform) 
transform).fieldName()[0];
+    } else if (transform instanceof Transforms.IdentityTransform) {
+      partitionFieldName = ((Transforms.IdentityTransform) 
transform).fieldName()[0];
+    } else {
+      throw new UnsupportedOperationException(
+          "Unsupported partition transform: " + 
transform.getClass().getSimpleName());
+    }
+
+    if (partitionFieldName == null) {
+      throw new UnsupportedOperationException(
+          "Unsupported partition transform: " + 
transform.getClass().getSimpleName());
+    }
+
+    // Find the column type
+    JdbcColumn partitionColumn = null;
+    for (JdbcColumn column : columns) {
+      if (column.name().equals(partitionFieldName)) {
+        partitionColumn = column;
+        break;
+      }
+    }
+
+    if (partitionColumn == null) {
+      throw new IllegalArgumentException(
+          "Partition field '" + partitionFieldName + "' not found in table 
columns");
+    }
+
+    String columnType = 
typeConverter.fromGravitino(partitionColumn.dataType()).toLowerCase();
+    String fieldWithBackticks = BACKTICK + partitionFieldName + BACKTICK;
+
+    if (transform instanceof Transforms.DayTransform) {
+      // For day partitioning
+      if (columnType.equals("date")) {
+        // For DATE columns, use direct partitioning (not DATE() function)
+        return String.format("PARTITION BY %s", fieldWithBackticks);
+      } else if (columnType.equals("timestamp") || 
columnType.equals("datetime")) {
+        // For TIMESTAMP/DATETIME columns, use DATE() function
+        return String.format("PARTITION BY DATE(%s)", fieldWithBackticks);
+      } else {
+        throw new UnsupportedOperationException(
+            "Day partitioning is only supported for DATE, TIMESTAMP, and 
DATETIME columns, but got: "
+                + columnType);
+      }
+
+    } else if (transform instanceof Transforms.MonthTransform) {
+      switch (columnType) {
+        case "date":
+          return String.format("PARTITION BY DATE_TRUNC(%s, MONTH)", 
fieldWithBackticks);
+        case "timestamp":
+          return String.format("PARTITION BY TIMESTAMP_TRUNC(%s, MONTH)", 
fieldWithBackticks);
+        case "datetime":
+          return String.format("PARTITION BY DATETIME_TRUNC(%s, MONTH)", 
fieldWithBackticks);
+        default:
+          throw new UnsupportedOperationException(
+              "Month partitioning is only supported for DATE, TIMESTAMP, and 
DATETIME columns, but got: "
+                  + columnType);
+      }
+
+    } else if (transform instanceof Transforms.YearTransform) {
+      switch (columnType) {
+        case "date":
+          return String.format("PARTITION BY DATE_TRUNC(%s, YEAR)", 
fieldWithBackticks);
+        case "timestamp":
+          return String.format("PARTITION BY TIMESTAMP_TRUNC(%s, YEAR)", 
fieldWithBackticks);
+        case "datetime":
+          return String.format("PARTITION BY DATETIME_TRUNC(%s, YEAR)", 
fieldWithBackticks);
+        default:
+          throw new UnsupportedOperationException(
+              "Year partitioning is only supported for DATE, TIMESTAMP, and 
DATETIME columns, but got: "
+                  + columnType);
+      }
+
+    } else if (transform instanceof Transforms.HourTransform) {
+      // For hour partitioning
+      if (columnType.equals("timestamp")) {
+        return String.format("PARTITION BY TIMESTAMP_TRUNC(%s, HOUR)", 
fieldWithBackticks);
+      } else if (columnType.equals("datetime")) {
+        return String.format("PARTITION BY DATETIME_TRUNC(%s, HOUR)", 
fieldWithBackticks);
+      } else {
+        throw new UnsupportedOperationException(
+            "Hour partitioning is only supported for TIMESTAMP and DATETIME 
columns, but got: "
+                + columnType);
+      }
+
+    } else if (transform instanceof Transforms.IdentityTransform) {
+      switch (columnType) {
+        case "date":
+          // Direct partitioning for DATE columns
+          return String.format("PARTITION BY %s", fieldWithBackticks);
+        case "timestamp":
+        case "datetime":
+          // For TIMESTAMP/DATETIME columns, use DATE() function to extract 
date part
+          return String.format("PARTITION BY DATE(%s)", fieldWithBackticks);
+        case "int64":
+          // For INTEGER columns, would need RANGE_BUCKET (not implemented yet)
+          throw new UnsupportedOperationException(
+              "Identity partitioning for INTEGER columns (RANGE_BUCKET) is not 
yet implemented");
+        default:
+          throw new UnsupportedOperationException(
+              "Identity partitioning is only supported for DATE, TIMESTAMP, 
DATETIME, and INT64 columns, but got: "
+                  + columnType);
+      }
+
+    } else {
+      throw new UnsupportedOperationException(
+          "Unsupported partition transform: " + 
transform.getClass().getSimpleName());
+    }
+  }
+
+  /**
+   * Generates CLUSTER BY clause.
+   *
+   * @param clusteringFields comma-separated field names
+   * @return the CLUSTER BY clause
+   */
+  private String generateClusterByClause(String clusteringFields) {
+    String[] fields = clusteringFields.split(",");
+    String fieldList =
+        Arrays.stream(fields)
+            .map(String::trim)
+            .map(field -> BACKTICK + field + BACKTICK)
+            .collect(Collectors.joining(", "));
+    return "CLUSTER BY " + fieldList;
+  }
+
  /**
   * Generates the OPTIONS(...) clause for CREATE TABLE from the table comment and properties.
   *
   * <p>Each recognized property from {@code BigQueryTablePropertiesMetadata} is rendered as one
   * {@code key=value} option. String-valued options are double-quoted and escaped via {@code
   * escapeString}; numeric/boolean-valued options are emitted verbatim. Partition-only options
   * (partition expiration, partition filter) are skipped with a warning when the table is not
   * partitioned. NOTE(review): {@code formatLabelsValue}, {@code formatTagsValue} and the
   * {@code validate*} helpers are defined elsewhere in this class — presumably they reject
   * invalid values before SQL generation; confirm their contracts there.
   *
   * @param comment the table description, may be null/empty
   * @param properties the table properties, may be null
   * @param partitioning the partitioning transforms, used to gate partition-only options
   * @return the OPTIONS clause, or the empty string when no option applies
   */
  private String generateTableOptionsClause(
      String comment, Map<String, String> properties, Transform[] partitioning) {
    List<String> options = new ArrayList<>();

    // Description
    if (StringUtils.isNotEmpty(comment)) {
      options.add(String.format("description=\"%s\"", escapeString(comment)));
    }

    if (properties != null) {
      // Partition expiration days - only valid for partitioned tables
      if (properties.containsKey(BigQueryTablePropertiesMetadata.PARTITION_EXPIRATION_DAYS)) {
        if (ArrayUtils.isEmpty(partitioning)) {
          LOG.warn(
              "partition_expiration_days property is ignored because table is not partitioned. "
                  + "This property is only supported for tables with PARTITION BY clause.");
        } else {
          options.add(
              String.format(
                  "partition_expiration_days=%s",
                  properties.get(BigQueryTablePropertiesMetadata.PARTITION_EXPIRATION_DAYS)));
        }
      }

      // Require partition filter - only valid for partitioned tables
      if (properties.containsKey(BigQueryTablePropertiesMetadata.REQUIRE_PARTITION_FILTER)) {
        if (ArrayUtils.isEmpty(partitioning)) {
          LOG.warn(
              "require_partition_filter property is ignored because table is not partitioned. "
                  + "This property is only supported for tables with PARTITION BY clause.");
        } else {
          options.add(
              String.format(
                  "require_partition_filter=%s",
                  properties.get(BigQueryTablePropertiesMetadata.REQUIRE_PARTITION_FILTER)));
        }
      }

      // Expiration timestamp
      if (properties.containsKey(BigQueryTablePropertiesMetadata.EXPIRATION_TIMESTAMP)) {
        options.add(
            String.format(
                "expiration_timestamp=TIMESTAMP \"%s\"",
                properties.get(BigQueryTablePropertiesMetadata.EXPIRATION_TIMESTAMP)));
      }

      // Friendly name
      if (properties.containsKey(BigQueryTablePropertiesMetadata.FRIENDLY_NAME)) {
        options.add(
            String.format(
                "friendly_name=\"%s\"",
                escapeString(properties.get(BigQueryTablePropertiesMetadata.FRIENDLY_NAME))));
      }

      // Labels - with proper formatting; an empty list ("[]") is omitted entirely
      if (properties.containsKey(BigQueryTablePropertiesMetadata.LABELS)) {
        String labelsValue =
            formatLabelsValue(properties.get(BigQueryTablePropertiesMetadata.LABELS));
        if (!"[]".equals(labelsValue)) {
          options.add(String.format("labels=%s", labelsValue));
        }
      }

      // KMS key name for encryption
      if (properties.containsKey(BigQueryTablePropertiesMetadata.KMS_KEY_NAME)) {
        options.add(
            String.format(
                "kms_key_name=\"%s\"",
                escapeString(properties.get(BigQueryTablePropertiesMetadata.KMS_KEY_NAME))));
      }

      // Default rounding mode (validated before emission)
      if (properties.containsKey(BigQueryTablePropertiesMetadata.DEFAULT_ROUNDING_MODE)) {
        String roundingMode = properties.get(BigQueryTablePropertiesMetadata.DEFAULT_ROUNDING_MODE);
        validateRoundingMode(roundingMode);
        options.add(String.format("default_rounding_mode=\"%s\"", roundingMode));
      }

      // Enable change history (Preview)
      if (properties.containsKey(BigQueryTablePropertiesMetadata.ENABLE_CHANGE_HISTORY)) {
        options.add(
            String.format(
                "enable_change_history=%s",
                properties.get(BigQueryTablePropertiesMetadata.ENABLE_CHANGE_HISTORY)));
      }

      // Max staleness
      if (properties.containsKey(BigQueryTablePropertiesMetadata.MAX_STALENESS)) {
        options.add(
            String.format(
                "max_staleness=%s", properties.get(BigQueryTablePropertiesMetadata.MAX_STALENESS)));
      }

      // Enable fine-grained mutations (Preview)
      if (properties.containsKey(BigQueryTablePropertiesMetadata.ENABLE_FINE_GRAINED_MUTATIONS)) {
        options.add(
            String.format(
                "enable_fine_grained_mutations=%s",
                properties.get(BigQueryTablePropertiesMetadata.ENABLE_FINE_GRAINED_MUTATIONS)));
      }

      // Managed table properties (Preview)
      if (properties.containsKey(BigQueryTablePropertiesMetadata.STORAGE_URI)) {
        options.add(
            String.format(
                "storage_uri=\"%s\"",
                escapeString(properties.get(BigQueryTablePropertiesMetadata.STORAGE_URI))));
      }

      if (properties.containsKey(BigQueryTablePropertiesMetadata.FILE_FORMAT)) {
        String fileFormat = properties.get(BigQueryTablePropertiesMetadata.FILE_FORMAT);
        validateFileFormat(fileFormat);
        options.add(String.format("file_format=\"%s\"", fileFormat));
      }

      if (properties.containsKey(BigQueryTablePropertiesMetadata.TABLE_FORMAT)) {
        String tableFormat = properties.get(BigQueryTablePropertiesMetadata.TABLE_FORMAT);
        validateTableFormat(tableFormat);
        options.add(String.format("table_format=\"%s\"", tableFormat));
      }

      // IAM tags; an empty list ("[]") is omitted entirely
      if (properties.containsKey(BigQueryTablePropertiesMetadata.TAGS)) {
        String tagsValue = formatTagsValue(properties.get(BigQueryTablePropertiesMetadata.TAGS));
        if (!"[]".equals(tagsValue)) {
          options.add(String.format("tags=%s", tagsValue));
        }
      }
    }

    if (options.isEmpty()) {
      return "";
    }

    return "OPTIONS(\n  " + String.join(",\n  ", options) + "\n)";
  }
+
+  /**
+   * Escapes special characters in strings for SQL.
+   *
+   * @param str the string to escape
+   * @return the escaped string
+   */
+  private String escapeString(String str) {
+    if (str == null) {
+      return "";
+    }
+    return str.replace("\"", "\\\"").replace("\n", "\\n").replace("\r", 
"\\r").replace("\t", "\\t");
   }
 
  /**
   * Reports whether the current result-set column is auto-increment.
   *
   * @param resultSet the column metadata result set (unused)
   * @return always {@code false}
   */
  @Override
  protected boolean getAutoIncrementInfo(ResultSet resultSet) {
    // BigQuery does not support auto increment
    return false;
  }
 
  /**
   * Loads the table properties for the given table.
   *
   * @param connection the JDBC connection (unused)
   * @param tableName the table name (unused)
   * @return always an empty map
   */
  @Override
  protected Map<String, String> getTableProperties(Connection connection, String tableName) {
    // BigQuery table properties are not easily accessible via JDBC
    // Return empty map for now, properties can be set during creation
    return Collections.emptyMap();
  }
 
  /**
   * Loads the indexes of the given table.
   *
   * @param connection the JDBC connection (unused)
   * @param databaseName the dataset name (unused)
   * @param tableName the table name (unused)
   * @return always an empty list
   */
  @Override
  protected List<Index> getIndexes(Connection connection, String databaseName, String tableName) {
    // BigQuery does not support traditional indexes
    return Collections.emptyList();
  }
 
  /**
   * Loads the partitioning of the given table.
   *
   * @param connection the JDBC connection (unused)
   * @param databaseName the dataset name (unused)
   * @param tableName the table name (unused)
   * @return always an empty transform array
   */
  @Override
  protected Transform[] getTablePartitioning(
      Connection connection, String databaseName, String tableName) {
    // BigQuery partitioning information is not easily accessible via JDBC
    // Return empty array for now
    return Transforms.EMPTY_TRANSFORM;
  }
 
   @Override
@@ -94,29 +719,421 @@ public class BigQueryTableOperations extends 
JdbcTableOperations {
       String databaseName,
       String tableName,
       JdbcTable.Builder tableBuilder) {
-    throw new NotImplementedException("To be implemented in the future");
+    // BigQuery-specific table field corrections can be added here if needed
+    // For now, use default JDBC behavior
   }
 
   @Override
-  protected String generateRenameTableSql(String oldTableName, String 
newTableName) {
-    throw new NotImplementedException("To be implemented in the future");
+  public void rename(String databaseName, String oldTableName, String 
newTableName)
+      throws NoSuchTableException {
+    LOG.info(
+        "Attempting to rename table {}/{} to {}/{}",
+        databaseName,
+        oldTableName,
+        databaseName,
+        newTableName);
+
+    // In BigQuery, table names must be fully qualified with dataset
+    String qualifiedOldTableName = databaseName + "." + oldTableName;
+
+    try (Connection connection = this.getConnection(databaseName)) {
+      String sql =
+          String.format("ALTER TABLE `%s` RENAME TO `%s`", 
qualifiedOldTableName, newTableName);
+      JdbcConnectorUtils.executeUpdate(connection, sql);
+      LOG.info(
+          "Renamed table {}/{} to {}/{}", databaseName, oldTableName, 
databaseName, newTableName);
+    } catch (final SQLException se) {
+      throw this.exceptionMapper.toGravitinoException(se);
+    }
   }
 
   @Override
-  protected String generatePurgeTableSql(String tableName) {
-    throw new UnsupportedOperationException(
-        "BigQuery does not support purge table in Gravitino, please use drop 
table");
+  protected String generateRenameTableSql(String oldTableName, String 
newTableName) {
+    // This method is not used in BigQuery since we override the rename method
+    // BigQuery supports ALTER TABLE RENAME TO since 2021
+    // Syntax: ALTER TABLE [IF EXISTS] table_name RENAME TO new_table_name
+    return String.format("ALTER TABLE `%s` RENAME TO `%s`", oldTableName, 
newTableName);
   }
 
  /**
   * Generates a single {@code ALTER TABLE} statement covering all requested changes.
   *
   * <p>Each supported {@link TableChange} subtype is translated into one clause; BigQuery allows
   * multiple ALTER operations in one statement, so the clauses are joined with commas.
   *
   * @param databaseName the BigQuery dataset containing the table
   * @param tableName the table to alter
   * @param changes the requested table changes
   * @return the ALTER TABLE statement, or an empty string when there are no changes
   * @throws UnsupportedOperationException for change types BigQuery cannot express
   */
  @Override
  protected String generateAlterTableSql(
      String databaseName, String tableName, TableChange... changes) {

    // In BigQuery, table name must be fully qualified with dataset
    String qualifiedTableName = databaseName + "." + tableName;

    List<String> alterClauses = new ArrayList<>();

    // Translate each change into its clause; unknown change types fail fast.
    for (TableChange change : changes) {
      if (change instanceof TableChange.AddColumn) {
        TableChange.AddColumn addColumn = (TableChange.AddColumn) change;
        alterClauses.add(generateAddColumnClause(addColumn));
      } else if (change instanceof TableChange.DeleteColumn) {
        TableChange.DeleteColumn deleteColumn = (TableChange.DeleteColumn) change;
        alterClauses.add(generateDropColumnClause(deleteColumn));
      } else if (change instanceof TableChange.RenameColumn) {
        TableChange.RenameColumn renameColumn = (TableChange.RenameColumn) change;
        alterClauses.add(generateRenameColumnClause(renameColumn));
      } else if (change instanceof TableChange.UpdateColumnType) {
        TableChange.UpdateColumnType updateType = (TableChange.UpdateColumnType) change;
        alterClauses.add(generateAlterColumnTypeClause(updateType));
      } else if (change instanceof TableChange.UpdateColumnNullability) {
        TableChange.UpdateColumnNullability updateNullability =
            (TableChange.UpdateColumnNullability) change;
        alterClauses.add(generateAlterColumnNullabilityClause(updateNullability));
      } else if (change instanceof TableChange.SetProperty) {
        TableChange.SetProperty setProperty = (TableChange.SetProperty) change;
        alterClauses.add(generateSetOptionsClause(setProperty));
      } else if (change instanceof TableChange.UpdateComment) {
        TableChange.UpdateComment updateComment = (TableChange.UpdateComment) change;
        alterClauses.add(generateUpdateCommentClause(updateComment));
      } else {
        throw new UnsupportedOperationException(
            "Unsupported table change: " + change.getClass().getSimpleName());
      }
    }

    if (alterClauses.isEmpty()) {
      return "";
    }

    // BigQuery supports multiple ALTER operations in one statement
    String result =
        String.format(
            "ALTER TABLE `%s`\n%s;", qualifiedTableName, String.join(",\n", alterClauses));

    LOG.info("Generated ALTER TABLE SQL for {}: {}", qualifiedTableName, result);
    return result;
  }
+
  /**
   * Builds the {@code ADD COLUMN} clause for a column-addition change.
   *
   * <p>NOTE(review): the field-name components are joined with '.' inside a single pair of
   * backticks, which assumes a top-level column; nested STRUCT fields may need different
   * quoting — confirm before relying on nested adds.
   *
   * @param addColumn the column addition to render
   * @return the ADD COLUMN clause text
   */
  private String generateAddColumnClause(TableChange.AddColumn addColumn) {
    StringBuilder clause = new StringBuilder();
    clause.append("ADD COLUMN ");

    // BigQuery supports IF NOT EXISTS, but Gravitino's AddColumn doesn't have this flag
    // We'll add it as a future enhancement if needed

    String columnName = String.join(".", addColumn.getFieldName());
    clause.append(BACKTICK).append(columnName).append(BACKTICK).append(" ");

    // Data type
    clause.append(typeConverter.fromGravitino(addColumn.getDataType()));

    // NOT NULL constraint
    if (!addColumn.isNullable()) {
      clause.append(" NOT NULL");
    }

    // Column description
    if (StringUtils.isNotEmpty(addColumn.getComment())) {
      clause
          .append(" OPTIONS(description=\"")
          .append(escapeString(addColumn.getComment()))
          .append("\")");
    }

    return clause.toString();
  }
+
+  private String generateDropColumnClause(TableChange.DeleteColumn 
deleteColumn) {
+    StringBuilder clause = new StringBuilder();
+    clause.append("DROP COLUMN ");
+
+    if (deleteColumn.getIfExists() != null && deleteColumn.getIfExists()) {
+      clause.append("IF EXISTS ");
+    }
+
+    String columnName = String.join(".", deleteColumn.fieldName());
+    clause.append(BACKTICK).append(columnName).append(BACKTICK);
+
+    return clause.toString();
+  }
+
+  private String generateRenameColumnClause(TableChange.RenameColumn 
renameColumn) {
+    String oldName = String.join(".", renameColumn.fieldName());
+    String newName = renameColumn.getNewName();
+
+    return String.format("RENAME COLUMN `%s` TO `%s`", oldName, newName);
+  }
+
+  private String generateAlterColumnTypeClause(TableChange.UpdateColumnType 
updateType) {
+    String columnName = String.join(".", updateType.fieldName());
+    String newType = typeConverter.fromGravitino(updateType.getNewDataType());
+
+    return String.format("ALTER COLUMN `%s` SET DATA TYPE %s", columnName, 
newType);
+  }
+
+  private String generateAlterColumnNullabilityClause(
+      TableChange.UpdateColumnNullability updateNullability) {
+    String columnName = String.join(".", updateNullability.fieldName());
+
+    if (updateNullability.nullable()) {
+      return String.format("ALTER COLUMN `%s` DROP NOT NULL", columnName);
+    } else {
+      throw new UnsupportedOperationException(
+          "BigQuery does not support adding NOT NULL constraint to existing 
columns. "
+              + "Only DROP NOT NULL is supported.");
+    }
+  }
+
+  private String generateSetOptionsClause(TableChange.SetProperty setProperty) 
{
+    String property = setProperty.getProperty();
+    String value = setProperty.getValue();
+
+    // Map Gravitino properties to BigQuery table options
+    String optionName = mapPropertyToOption(property);
+    String optionValue = formatOptionValue(property, value);
+
+    return String.format("SET OPTIONS(%s=%s)", optionName, optionValue);
+  }
+
+  private String generateUpdateCommentClause(TableChange.UpdateComment 
updateComment) {
+    String newComment = updateComment.getNewComment();
+    if (newComment == null) {
+      newComment = "";
+    }
+    // Escape single quotes in comment
+    String escapedComment = newComment.replace("'", "\\'");
+    return String.format("SET OPTIONS(description='%s')", escapedComment);
+  }
+
+  // Property mapping from Gravitino to BigQuery options
+  private static final Map<String, String> PROPERTY_TO_OPTION_MAP =
+      Map.of(
+          "comment",
+          "description",
+          BigQueryTablePropertiesMetadata.EXPIRATION_TIMESTAMP,
+          "expiration_timestamp",
+          BigQueryTablePropertiesMetadata.FRIENDLY_NAME,
+          "friendly_name",
+          BigQueryTablePropertiesMetadata.LABELS,
+          "labels",
+          BigQueryTablePropertiesMetadata.KMS_KEY_NAME,
+          "kms_key_name",
+          BigQueryTablePropertiesMetadata.DEFAULT_ROUNDING_MODE,
+          "default_rounding_mode",
+          BigQueryTablePropertiesMetadata.ENABLE_CHANGE_HISTORY,
+          "enable_change_history",
+          BigQueryTablePropertiesMetadata.MAX_STALENESS,
+          "max_staleness",
+          BigQueryTablePropertiesMetadata.ENABLE_FINE_GRAINED_MUTATIONS,
+          "enable_fine_grained_mutations",
+          BigQueryTablePropertiesMetadata.STORAGE_URI,
+          "storage_uri");
+
+  // Additional property mappings (Map.of has a limit of 10 entries)
+  private static final Map<String, String> ADDITIONAL_PROPERTY_MAP =
+      Map.of(
+          BigQueryTablePropertiesMetadata.FILE_FORMAT, "file_format",
+          BigQueryTablePropertiesMetadata.TABLE_FORMAT, "table_format",
+          BigQueryTablePropertiesMetadata.TAGS, "tags");
+
+  private String mapPropertyToOption(String property) {
+    // Check main property map first
+    String option = PROPERTY_TO_OPTION_MAP.get(property);
+    if (option != null) {
+      return option;
+    }
+
+    // Check additional property map
+    option = ADDITIONAL_PROPERTY_MAP.get(property);
+    if (option != null) {
+      return option;
+    }
+
+    // Use as-is for BigQuery-specific properties
+    return property;
+  }
+
+  private String formatOptionValue(String property, String value) {
+    // Format value based on property type
+    switch (property) {
+      case "comment":
+      case BigQueryTablePropertiesMetadata.FRIENDLY_NAME:
+      case BigQueryTablePropertiesMetadata.KMS_KEY_NAME:
+      case BigQueryTablePropertiesMetadata.DEFAULT_ROUNDING_MODE:
+      case BigQueryTablePropertiesMetadata.STORAGE_URI:
+      case BigQueryTablePropertiesMetadata.FILE_FORMAT:
+      case BigQueryTablePropertiesMetadata.TABLE_FORMAT:
+        return "\"" + escapeString(value) + "\"";
+      case BigQueryTablePropertiesMetadata.EXPIRATION_TIMESTAMP:
+        return "TIMESTAMP \"" + value + "\"";
+      case BigQueryTablePropertiesMetadata.LABELS:
+        return formatLabelsValue(value);
+      case BigQueryTablePropertiesMetadata.TAGS:
+        return formatTagsValue(value);
+      case BigQueryTablePropertiesMetadata.ENABLE_CHANGE_HISTORY:
+      case BigQueryTablePropertiesMetadata.ENABLE_FINE_GRAINED_MUTATIONS:
+        // Boolean values
+        return value.toLowerCase();
+      case BigQueryTablePropertiesMetadata.MAX_STALENESS:
+        // Interval values don't need quotes
+        return value;
+      default:
+        // Try to determine if it's a string or numeric value
+        try {
+          Double.parseDouble(value);
+          return value; // Numeric value
+        } catch (NumberFormatException e) {
+          // Check if it's a boolean
+          if ("true".equalsIgnoreCase(value) || 
"false".equalsIgnoreCase(value)) {
+            return value.toLowerCase();
+          }
+          // String value
+          return "\"" + escapeString(value) + "\"";
+        }
+    }
+  }
+
+  private String formatLabelsValue(String labelsJson) {
+    // Convert JSON array format to BigQuery labels format
+    // Input: "[{\"key1\":\"value1\"},{\"key2\":\"value2\"}]"
+    // Output: [("key1", "value1"), ("key2", "value2")]
+
+    if (labelsJson == null || labelsJson.trim().isEmpty()) {
+      return "[]";
+    }
+
+    LOG.info("Processing labels JSON: {}", labelsJson);
+
+    try {
+      // Remove outer brackets and whitespace
+      String content = labelsJson.trim();
+      if (content.startsWith("[") && content.endsWith("]")) {
+        content = content.substring(1, content.length() - 1).trim();
+      }
+
+      if (content.isEmpty()) {
+        return "[]";
+      }
+
+      // Handle multiple objects separated by commas
+      // Split by },{ but be careful about commas inside strings
+      List<String> labelPairs = new ArrayList<>();
+
+      // Simple approach: split by },{
+      String[] rawPairs = content.split("},\\s*\\{");
+
+      for (String pair : rawPairs) {
+        // Clean up the pair
+        String cleanPair = pair.trim();
+        if (cleanPair.startsWith("{")) {
+          cleanPair = cleanPair.substring(1);
+        }
+        if (cleanPair.endsWith("}")) {
+          cleanPair = cleanPair.substring(0, cleanPair.length() - 1);
+        }
+
+        // Parse key:value pairs
+        // Look for pattern "key":"value"
+        String[] keyValue = cleanPair.split("\"\\s*:\\s*\"");
+        if (keyValue.length == 2) {
+          String key = keyValue[0].replace("\"", "").trim();
+          String value = keyValue[1].replace("\"", "").trim();
+
+          if (!key.isEmpty() && !value.isEmpty()) {
+            labelPairs.add(String.format("(\"%s\", \"%s\")", key, value));
+          }
+        }
+      }
+
+      if (labelPairs.isEmpty()) {
+        return "[]";
+      }
+
+      String result = "[" + String.join(", ", labelPairs) + "]";
+      LOG.info("Formatted labels: {}", result);
+      return result;
+
+    } catch (Exception e) {
+      LOG.warn("Failed to parse labels JSON: {}, returning empty array", 
labelsJson, e);
+      return "[]";
+    }
+  }
+
  /**
   * Formats a tags value from its JSON representation to BigQuery OPTIONS format.
   *
   * <p>Input: {@code [{"key1":"value1"},{"key2":"value2"}]}; output:
   * {@code [("key1", "value1"), ("key2", "value2")]}. Tags use the same (key, value) pair syntax
   * as labels, so the conversion is delegated to {@link #formatLabelsValue(String)}.
   *
   * @param tagsJson JSON array of tag objects
   * @return BigQuery tags format
   */
  private String formatTagsValue(String tagsJson) {
    // Tags use the same format as labels
    return formatLabelsValue(tagsJson);
  }
+
+  /**
+   * Validates rounding mode value.
+   *
+   * @param roundingMode the rounding mode to validate
+   * @throws IllegalArgumentException if invalid rounding mode
+   */
+  private void validateRoundingMode(String roundingMode) {
+    if (roundingMode == null || roundingMode.trim().isEmpty()) {
+      return;
+    }
+
+    String mode = roundingMode.trim().toUpperCase();
+    if (!"ROUND_HALF_AWAY_FROM_ZERO".equals(mode) && 
!"ROUND_HALF_EVEN".equals(mode)) {
+      throw new IllegalArgumentException(
+          "Invalid rounding mode: "
+              + roundingMode
+              + ". "
+              + "Supported values: ROUND_HALF_AWAY_FROM_ZERO, 
ROUND_HALF_EVEN");
+    }
+  }
+
+  /**
+   * Validates file format value.
+   *
+   * @param fileFormat the file format to validate
+   * @throws IllegalArgumentException if invalid file format
+   */
+  private void validateFileFormat(String fileFormat) {
+    if (fileFormat == null || fileFormat.trim().isEmpty()) {
+      return;
+    }
+
+    String format = fileFormat.trim().toUpperCase();
+    if (!"PARQUET".equals(format)) {
+      throw new IllegalArgumentException(
+          "Invalid file format: "
+              + fileFormat
+              + ". "
+              + "Only PARQUET is supported for managed tables.");
+    }
+  }
+
+  /**
+   * Validates table format value.
+   *
+   * @param tableFormat the table format to validate
+   * @throws IllegalArgumentException if invalid table format
+   */
+  private void validateTableFormat(String tableFormat) {
+    if (tableFormat == null || tableFormat.trim().isEmpty()) {
+      return;
+    }
+
+    String format = tableFormat.trim().toUpperCase();
+    if (!"ICEBERG".equals(format)) {
+      throw new IllegalArgumentException(
+          "Invalid table format: "
+              + tableFormat
+              + ". "
+              + "Only ICEBERG is supported for managed tables.");
+    }
   }
 
  /**
   * Returns the table's distribution.
   *
   * <p>BigQuery models data co-location through clustering (configured via table properties)
   * rather than a distribution, so {@link Distributions#NONE} is always returned.
   *
   * @param connection unused JDBC connection
   * @param databaseName unused dataset name
   * @param tableName unused table name
   * @return {@link Distributions#NONE}
   */
  @Override
  protected Distribution getDistributionInfo(
      Connection connection, String databaseName, String tableName) {
    // BigQuery uses clustering instead of traditional distribution
    // Return NONE distribution as clustering is handled via table properties
    return Distributions.NONE;
  }
 }
diff --git 
a/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/utils/BigQueryUtils.java
 
b/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/utils/BigQueryUtils.java
new file mode 100644
index 0000000000..0d0e1c8e0b
--- /dev/null
+++ 
b/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/utils/BigQueryUtils.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.bigquery.utils;
+
+import java.util.Arrays;
+import org.apache.commons.lang3.StringUtils;
+
/** Utility class for BigQuery catalog operations. */
public class BigQueryUtils {

  /** Maximum number of clustering fields allowed by BigQuery. */
  private static final int MAX_CLUSTERING_FIELDS = 4;

  private BigQueryUtils() {
    // Utility class, prevent instantiation
  }

  /**
   * Validates a clustering-fields property value.
   *
   * <p>BigQuery supports a maximum of 4 clustering fields per table. Blank entries between commas
   * are ignored; a null or blank value means "no clustering" and is accepted. The blank check is
   * done with the standard library so this class has no third-party dependency.
   *
   * @param clusteringFields comma-separated clustering field names
   * @throws IllegalArgumentException if more than 4 fields are specified, or if the value is
   *     non-blank but contains no usable field name
   */
  public static void validateClusteringFields(String clusteringFields) {
    if (clusteringFields == null || clusteringFields.trim().isEmpty()) {
      return; // Empty or null is valid (no clustering)
    }

    long fieldCount =
        Arrays.stream(clusteringFields.split(","))
            .map(String::trim)
            .filter(field -> !field.isEmpty())
            .count();

    if (fieldCount == 0) {
      throw new IllegalArgumentException(
          "Clustering fields cannot be empty. Either provide valid field names or remove the clustering_fields property.");
    }

    if (fieldCount > MAX_CLUSTERING_FIELDS) {
      throw new IllegalArgumentException(
          String.format(
              "BigQuery supports maximum %d clustering fields, but got %d fields: %s. "
                  + "Consider using partitioning or reducing the number of clustering fields.",
              MAX_CLUSTERING_FIELDS, fieldCount, clusteringFields));
    }
  }

  /**
   * Gets the maximum number of clustering fields supported by BigQuery.
   *
   * @return maximum clustering fields (4)
   */
  public static int getMaxClusteringFields() {
    return MAX_CLUSTERING_FIELDS;
  }
}
diff --git 
a/catalogs/catalog-jdbc-bigquery/src/test/java/org/apache/gravitino/catalog/bigquery/TestBigQueryTablePropertiesMetadata.java
 
b/catalogs/catalog-jdbc-bigquery/src/test/java/org/apache/gravitino/catalog/bigquery/TestBigQueryTablePropertiesMetadata.java
new file mode 100644
index 0000000000..9dee94b0b0
--- /dev/null
+++ 
b/catalogs/catalog-jdbc-bigquery/src/test/java/org/apache/gravitino/catalog/bigquery/TestBigQueryTablePropertiesMetadata.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.bigquery;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Map;
+import org.apache.gravitino.catalog.bigquery.utils.BigQueryUtils;
+import org.apache.gravitino.connector.PropertyEntry;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
/**
 * Unit tests for {@link BigQueryTablePropertiesMetadata} and for the clustering-field validation
 * helper in {@link BigQueryUtils}.
 */
public class TestBigQueryTablePropertiesMetadata {

  // Fresh metadata instance per test; the class under test is stateless per construction here.
  private BigQueryTablePropertiesMetadata metadata;

  @BeforeEach
  void setUp() {
    metadata = new BigQueryTablePropertiesMetadata();
  }

  @Test
  void testGetPropertyEntries() {
    Map<String, PropertyEntry<?>> properties = metadata.propertyEntries();

    // Check that all expected BigQuery-specific properties are present
    assertTrue(properties.containsKey(BigQueryTablePropertiesMetadata.DESCRIPTION));
    assertTrue(properties.containsKey(BigQueryTablePropertiesMetadata.FRIENDLY_NAME));
    assertTrue(properties.containsKey(BigQueryTablePropertiesMetadata.PARTITION_EXPIRATION_DAYS));
    assertTrue(properties.containsKey(BigQueryTablePropertiesMetadata.REQUIRE_PARTITION_FILTER));
    assertTrue(properties.containsKey(BigQueryTablePropertiesMetadata.CLUSTERING_FIELDS));
    assertTrue(properties.containsKey(BigQueryTablePropertiesMetadata.EXPIRATION_TIMESTAMP));
    assertTrue(properties.containsKey(BigQueryTablePropertiesMetadata.LABELS));

    // Check that all properties are optional
    assertFalse(properties.get(BigQueryTablePropertiesMetadata.DESCRIPTION).isRequired());
    assertFalse(properties.get(BigQueryTablePropertiesMetadata.FRIENDLY_NAME).isRequired());
    assertFalse(
        properties.get(BigQueryTablePropertiesMetadata.PARTITION_EXPIRATION_DAYS).isRequired());
    assertFalse(
        properties.get(BigQueryTablePropertiesMetadata.REQUIRE_PARTITION_FILTER).isRequired());
    assertFalse(properties.get(BigQueryTablePropertiesMetadata.CLUSTERING_FIELDS).isRequired());
    assertFalse(properties.get(BigQueryTablePropertiesMetadata.EXPIRATION_TIMESTAMP).isRequired());
    assertFalse(properties.get(BigQueryTablePropertiesMetadata.LABELS).isRequired());
  }

  @Test
  void testPropertyDescriptions() {
    Map<String, PropertyEntry<?>> properties = metadata.propertyEntries();

    // Check that properties have meaningful descriptions
    PropertyEntry<?> descriptionEntry = properties.get(BigQueryTablePropertiesMetadata.DESCRIPTION);
    assertTrue(descriptionEntry.getDescription().contains("Table description"));

    PropertyEntry<?> friendlyNameEntry =
        properties.get(BigQueryTablePropertiesMetadata.FRIENDLY_NAME);
    assertTrue(friendlyNameEntry.getDescription().contains("Friendly name"));

    PropertyEntry<?> partitionExpirationEntry =
        properties.get(BigQueryTablePropertiesMetadata.PARTITION_EXPIRATION_DAYS);
    assertTrue(partitionExpirationEntry.getDescription().contains("partitions expire"));

    PropertyEntry<?> requirePartitionFilterEntry =
        properties.get(BigQueryTablePropertiesMetadata.REQUIRE_PARTITION_FILTER);
    assertTrue(requirePartitionFilterEntry.getDescription().contains("partition filter"));

    PropertyEntry<?> clusteringFieldsEntry =
        properties.get(BigQueryTablePropertiesMetadata.CLUSTERING_FIELDS);
    assertTrue(clusteringFieldsEntry.getDescription().contains("clustering field"));

    PropertyEntry<?> expirationTimestampEntry =
        properties.get(BigQueryTablePropertiesMetadata.EXPIRATION_TIMESTAMP);
    assertTrue(expirationTimestampEntry.getDescription().contains("expires"));

    PropertyEntry<?> labelsEntry = properties.get(BigQueryTablePropertiesMetadata.LABELS);
    assertTrue(labelsEntry.getDescription().contains("labels"));
  }

  @Test
  void testValidateClusteringFieldsValid() {
    // Test valid cases: null/blank means "no clustering", up to 4 fields are accepted.
    BigQueryUtils.validateClusteringFields(null);
    BigQueryUtils.validateClusteringFields("");
    BigQueryUtils.validateClusteringFields("   ");
    BigQueryUtils.validateClusteringFields("field1");
    BigQueryUtils.validateClusteringFields("field1,field2");
    BigQueryUtils.validateClusteringFields("field1,field2,field3");
    BigQueryUtils.validateClusteringFields("field1,field2,field3,field4");
    BigQueryUtils.validateClusteringFields("field1, field2, field3, field4");
    BigQueryUtils.validateClusteringFields("field1,  ,field2"); // Empty field should be ignored
  }

  @Test
  void testValidateClusteringFieldsInvalid() {
    // Test invalid case - more than 4 fields
    IllegalArgumentException exception =
        assertThrows(
            IllegalArgumentException.class,
            () -> BigQueryUtils.validateClusteringFields("field1,field2,field3,field4,field5"));

    assertTrue(exception.getMessage().contains("maximum 4 clustering fields"));
    assertTrue(exception.getMessage().contains("but got 5 fields"));
  }

  @Test
  void testValidateClusteringFieldsWithSpaces() {
    // Test with various spacing
    BigQueryUtils.validateClusteringFields(" field1 , field2 , field3 , field4 ");

    // Test with empty fields that should be ignored
    BigQueryUtils.validateClusteringFields("field1,,field2,  ,field3");

    // Test invalid case with spaces
    IllegalArgumentException exception =
        assertThrows(
            IllegalArgumentException.class,
            () ->
                BigQueryUtils.validateClusteringFields(
                    " field1 , field2 , field3 , field4 , field5 "));

    assertTrue(exception.getMessage().contains("maximum 4 clustering fields"));
  }

  @Test
  void testValidateClusteringFieldsEmptyFields() {
    // Test case where all fields are empty (fieldCount = 0)
    IllegalArgumentException exception =
        assertThrows(
            IllegalArgumentException.class,
            () -> BigQueryUtils.validateClusteringFields("  ,  ,  "));

    assertTrue(exception.getMessage().contains("Clustering fields cannot be empty"));

    // Test another case with only commas
    exception =
        assertThrows(
            IllegalArgumentException.class, () -> BigQueryUtils.validateClusteringFields(",,,"));

    assertTrue(exception.getMessage().contains("Clustering fields cannot be empty"));
  }

  @Test
  void testGetMaxClusteringFields() {
    assertEquals(4, BigQueryUtils.getMaxClusteringFields());
  }

  @Test
  void testPropertyConstants() {
    // Test that all property constants are defined correctly (values are the BigQuery option
    // names used when generating DDL, so they must stay stable).
    assertEquals(
        "partition_expiration_days", BigQueryTablePropertiesMetadata.PARTITION_EXPIRATION_DAYS);
    assertEquals(
        "require_partition_filter", BigQueryTablePropertiesMetadata.REQUIRE_PARTITION_FILTER);
    assertEquals("clustering_fields", BigQueryTablePropertiesMetadata.CLUSTERING_FIELDS);
    assertEquals("expiration_timestamp", BigQueryTablePropertiesMetadata.EXPIRATION_TIMESTAMP);
    assertEquals("friendly_name", BigQueryTablePropertiesMetadata.FRIENDLY_NAME);
    assertEquals("description", BigQueryTablePropertiesMetadata.DESCRIPTION);
    assertEquals("labels", BigQueryTablePropertiesMetadata.LABELS);
  }
}
diff --git 
a/catalogs/catalog-jdbc-bigquery/src/test/java/org/apache/gravitino/catalog/bigquery/converter/TestBigQueryColumnDefaultValueConverter.java
 
b/catalogs/catalog-jdbc-bigquery/src/test/java/org/apache/gravitino/catalog/bigquery/converter/TestBigQueryColumnDefaultValueConverter.java
new file mode 100644
index 0000000000..cf7761e5aa
--- /dev/null
+++ 
b/catalogs/catalog-jdbc-bigquery/src/test/java/org/apache/gravitino/catalog/bigquery/converter/TestBigQueryColumnDefaultValueConverter.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.bigquery.converter;
+
+import static org.apache.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.gravitino.catalog.jdbc.converter.JdbcTypeConverter;
+import org.apache.gravitino.rel.expressions.Expression;
+import org.apache.gravitino.rel.expressions.UnparsedExpression;
+import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
/**
 * Unit tests for {@link BigQueryColumnDefaultValueConverter}.
 *
 * <p>NOTE(review): the {@code toGravitino} argument order is taken to be
 * {@code (typeBean, columnDefaultValue, isExpression, nullable)} — inferred from how the tests
 * below exercise it; confirm against the converter's declared signature.
 */
public class TestBigQueryColumnDefaultValueConverter {

  // Converter under test; re-created before each test to keep tests isolated.
  private BigQueryColumnDefaultValueConverter converter;

  @BeforeEach
  void setUp() {
    converter = new BigQueryColumnDefaultValueConverter();
  }

  /** A missing default maps to NULL for nullable columns and to "not set" otherwise. */
  @Test
  void testNullValues() {
    JdbcTypeConverter.JdbcTypeBean typeBean = createTypeBean("string", null, null, null);

    // Null value with nullable column
    Expression result = converter.toGravitino(typeBean, null, false, true);
    assertEquals(Literals.NULL, result);

    // Null value with non-nullable column
    result = converter.toGravitino(typeBean, null, false, false);
    assertEquals(DEFAULT_VALUE_NOT_SET, result);

    // Explicit NULL string
    result = converter.toGravitino(typeBean, "NULL", false, true);
    assertEquals(Literals.NULL, result);
  }

  /** STRING defaults become string literals whether or not the value is quoted. */
  @Test
  void testStringValues() {
    JdbcTypeConverter.JdbcTypeBean typeBean = createTypeBean("string", null, null, null);

    // Quoted string
    Expression result = converter.toGravitino(typeBean, "'hello world'", false, true);
    assertEquals(Literals.stringLiteral("hello world"), result);

    // Unquoted string (treated as string literal for STRING type)
    result = converter.toGravitino(typeBean, "hello", false, true);
    assertEquals(Literals.stringLiteral("hello"), result);
  }

  /** BOOL defaults are parsed case-insensitively into boolean literals. */
  @Test
  void testBooleanValues() {
    JdbcTypeConverter.JdbcTypeBean typeBean = createTypeBean("bool", null, null, null);

    // Boolean true
    Expression result = converter.toGravitino(typeBean, "true", false, true);
    assertEquals(Literals.booleanLiteral(true), result);

    // Boolean false
    result = converter.toGravitino(typeBean, "false", false, true);
    assertEquals(Literals.booleanLiteral(false), result);

    // Case insensitive
    result = converter.toGravitino(typeBean, "TRUE", false, true);
    assertEquals(Literals.booleanLiteral(true), result);

    result = converter.toGravitino(typeBean, "FALSE", false, true);
    assertEquals(Literals.booleanLiteral(false), result);
  }

  /** Numeric defaults parse into typed literals; unparseable text falls back to unparsed. */
  @Test
  void testNumericValues() {
    // INT64 type
    JdbcTypeConverter.JdbcTypeBean int64TypeBean = createTypeBean("int64", null, null, null);
    Expression result = converter.toGravitino(int64TypeBean, "123", false, true);
    assertEquals(Literals.longLiteral(123L), result);

    // FLOAT64 type
    JdbcTypeConverter.JdbcTypeBean float64TypeBean = createTypeBean("float64", null, null, null);
    result = converter.toGravitino(float64TypeBean, "123.45", false, true);
    assertEquals(Literals.doubleLiteral(123.45), result);

    // Invalid numeric value should return unparsed expression
    result = converter.toGravitino(int64TypeBean, "not_a_number", false, true);
    assertTrue(result instanceof UnparsedExpression);
    assertEquals("not_a_number", ((UnparsedExpression) result).unparsedExpression());
  }

  /** NUMERIC/BIGNUMERIC defaults parse into literals; invalid text falls back to unparsed. */
  @Test
  void testDecimalValues() {
    // NUMERIC type with precision and scale
    JdbcTypeConverter.JdbcTypeBean numericTypeBean = createTypeBean("numeric", 10, 2, null);
    Expression result = converter.toGravitino(numericTypeBean, "123.45", false, true);
    assertTrue(result instanceof Literals.LiteralImpl);

    // BIGNUMERIC type
    JdbcTypeConverter.JdbcTypeBean bigNumericTypeBean = createTypeBean("bignumeric", 20, 5, null);
    result = converter.toGravitino(bigNumericTypeBean, "12345.67890", false, true);
    assertTrue(result instanceof Literals.LiteralImpl);

    // Invalid decimal should return unparsed expression
    result = converter.toGravitino(numericTypeBean, "invalid_decimal", false, true);
    assertTrue(result instanceof UnparsedExpression);
  }

  /** When the is-expression flag is set, the raw text is preserved as an unparsed expression. */
  @Test
  void testExpressionValues() {
    JdbcTypeConverter.JdbcTypeBean typeBean = createTypeBean("timestamp", null, null, null);

    // Expression values should return unparsed expression
    Expression result = converter.toGravitino(typeBean, "CURRENT_TIMESTAMP()", true, true);
    assertTrue(result instanceof UnparsedExpression);
    assertEquals("CURRENT_TIMESTAMP()", ((UnparsedExpression) result).unparsedExpression());
  }

  /** Defaults on unrecognized types are kept verbatim rather than guessed at. */
  @Test
  void testUnknownTypes() {
    JdbcTypeConverter.JdbcTypeBean typeBean = createTypeBean("unknown_type", null, null, null);

    // Unknown types should return unparsed expression
    Expression result = converter.toGravitino(typeBean, "some_value", false, true);
    assertTrue(result instanceof UnparsedExpression);
    assertEquals("some_value", ((UnparsedExpression) result).unparsedExpression());
  }

  /** Quote stripping preserves the inner text exactly, including emptiness and spacing. */
  @Test
  void testQuotedStrings() {
    JdbcTypeConverter.JdbcTypeBean typeBean = createTypeBean("string", null, null, null);

    // Test various quoted string formats
    Expression result = converter.toGravitino(typeBean, "'simple string'", false, true);
    assertEquals(Literals.stringLiteral("simple string"), result);

    // Empty quoted string
    result = converter.toGravitino(typeBean, "''", false, true);
    assertEquals(Literals.stringLiteral(""), result);

    // String with spaces
    result = converter.toGravitino(typeBean, "'  spaced  '", false, true);
    assertEquals(Literals.stringLiteral("  spaced  "), result);
  }

  /**
   * Helper method to create a JdbcTypeBean for testing.
   *
   * <p>An anonymous subclass pins the size/scale/precision getters to the supplied values so the
   * converter sees exactly the metadata each test needs.
   *
   * @param typeName the type name
   * @param columnSize the column size (precision)
   * @param scale the scale
   * @param datetimePrecision the datetime precision
   * @return a JdbcTypeBean instance
   */
  private JdbcTypeConverter.JdbcTypeBean createTypeBean(
      String typeName, Integer columnSize, Integer scale, Integer datetimePrecision) {
    return new JdbcTypeConverter.JdbcTypeBean(typeName) {
      @Override
      public Integer getColumnSize() {
        return columnSize;
      }

      @Override
      public Integer getScale() {
        return scale;
      }

      @Override
      public Integer getDatetimePrecision() {
        return datetimePrecision;
      }
    };
  }
}
diff --git 
a/catalogs/catalog-jdbc-bigquery/src/test/java/org/apache/gravitino/catalog/bigquery/converter/TestBigQueryTypeConverter.java
 
b/catalogs/catalog-jdbc-bigquery/src/test/java/org/apache/gravitino/catalog/bigquery/converter/TestBigQueryTypeConverter.java
new file mode 100644
index 0000000000..f155938d99
--- /dev/null
+++ 
b/catalogs/catalog-jdbc-bigquery/src/test/java/org/apache/gravitino/catalog/bigquery/converter/TestBigQueryTypeConverter.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.bigquery.converter;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.apache.gravitino.catalog.jdbc.converter.JdbcTypeConverter;
+import org.apache.gravitino.rel.types.Types;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/** Unit tests for {@link BigQueryTypeConverter}. */
+public class TestBigQueryTypeConverter {
+
+  private BigQueryTypeConverter typeConverter;
+
+  @BeforeEach
+  void setUp() {
+    typeConverter = new BigQueryTypeConverter();
+  }
+
+  @Test
+  void testFromGravitinoBasicTypes() {
+    // Boolean
+    assertEquals("bool", typeConverter.fromGravitino(Types.BooleanType.get()));
+
+    // Integer types - all map to INT64
+    assertEquals("int64", typeConverter.fromGravitino(Types.ByteType.get()));
+    assertEquals("int64", typeConverter.fromGravitino(Types.ShortType.get()));
+    assertEquals("int64", 
typeConverter.fromGravitino(Types.IntegerType.get()));
+    assertEquals("int64", typeConverter.fromGravitino(Types.LongType.get()));
+
+    // Float types - all map to FLOAT64
+    assertEquals("float64", 
typeConverter.fromGravitino(Types.FloatType.get()));
+    assertEquals("float64", 
typeConverter.fromGravitino(Types.DoubleType.get()));
+
+    // String types
+    assertEquals("string", 
typeConverter.fromGravitino(Types.StringType.get()));
+    assertEquals("string", 
typeConverter.fromGravitino(Types.VarCharType.of(100)));
+    assertEquals("string", 
typeConverter.fromGravitino(Types.FixedCharType.of(10)));
+
+    // Binary
+    assertEquals("bytes", typeConverter.fromGravitino(Types.BinaryType.get()));
+
+    // Date
+    assertEquals("date", typeConverter.fromGravitino(Types.DateType.get()));
+  }
+
+  @Test
+  void testFromGravitinoTimeTypes() {
+    // Time without precision
+    assertEquals("time", typeConverter.fromGravitino(Types.TimeType.get()));
+
+    // Time with precision
+    assertEquals("time(6)", typeConverter.fromGravitino(Types.TimeType.of(6)));
+
+    // Timestamp without timezone (DATETIME in BigQuery)
+    assertEquals("datetime", 
typeConverter.fromGravitino(Types.TimestampType.withoutTimeZone()));
+
+    // Timestamp with timezone (TIMESTAMP in BigQuery)
+    assertEquals("timestamp", 
typeConverter.fromGravitino(Types.TimestampType.withTimeZone()));
+
+    // Timestamp with precision
+    assertEquals(
+        "datetime(3)", 
typeConverter.fromGravitino(Types.TimestampType.withoutTimeZone(3)));
+    assertEquals("timestamp(6)", 
typeConverter.fromGravitino(Types.TimestampType.withTimeZone(6)));
+  }
+
+  @Test
+  void testFromGravitinoDecimalTypes() {
+    // Standard precision - use NUMERIC (BigQuery NUMERIC supports precision 
1-38, scale 0-9 or
+    // 0-precision)
+    assertEquals("NUMERIC(10, 2)", 
typeConverter.fromGravitino(Types.DecimalType.of(10, 2)));
+    assertEquals("NUMERIC(38, 9)", 
typeConverter.fromGravitino(Types.DecimalType.of(38, 9)));
+    assertEquals("NUMERIC(38, 2)", 
typeConverter.fromGravitino(Types.DecimalType.of(38, 2)));
+
+    // Note: Gravitino DecimalType is limited to precision 38
+    // So we always use NUMERIC, never BIGNUMERIC
+  }
+
+  @Test
+  void testFromGravitinoUnparsedType() {
+    // Test unparsed types from Web UI - only the exact types that are in the 
Web UI list
+    assertEquals("bool", 
typeConverter.fromGravitino(Types.UnparsedType.of("BOOL")));
+    assertEquals("int64", 
typeConverter.fromGravitino(Types.UnparsedType.of("INT64")));
+    assertEquals("float64", 
typeConverter.fromGravitino(Types.UnparsedType.of("FLOAT64")));
+    assertEquals("string", 
typeConverter.fromGravitino(Types.UnparsedType.of("STRING")));
+    assertEquals("bytes", 
typeConverter.fromGravitino(Types.UnparsedType.of("BYTES")));
+    assertEquals("date", 
typeConverter.fromGravitino(Types.UnparsedType.of("DATE")));
+    assertEquals("time", 
typeConverter.fromGravitino(Types.UnparsedType.of("TIME")));
+    assertEquals("datetime", 
typeConverter.fromGravitino(Types.UnparsedType.of("DATETIME")));
+    assertEquals("timestamp", 
typeConverter.fromGravitino(Types.UnparsedType.of("TIMESTAMP")));
+    assertEquals("numeric", 
typeConverter.fromGravitino(Types.UnparsedType.of("NUMERIC")));
+    assertEquals("bignumeric", 
typeConverter.fromGravitino(Types.UnparsedType.of("BIGNUMERIC")));
+
+    // Test complete complex types - these should be returned as-is (from API 
calls)
+    assertEquals(
+        "array<string>", 
typeConverter.fromGravitino(Types.UnparsedType.of("array<string>")));
+    assertEquals(
+        "array<int64>", 
typeConverter.fromGravitino(Types.UnparsedType.of("array<int64>")));
+    assertEquals(
+        "struct<name string, age int64>",
+        typeConverter.fromGravitino(Types.UnparsedType.of("struct<name string, 
age int64>")));
+    assertEquals("range<date>", 
typeConverter.fromGravitino(Types.UnparsedType.of("range<date>")));
+  }
+
+  @Test
+  void testFromGravitinoExternalType() {
+    assertEquals(
+        "array<string>", 
typeConverter.fromGravitino(Types.ExternalType.of("array<string>")));
+    assertEquals("geography", 
typeConverter.fromGravitino(Types.ExternalType.of("geography")));
+    assertEquals("json", 
typeConverter.fromGravitino(Types.ExternalType.of("json")));
+  }
+
+  @Test
+  void testFromGravitinoUnsupportedType() {
+    // Test with a type that doesn't have a mapping
+    assertThrows(
+        IllegalArgumentException.class, () -> 
typeConverter.fromGravitino(Types.NullType.get()));
+  }
+
+  @Test
+  void testToGravitinoBasicTypes() {
+    // Boolean
+    assertEquals(
+        Types.BooleanType.get(),
+        typeConverter.toGravitino(createTypeBean("bool", null, null, null)));
+
+    // Integer
+    assertEquals(
+        Types.LongType.get(), 
typeConverter.toGravitino(createTypeBean("int64", null, null, null)));
+
+    // Float
+    assertEquals(
+        Types.DoubleType.get(),
+        typeConverter.toGravitino(createTypeBean("float64", null, null, 
null)));
+
+    // String
+    assertEquals(
+        Types.StringType.get(),
+        typeConverter.toGravitino(createTypeBean("string", null, null, null)));
+
+    // Binary
+    assertEquals(
+        Types.BinaryType.get(),
+        typeConverter.toGravitino(createTypeBean("bytes", null, null, null)));
+
+    // Date
+    assertEquals(
+        Types.DateType.get(), typeConverter.toGravitino(createTypeBean("date", 
null, null, null)));
+  }
+
+  @Test
+  void testToGravitinoTimeTypes() {
+    // Time without precision
+    assertEquals(
+        Types.TimeType.get(), typeConverter.toGravitino(createTypeBean("time", 
null, null, null)));
+
+    // Time with precision
+    assertEquals(
+        Types.TimeType.of(6), typeConverter.toGravitino(createTypeBean("time", 
null, null, 6)));
+
+    // DateTime (timestamp without timezone)
+    assertEquals(
+        Types.TimestampType.withoutTimeZone(),
+        typeConverter.toGravitino(createTypeBean("datetime", null, null, 
null)));
+
+    // DateTime with precision
+    assertEquals(
+        Types.TimestampType.withoutTimeZone(3),
+        typeConverter.toGravitino(createTypeBean("datetime", null, null, 3)));
+
+    // Timestamp (with timezone)
+    assertEquals(
+        Types.TimestampType.withTimeZone(),
+        typeConverter.toGravitino(createTypeBean("timestamp", null, null, 
null)));
+
+    // Timestamp with precision
+    assertEquals(
+        Types.TimestampType.withTimeZone(6),
+        typeConverter.toGravitino(createTypeBean("timestamp", null, null, 6)));
+  }
+
+  @Test
+  void testToGravitinoDecimalTypes() {
+    // NUMERIC with precision and scale
+    assertEquals(
+        Types.DecimalType.of(10, 2),
+        typeConverter.toGravitino(createTypeBean("numeric", 10, 2, null)));
+
+    // NUMERIC with default precision and scale
+    assertEquals(
+        Types.DecimalType.of(38, 9),
+        typeConverter.toGravitino(createTypeBean("numeric", null, null, 
null)));
+
+    // BIGNUMERIC with precision and scale (limited to Gravitino's max 
precision)
+    assertEquals(
+        Types.DecimalType.of(38, 10),
+        typeConverter.toGravitino(createTypeBean("bignumeric", 38, 10, null)));
+
+    // BIGNUMERIC with default precision and scale (limited to Gravitino's max 
precision)
+    assertEquals(
+        Types.DecimalType.of(38, 38),
+        typeConverter.toGravitino(createTypeBean("bignumeric", null, null, 
null)));
+  }
+
+  @Test
+  void testToGravitinoExternalTypes() {
+    // Geography
+    assertEquals(
+        Types.ExternalType.of("GEOGRAPHY"),
+        typeConverter.toGravitino(createTypeBean("geography", null, null, 
null)));
+
+    // JSON
+    assertEquals(
+        Types.ExternalType.of("JSON"),
+        typeConverter.toGravitino(createTypeBean("json", null, null, null)));
+
+    // Array type
+    assertEquals(
+        Types.ExternalType.of("ARRAY<STRING>"),
+        typeConverter.toGravitino(createTypeBean("array<string>", null, null, 
null)));
+
+    // Struct type
+    assertEquals(
+        Types.ExternalType.of("STRUCT<NAME STRING, AGE INT64>"),
+        typeConverter.toGravitino(
+            createTypeBean("struct<name string, age int64>", null, null, 
null)));
+  }
+
+  @Test
+  void testToGravitinoUnknownType() {
+    // Unknown type should be handled as external type
+    assertEquals(
+        Types.ExternalType.of("UNKNOWN_TYPE"),
+        typeConverter.toGravitino(createTypeBean("unknown_type", null, null, 
null)));
+  }
+
+  /**
+   * Helper method to create a JdbcTypeBean for testing.
+   *
+   * @param typeName the type name
+   * @param columnSize the column size (precision)
+   * @param scale the scale
+   * @param datetimePrecision the datetime precision
+   * @return a JdbcTypeBean instance
+   */
+  private JdbcTypeConverter.JdbcTypeBean createTypeBean(
+      String typeName, Integer columnSize, Integer scale, Integer 
datetimePrecision) {
+    return new JdbcTypeConverter.JdbcTypeBean(typeName) {
+      @Override
+      public Integer getColumnSize() {
+        return columnSize;
+      }
+
+      @Override
+      public Integer getScale() {
+        return scale;
+      }
+
+      @Override
+      public Integer getDatetimePrecision() {
+        return datetimePrecision;
+      }
+    };
+  }
+}
diff --git 
a/catalogs/catalog-jdbc-bigquery/src/test/java/org/apache/gravitino/catalog/bigquery/operation/TestBigQuerySqlGeneration.java
 
b/catalogs/catalog-jdbc-bigquery/src/test/java/org/apache/gravitino/catalog/bigquery/operation/TestBigQuerySqlGeneration.java
new file mode 100644
index 0000000000..2bdc7b3f10
--- /dev/null
+++ 
b/catalogs/catalog-jdbc-bigquery/src/test/java/org/apache/gravitino/catalog/bigquery/operation/TestBigQuerySqlGeneration.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.bigquery.operation;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.gravitino.catalog.bigquery.BigQueryTablePropertiesMetadata;
+import org.apache.gravitino.catalog.bigquery.converter.BigQueryTypeConverter;
+import org.apache.gravitino.catalog.jdbc.JdbcColumn;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.rel.types.Types;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/** Tests for BigQuery SQL generation functionality. */
+public class TestBigQuerySqlGeneration {
+
+  @BeforeEach
+  void setUp() {
+    // Test setup is done per test method
+  }
+
+  private BigQueryTableOperations createTableOperations() {
+    BigQueryTableOperations tableOperations = new BigQueryTableOperations();
+    BigQueryTypeConverter typeConverter = new BigQueryTypeConverter();
+    // Use reflection to set the protected field for testing
+    try {
+      Field field = 
tableOperations.getClass().getSuperclass().getDeclaredField("typeConverter");
+      field.setAccessible(true);
+      field.set(tableOperations, typeConverter);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to set typeConverter for testing", e);
+    }
+    return tableOperations;
+  }
+
+  @Test
+  void testCreateTableWithAllFeatures() {
+    BigQueryTableOperations tableOperations = createTableOperations();
+
+    String tableName = "my_project.my_dataset.my_table";
+    JdbcColumn[] columns = {
+      JdbcColumn.builder()
+          .withName("id")
+          .withType(Types.LongType.get())
+          .withNullable(false)
+          .withComment("Primary key")
+          .build(),
+      JdbcColumn.builder()
+          .withName("name")
+          .withType(Types.StringType.get())
+          .withNullable(true)
+          .withComment("User name")
+          .build(),
+      JdbcColumn.builder()
+          .withName("created_date")
+          .withType(Types.DateType.get())
+          .withNullable(false)
+          .build(),
+      JdbcColumn.builder()
+          .withName("category")
+          .withType(Types.StringType.get())
+          .withNullable(true)
+          .build()
+    };
+
+    String comment = "Test table with all BigQuery features";
+    Map<String, String> properties = new HashMap<>();
+    properties.put(BigQueryTablePropertiesMetadata.CLUSTERING_FIELDS, 
"category,name");
+    properties.put(BigQueryTablePropertiesMetadata.PARTITION_EXPIRATION_DAYS, 
"30");
+    properties.put(BigQueryTablePropertiesMetadata.REQUIRE_PARTITION_FILTER, 
"true");
+    properties.put(BigQueryTablePropertiesMetadata.FRIENDLY_NAME, "My Test 
Table");
+
+    Transform[] partitioning = {Transforms.day("created_date")};
+    Index[] indexes = new Index[0];
+
+    String sql =
+        tableOperations.generateCreateTableSql(
+            tableName, columns, comment, properties, partitioning, 
Distributions.NONE, indexes);
+
+    // Verify the generated SQL contains expected elements
+    assertTrue(sql.contains("CREATE TABLE `my_project.my_dataset.my_table`"));
+    assertTrue(sql.contains("`id` int64 NOT NULL OPTIONS(description=\"Primary 
key\")"));
+    assertTrue(sql.contains("`name` string OPTIONS(description=\"User 
name\")"));
+    assertTrue(sql.contains("`created_date` date NOT NULL"));
+    assertTrue(sql.contains("`category` string"));
+    assertTrue(sql.contains("PARTITION BY `created_date`"));
+    assertTrue(sql.contains("CLUSTER BY `category`, `name`"));
+    assertTrue(sql.contains("description=\"Test table with all BigQuery 
features\""));
+    assertTrue(sql.contains("partition_expiration_days=30"));
+    assertTrue(sql.contains("require_partition_filter=true"));
+    assertTrue(sql.contains("friendly_name=\"My Test Table\""));
+    assertTrue(sql.endsWith(";"));
+  }
+
+  @Test
+  void testCreateTableWithDifferentPartitioning() {
+    BigQueryTableOperations tableOperations = createTableOperations();
+
+    String tableName = "test.table";
+    JdbcColumn[] columns = {
+      JdbcColumn.builder()
+          .withName("timestamp_col")
+          .withType(Types.TimestampType.withTimeZone())
+          .withNullable(false)
+          .build()
+    };
+
+    // Test hour partitioning
+    Transform[] hourPartitioning = {Transforms.hour("timestamp_col")};
+    String sql =
+        tableOperations.generateCreateTableSql(
+            tableName, columns, null, null, hourPartitioning, 
Distributions.NONE, new Index[0]);
+    assertTrue(sql.contains("PARTITION BY TIMESTAMP_TRUNC(`timestamp_col`, 
HOUR)"));
+
+    // Test month partitioning
+    Transform[] monthPartitioning = {Transforms.month("timestamp_col")};
+    sql =
+        tableOperations.generateCreateTableSql(
+            tableName, columns, null, null, monthPartitioning, 
Distributions.NONE, new Index[0]);
+    assertTrue(sql.contains("PARTITION BY TIMESTAMP_TRUNC(`timestamp_col`, 
MONTH)"));
+
+    // Test year partitioning
+    Transform[] yearPartitioning = {Transforms.year("timestamp_col")};
+    sql =
+        tableOperations.generateCreateTableSql(
+            tableName, columns, null, null, yearPartitioning, 
Distributions.NONE, new Index[0]);
+    assertTrue(sql.contains("PARTITION BY TIMESTAMP_TRUNC(`timestamp_col`, 
YEAR)"));
+  }
+
+  @Test
+  void testCreateTableWithComplexTypes() {
+    BigQueryTableOperations tableOperations = createTableOperations();
+
+    String tableName = "test.complex_table";
+    JdbcColumn[] columns = {
+      JdbcColumn.builder()
+          .withName("decimal_col")
+          .withType(Types.DecimalType.of(10, 2))
+          .withNullable(false)
+          .build(),
+      JdbcColumn.builder()
+          .withName("timestamp_col")
+          .withType(Types.TimestampType.withTimeZone(6))
+          .withNullable(true)
+          .build(),
+      JdbcColumn.builder()
+          .withName("time_col")
+          .withType(Types.TimeType.of(3))
+          .withNullable(true)
+          .build()
+    };
+
+    String sql =
+        tableOperations.generateCreateTableSql(
+            tableName, columns, null, null, new Transform[0], 
Distributions.NONE, new Index[0]);
+
+    assertTrue(sql.contains("`decimal_col` NUMERIC(10, 2) NOT NULL"));
+    assertTrue(sql.contains("`timestamp_col` timestamp(6)"));
+    assertTrue(sql.contains("`time_col` time(3)"));
+  }
+
+  @Test
+  void testDropTableSql() {
+    BigQueryTableOperations tableOperations = createTableOperations();
+    String sql = 
tableOperations.generatePurgeTableSql("my_project.my_dataset.my_table");
+    assertEquals("DROP TABLE `my_project.my_dataset.my_table`", sql);
+  }
+}
diff --git 
a/catalogs/catalog-jdbc-bigquery/src/test/java/org/apache/gravitino/catalog/bigquery/operation/TestBigQueryTableOperations.java
 
b/catalogs/catalog-jdbc-bigquery/src/test/java/org/apache/gravitino/catalog/bigquery/operation/TestBigQueryTableOperations.java
new file mode 100644
index 0000000000..d2d4f3e351
--- /dev/null
+++ 
b/catalogs/catalog-jdbc-bigquery/src/test/java/org/apache/gravitino/catalog/bigquery/operation/TestBigQueryTableOperations.java
@@ -0,0 +1,598 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.bigquery.operation;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.gravitino.catalog.bigquery.BigQueryTablePropertiesMetadata;
+import org.apache.gravitino.catalog.bigquery.converter.BigQueryTypeConverter;
+import org.apache.gravitino.catalog.jdbc.JdbcColumn;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.rel.indexes.Indexes;
+import org.apache.gravitino.rel.types.Types;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/** Unit tests for {@link BigQueryTableOperations}. */
+public class TestBigQueryTableOperations {
+
+  @BeforeEach
+  void setUp() {
+    // Test setup is done per test method
+  }
+
+  private BigQueryTableOperations createTableOperations() {
+    BigQueryTableOperations tableOperations = new BigQueryTableOperations();
+    BigQueryTypeConverter typeConverter = new BigQueryTypeConverter();
+    // Use reflection to set the protected field for testing
+    try {
+      java.lang.reflect.Field field =
+          
tableOperations.getClass().getSuperclass().getDeclaredField("typeConverter");
+      field.setAccessible(true);
+      field.set(tableOperations, typeConverter);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to set typeConverter for testing", e);
+    }
+    return tableOperations;
+  }
+
+  @Test
+  void testGenerateCreateTableSqlBasic() {
+    BigQueryTableOperations tableOperations = new BigQueryTableOperations();
+    BigQueryTypeConverter typeConverter = new BigQueryTypeConverter();
+    // Use reflection to set the protected field for testing
+    try {
+      java.lang.reflect.Field field =
+          
tableOperations.getClass().getSuperclass().getDeclaredField("typeConverter");
+      field.setAccessible(true);
+      field.set(tableOperations, typeConverter);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to set typeConverter for testing", e);
+    }
+
+    String tableName = "test_dataset.test_table";
+    JdbcColumn[] columns = {
+      JdbcColumn.builder()
+          .withName("id")
+          .withType(Types.LongType.get())
+          .withNullable(false)
+          .withComment("Primary key")
+          .build(),
+      JdbcColumn.builder()
+          .withName("name")
+          .withType(Types.StringType.get())
+          .withNullable(true)
+          .withComment("User name")
+          .build(),
+      JdbcColumn.builder()
+          .withName("created_at")
+          .withType(Types.TimestampType.withTimeZone())
+          .withNullable(false)
+          .build()
+    };
+
+    String comment = "Test table";
+    Map<String, String> properties = new HashMap<>();
+    Transform[] partitioning = new Transform[0];
+    Index[] indexes = new Index[0];
+
+    String sql =
+        tableOperations.generateCreateTableSql(
+            tableName, columns, comment, properties, partitioning, 
Distributions.NONE, indexes);
+
+    assertTrue(sql.contains("CREATE TABLE `test_dataset.test_table`"));
+    assertTrue(sql.contains("`id` int64 NOT NULL OPTIONS(description=\"Primary 
key\")"));
+    assertTrue(sql.contains("`name` string"));
+    assertTrue(sql.contains("`created_at` timestamp NOT NULL"));
+    assertTrue(sql.contains("OPTIONS(\n  description=\"Test table\"\n)"));
+    assertTrue(sql.endsWith(";"));
+  }
+
+  @Test
+  void testGenerateCreateTableSqlWithPartitioning() {
+    BigQueryTableOperations tableOperations = createTableOperations();
+    String tableName = "test_dataset.test_table";
+    JdbcColumn[] columns = {
+      JdbcColumn.builder()
+          .withName("id")
+          .withType(Types.LongType.get())
+          .withNullable(false)
+          .build(),
+      JdbcColumn.builder()
+          .withName("created_date")
+          .withType(Types.DateType.get())
+          .withNullable(false)
+          .build()
+    };
+
+    Transform[] partitioning = {Transforms.day("created_date")};
+
+    String sql =
+        tableOperations.generateCreateTableSql(
+            tableName, columns, null, null, partitioning, Distributions.NONE, 
new Index[0]);
+
+    assertTrue(sql.contains("PARTITION BY `created_date`"));
+  }
+
+  @Test
+  void testGenerateCreateTableSqlWithClustering() {
+    BigQueryTableOperations tableOperations = createTableOperations();
+    String tableName = "test_dataset.test_table";
+    JdbcColumn[] columns = {
+      JdbcColumn.builder()
+          .withName("id")
+          .withType(Types.LongType.get())
+          .withNullable(false)
+          .build(),
+      JdbcColumn.builder()
+          .withName("category")
+          .withType(Types.StringType.get())
+          .withNullable(true)
+          .build()
+    };
+
+    Map<String, String> properties = new HashMap<>();
+    properties.put(BigQueryTablePropertiesMetadata.CLUSTERING_FIELDS, 
"category,id");
+
+    String sql =
+        tableOperations.generateCreateTableSql(
+            tableName,
+            columns,
+            null,
+            properties,
+            new Transform[0],
+            Distributions.NONE,
+            new Index[0]);
+
+    assertTrue(sql.contains("CLUSTER BY `category`, `id`"));
+  }
+
+  @Test
+  void testGenerateCreateTableSqlWithPartitionProperties() {
+    BigQueryTableOperations tableOperations = createTableOperations();
+    String tableName = "test_dataset.test_table";
+    JdbcColumn[] columns = {
+      JdbcColumn.builder()
+          .withName("id")
+          .withType(Types.LongType.get())
+          .withNullable(false)
+          .build(),
+      JdbcColumn.builder()
+          .withName("created_date")
+          .withType(Types.DateType.get())
+          .withNullable(false)
+          .build()
+    };
+
+    Map<String, String> properties = new HashMap<>();
+    properties.put(BigQueryTablePropertiesMetadata.PARTITION_EXPIRATION_DAYS, 
"30");
+    properties.put(BigQueryTablePropertiesMetadata.REQUIRE_PARTITION_FILTER, 
"true");
+
+    Transform[] partitioning = {Transforms.day("created_date")};
+
+    String sql =
+        tableOperations.generateCreateTableSql(
+            tableName,
+            columns,
+            "Test table",
+            properties,
+            partitioning,
+            Distributions.NONE,
+            new Index[0]);
+
+    assertTrue(sql.contains("PARTITION BY `created_date`"));
+    assertTrue(sql.contains("partition_expiration_days=30"));
+    assertTrue(sql.contains("require_partition_filter=true"));
+  }
+
+  @Test
+  void testGenerateCreateTableSqlWithPartitionPropertiesButNoPartition() {
+    BigQueryTableOperations tableOperations = createTableOperations();
+    String tableName = "test_dataset.test_table";
+    JdbcColumn[] columns = {
+      
JdbcColumn.builder().withName("id").withType(Types.LongType.get()).withNullable(false).build()
+    };
+
+    Map<String, String> properties = new HashMap<>();
+    properties.put(BigQueryTablePropertiesMetadata.PARTITION_EXPIRATION_DAYS, 
"30");
+    properties.put(BigQueryTablePropertiesMetadata.REQUIRE_PARTITION_FILTER, 
"true");
+
+    // No partitioning - these properties should be ignored
+    Transform[] partitioning = new Transform[0];
+
+    String sql =
+        tableOperations.generateCreateTableSql(
+            tableName,
+            columns,
+            "Test table",
+            properties,
+            partitioning,
+            Distributions.NONE,
+            new Index[0]);
+
+    // Partition-related properties should NOT be in the SQL
+    assertFalse(sql.contains("partition_expiration_days"));
+    assertFalse(sql.contains("require_partition_filter"));
+    // But description should still be there
+    assertTrue(sql.contains("description=\"Test table\""));
+  }
+
+  @Test
+  void testGenerateCreateTableSqlWithIndexes() {
+    BigQueryTableOperations tableOperations = createTableOperations();
+    String tableName = "test_dataset.test_table";
+    JdbcColumn[] columns = {
+      
JdbcColumn.builder().withName("id").withType(Types.LongType.get()).withNullable(false).build()
+    };
+
+    Index[] indexes = {Indexes.of(Index.IndexType.PRIMARY_KEY, "pk_id", new 
String[][] {{"id"}})};
+
+    assertThrows(
+        UnsupportedOperationException.class,
+        () ->
+            tableOperations.generateCreateTableSql(
+                tableName, columns, null, null, new Transform[0], 
Distributions.NONE, indexes));
+  }
+
+  @Test
+  void testGenerateCreateTableSqlWithUnsupportedDistribution() {
+    BigQueryTableOperations tableOperations = createTableOperations();
+    String tableName = "test_dataset.test_table";
+    JdbcColumn[] columns = {
+      
JdbcColumn.builder().withName("id").withType(Types.LongType.get()).withNullable(false).build()
+    };
+
+    assertThrows(
+        UnsupportedOperationException.class,
+        () ->
+            tableOperations.generateCreateTableSql(
+                tableName,
+                columns,
+                null,
+                null,
+                new Transform[0],
+                Distributions.hash(1, Transforms.identity("id")),
+                new Index[0]));
+  }
+
+  @Test
+  void testGeneratePartitionClause() {
+    BigQueryTableOperations tableOperations = createTableOperations();
+    // Test day partitioning with DATE column
+    JdbcColumn[] dateColumns = {
+      JdbcColumn.builder()
+          .withName("created_date")
+          .withType(Types.DateType.get())
+          .withNullable(false)
+          .build()
+    };
+    Transform[] dayPartitioning = {Transforms.day("created_date")};
+    String sql =
+        tableOperations.generateCreateTableSql(
+            "test.table",
+            dateColumns,
+            null,
+            null,
+            dayPartitioning,
+            Distributions.NONE,
+            new Index[0]);
+    assertTrue(sql.contains("PARTITION BY `created_date`"));
+
+    // Test day partitioning with TIMESTAMP column
+    JdbcColumn[] timestampColumns = {
+      JdbcColumn.builder()
+          .withName("created_timestamp")
+          .withType(Types.TimestampType.withTimeZone())
+          .withNullable(false)
+          .build()
+    };
+    Transform[] dayPartitioningTimestamp = 
{Transforms.day("created_timestamp")};
+    sql =
+        tableOperations.generateCreateTableSql(
+            "test.table",
+            timestampColumns,
+            null,
+            null,
+            dayPartitioningTimestamp,
+            Distributions.NONE,
+            new Index[0]);
+    assertTrue(sql.contains("PARTITION BY DATE(`created_timestamp`)"));
+
+    // Test month partitioning with DATE column
+    Transform[] monthPartitioning = {Transforms.month("created_date")};
+    sql =
+        tableOperations.generateCreateTableSql(
+            "test.table",
+            dateColumns,
+            null,
+            null,
+            monthPartitioning,
+            Distributions.NONE,
+            new Index[0]);
+    assertTrue(sql.contains("PARTITION BY DATE_TRUNC(`created_date`, MONTH)"));
+
+    // Test year partitioning with DATE column
+    Transform[] yearPartitioning = {Transforms.year("created_date")};
+    sql =
+        tableOperations.generateCreateTableSql(
+            "test.table",
+            dateColumns,
+            null,
+            null,
+            yearPartitioning,
+            Distributions.NONE,
+            new Index[0]);
+    assertTrue(sql.contains("PARTITION BY DATE_TRUNC(`created_date`, YEAR)"));
+
+    // Test hour partitioning with TIMESTAMP column
+    Transform[] hourPartitioning = {Transforms.hour("created_timestamp")};
+    sql =
+        tableOperations.generateCreateTableSql(
+            "test.table",
+            timestampColumns,
+            null,
+            null,
+            hourPartitioning,
+            Distributions.NONE,
+            new Index[0]);
+    assertTrue(sql.contains("PARTITION BY TIMESTAMP_TRUNC(`created_timestamp`, 
HOUR)"));
+
+    // Test identity partitioning with DATE column
+    Transform[] identityPartitioning = {Transforms.identity("created_date")};
+    sql =
+        tableOperations.generateCreateTableSql(
+            "test.table",
+            dateColumns,
+            null,
+            null,
+            identityPartitioning,
+            Distributions.NONE,
+            new Index[0]);
+    assertTrue(sql.contains("PARTITION BY `created_date`"));
+
+    // Test identity partitioning with TIMESTAMP column
+    Transform[] identityPartitioningTimestamp = 
{Transforms.identity("created_timestamp")};
+    sql =
+        tableOperations.generateCreateTableSql(
+            "test.table",
+            timestampColumns,
+            null,
+            null,
+            identityPartitioningTimestamp,
+            Distributions.NONE,
+            new Index[0]);
+    assertTrue(sql.contains("PARTITION BY DATE(`created_timestamp`)"));
+  }
+
+  @Test
+  void testGenerateCreateTableSqlWithAllProperties() {
+    BigQueryTableOperations tableOperations = createTableOperations();
+    String tableName = "test_dataset.test_table";
+    JdbcColumn[] columns = {
+      JdbcColumn.builder()
+          .withName("id")
+          .withType(Types.LongType.get())
+          .withNullable(false)
+          .build(),
+      JdbcColumn.builder()
+          .withName("created_date")
+          .withType(Types.DateType.get())
+          .withNullable(false)
+          .build()
+    };
+
+    Map<String, String> properties = new HashMap<>();
+    properties.put(BigQueryTablePropertiesMetadata.PARTITION_EXPIRATION_DAYS, 
"30.5");
+    properties.put(BigQueryTablePropertiesMetadata.REQUIRE_PARTITION_FILTER, 
"true");
+    properties.put(BigQueryTablePropertiesMetadata.CLUSTERING_FIELDS, "id");
+    properties.put(BigQueryTablePropertiesMetadata.EXPIRATION_TIMESTAMP, 
"2025-12-31T23:59:59Z");
+    properties.put(BigQueryTablePropertiesMetadata.FRIENDLY_NAME, "Test 
Table");
+    properties.put(
+        BigQueryTablePropertiesMetadata.LABELS, 
"[{\"env\":\"test\"},{\"team\":\"data\"}]");
+    properties.put(
+        BigQueryTablePropertiesMetadata.KMS_KEY_NAME,
+        "projects/test/locations/us/keyRings/ring/cryptoKeys/key");
+    properties.put(BigQueryTablePropertiesMetadata.DEFAULT_ROUNDING_MODE, 
"ROUND_HALF_EVEN");
+    properties.put(BigQueryTablePropertiesMetadata.ENABLE_CHANGE_HISTORY, 
"true");
+    properties.put(
+        BigQueryTablePropertiesMetadata.MAX_STALENESS, "INTERVAL \"1:0:0\" 
HOUR TO SECOND");
+
+    Transform[] partitioning = {Transforms.day("created_date")};
+
+    String sql =
+        tableOperations.generateCreateTableSql(
+            tableName,
+            columns,
+            "Comprehensive test table",
+            properties,
+            partitioning,
+            Distributions.NONE,
+            new Index[0]);
+
+    // Verify basic structure
+    assertTrue(sql.contains("CREATE TABLE `test_dataset.test_table`"));
+    assertTrue(sql.contains("PARTITION BY `created_date`"));
+    assertTrue(sql.contains("CLUSTER BY `id`"));
+
+    // Verify all properties are included
+    assertTrue(sql.contains("description=\"Comprehensive test table\""));
+    assertTrue(sql.contains("partition_expiration_days=30.5"));
+    assertTrue(sql.contains("require_partition_filter=true"));
+    assertTrue(sql.contains("expiration_timestamp=TIMESTAMP 
\"2025-12-31T23:59:59Z\""));
+    assertTrue(sql.contains("friendly_name=\"Test Table\""));
+    assertTrue(sql.contains("labels="));
+    assertTrue(sql.contains("kms_key_name="));
+    assertTrue(sql.contains("default_rounding_mode=\"ROUND_HALF_EVEN\""));
+    assertTrue(sql.contains("enable_change_history=true"));
+    assertTrue(sql.contains("max_staleness=INTERVAL \"1:0:0\" HOUR TO 
SECOND"));
+  }
+
+  @Test
+  void testGeneratePurgeTableSql() {
+    BigQueryTableOperations tableOperations = new BigQueryTableOperations();
+    String sql = 
tableOperations.generatePurgeTableSql("test_dataset.test_table");
+    assertEquals("DROP TABLE `test_dataset.test_table`", sql);
+  }
+
+  @Test
+  void testGenerateRenameTableSql() {
+    BigQueryTableOperations tableOperations = new BigQueryTableOperations();
+    String sql = tableOperations.generateRenameTableSql("dataset.old_table", 
"new_table");
+    assertEquals("ALTER TABLE `dataset.old_table` RENAME TO `new_table`", sql);
+  }
+
+  @Test
+  void testGetAutoIncrementInfo() throws SQLException {
+    BigQueryTableOperations tableOperations = new BigQueryTableOperations();
+    ResultSet mockResultSet = mock(ResultSet.class);
+    when(mockResultSet.getString("IS_AUTOINCREMENT")).thenReturn("YES");
+
+    // BigQuery doesn't support auto increment, should always return false
+    assertFalse(tableOperations.getAutoIncrementInfo(mockResultSet));
+  }
+
+  @Test
+  void testEscapeString() {
+    BigQueryTableOperations tableOperations = createTableOperations();
+    // Test the private escapeString method through SQL generation
+    JdbcColumn[] columns = {
+      JdbcColumn.builder()
+          .withName("test")
+          .withType(Types.StringType.get())
+          .withNullable(true)
+          .withComment("Test \"quoted\" comment\nwith newline")
+          .build()
+    };
+
+    String sql =
+        tableOperations.generateCreateTableSql(
+            "test.table",
+            columns,
+            "Table with \"quotes\" and\nnewlines",
+            null,
+            new Transform[0],
+            Distributions.NONE,
+            new Index[0]);
+
+    assertTrue(sql.contains("\\\"quoted\\\""));
+    assertTrue(sql.contains("\\n"));
+  }
+
+  @Test
+  void testManagedTableProperties() {
+    BigQueryTableOperations tableOperations = createTableOperations();
+    String tableName = "test_dataset.managed_table";
+    JdbcColumn[] columns = {
+      
JdbcColumn.builder().withName("id").withType(Types.LongType.get()).withNullable(false).build()
+    };
+
+    Map<String, String> properties = new HashMap<>();
+    properties.put(BigQueryTablePropertiesMetadata.STORAGE_URI, 
"gs://my-bucket/my-table/");
+    properties.put(BigQueryTablePropertiesMetadata.FILE_FORMAT, "PARQUET");
+    properties.put(BigQueryTablePropertiesMetadata.TABLE_FORMAT, "ICEBERG");
+
+    String sql =
+        tableOperations.generateCreateTableSql(
+            tableName,
+            columns,
+            "Managed table test",
+            properties,
+            new Transform[0],
+            Distributions.NONE,
+            new Index[0]);
+
+    assertTrue(sql.contains("storage_uri=\"gs://my-bucket/my-table/\""));
+    assertTrue(sql.contains("file_format=\"PARQUET\""));
+    assertTrue(sql.contains("table_format=\"ICEBERG\""));
+  }
+
+  @Test
+  void testPreviewFeatureProperties() {
+    BigQueryTableOperations tableOperations = createTableOperations();
+    String tableName = "test_dataset.preview_table";
+    JdbcColumn[] columns = {
+      
JdbcColumn.builder().withName("id").withType(Types.LongType.get()).withNullable(false).build()
+    };
+
+    Map<String, String> properties = new HashMap<>();
+    properties.put(BigQueryTablePropertiesMetadata.ENABLE_CHANGE_HISTORY, 
"true");
+    
properties.put(BigQueryTablePropertiesMetadata.ENABLE_FINE_GRAINED_MUTATIONS, 
"false");
+    properties.put(
+        BigQueryTablePropertiesMetadata.MAX_STALENESS, "INTERVAL \"2:30:0\" 
HOUR TO SECOND");
+
+    String sql =
+        tableOperations.generateCreateTableSql(
+            tableName,
+            columns,
+            "Preview features test",
+            properties,
+            new Transform[0],
+            Distributions.NONE,
+            new Index[0]);
+
+    assertTrue(sql.contains("enable_change_history=true"));
+    assertTrue(sql.contains("enable_fine_grained_mutations=false"));
+    assertTrue(sql.contains("max_staleness=INTERVAL \"2:30:0\" HOUR TO 
SECOND"));
+  }
+
+  @Test
+  void testEncryptionAndSecurityProperties() {
+    BigQueryTableOperations tableOperations = createTableOperations();
+    String tableName = "test_dataset.secure_table";
+    JdbcColumn[] columns = {
+      
JdbcColumn.builder().withName("id").withType(Types.LongType.get()).withNullable(false).build()
+    };
+
+    Map<String, String> properties = new HashMap<>();
+    properties.put(
+        BigQueryTablePropertiesMetadata.KMS_KEY_NAME,
+        "projects/test/locations/us/keyRings/ring/cryptoKeys/key");
+    properties.put(BigQueryTablePropertiesMetadata.DEFAULT_ROUNDING_MODE, 
"ROUND_HALF_EVEN");
+    properties.put(
+        BigQueryTablePropertiesMetadata.TAGS,
+        "[{\"security\":\"high\"},{\"compliance\":\"gdpr\"}]");
+
+    String sql =
+        tableOperations.generateCreateTableSql(
+            tableName,
+            columns,
+            "Secure table test",
+            properties,
+            new Transform[0],
+            Distributions.NONE,
+            new Index[0]);
+
+    assertTrue(
+        
sql.contains("kms_key_name=\"projects/test/locations/us/keyRings/ring/cryptoKeys/key\""));
+    assertTrue(sql.contains("default_rounding_mode=\"ROUND_HALF_EVEN\""));
+    assertTrue(sql.contains("tags="));
+  }
+}


Reply via email to