This is an automated email from the ASF dual-hosted git repository.

yuqi4733 pushed a commit to branch internal-main
in repository https://gitbox.apache.org/repos/asf/gravitino.git

commit 48f927bd63538dd784da5da2a86c42b9d2db8ae3
Author: geyanggang <[email protected]>
AuthorDate: Fri Feb 6 17:00:12 2026 +0800

    [#121] feat (bigquery-catalog): Fix BigQuery table properties loading via BigQuery API
---
 .../operation/BigQueryTableOperations.java | 224 ++++++++++++++++++++-
 docs/jdbc-bigquery-catalog.md              |   4 -
 2 files changed, 215 insertions(+), 13 deletions(-)

diff --git a/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/operation/BigQueryTableOperations.java b/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/operation/BigQueryTableOperations.java
index 3eac0eb0cf..009ec5421c 100644
--- a/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/operation/BigQueryTableOperations.java
+++ b/catalogs/catalog-jdbc-bigquery/src/main/java/org/apache/gravitino/catalog/bigquery/operation/BigQueryTableOperations.java
@@ -1,7 +1,9 @@
 package org.apache.gravitino.catalog.bigquery.operation;
 
 import com.google.api.services.bigquery.Bigquery;
+import com.google.api.services.bigquery.model.Table;
 import com.google.api.services.bigquery.model.TableList;
+import com.google.api.services.bigquery.model.TimePartitioning;
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.ResultSet;
@@ -9,10 +11,13 @@ import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.stream.Collectors;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.catalog.bigquery.BigQueryClientPool;
@@ -676,8 +681,20 @@ public class BigQueryTableOperations extends JdbcTableOperations {
 
   @Override
   protected Map<String, String> getTableProperties(Connection connection, String tableName) {
-    // BigQuery table properties are not easily accessible via JDBC
-    // Return empty map for now, properties can be set during creation
+    // Try to get properties via BigQuery API if available
+    if (clientPool != null) {
+      try {
+        String databaseName = connection.getSchema();
+        return getTablePropertiesViaApi(databaseName, tableName);
+      } catch (Exception e) {
+        LOG.warn(
+            "Failed to get table properties via BigQuery API for table: {}, falling back to empty map",
+            tableName,
+            e);
+      }
+    }
+
+    // Fallback: BigQuery table properties are not accessible via JDBC
     return Collections.emptyMap();
   }
 
@@ -690,8 +707,19 @@ public class BigQueryTableOperations extends JdbcTableOperations {
   @Override
   protected Transform[] getTablePartitioning(
       Connection connection, String databaseName, String tableName) {
-    // BigQuery partitioning information is not easily accessible via JDBC
-    // Return empty array for now
+    // Try to get partitioning via BigQuery API if available
+    if (clientPool != null) {
+      try {
+        return getTablePartitioningViaApi(databaseName, tableName);
+      } catch (Exception e) {
+        LOG.warn(
+            "Failed to get table partitioning via BigQuery API for table: {}, falling back to empty array",
+            tableName,
+            e);
+      }
+    }
+
+    // Fallback: BigQuery partitioning information is not accessible via JDBC
     return Transforms.EMPTY_TRANSFORM;
   }
 
@@ -701,8 +729,17 @@ public class BigQueryTableOperations extends JdbcTableOperations {
       String databaseName,
       String tableName,
       JdbcTable.Builder tableBuilder) {
-    // BigQuery-specific table field corrections can be added here if needed
-    // For now, use default JDBC behavior
+    // Correct table comment via BigQuery API if available
+    if (clientPool != null) {
+      try {
+        correctTableFieldsViaApi(databaseName, tableName, tableBuilder);
+      } catch (Exception e) {
+        LOG.warn(
+            "Failed to correct table fields via BigQuery API for table: {}, using JDBC defaults",
+            tableName,
+            e);
+      }
+    }
   }
 
   @Override
@@ -1055,7 +1092,7 @@ public class BigQueryTableOperations extends JdbcTableOperations {
    * @throws IllegalArgumentException if invalid rounding mode
    */
   private void validateRoundingMode(String roundingMode) {
-    if (roundingMode == null || roundingMode.trim().isEmpty()) {
+    if (StringUtils.isBlank(roundingMode)) {
       return;
     }
 
@@ -1076,7 +1113,7 @@ public class BigQueryTableOperations extends JdbcTableOperations {
    * @throws IllegalArgumentException if invalid file format
    */
   private void validateFileFormat(String fileFormat) {
-    if (fileFormat == null || fileFormat.trim().isEmpty()) {
+    if (StringUtils.isBlank(fileFormat)) {
       return;
     }
 
@@ -1097,7 +1134,7 @@ public class BigQueryTableOperations extends JdbcTableOperations {
    * @throws IllegalArgumentException if invalid table format
    */
   private void validateTableFormat(String tableFormat) {
-    if (tableFormat == null || tableFormat.trim().isEmpty()) {
+    if (StringUtils.isBlank(tableFormat)) {
       return;
     }
 
@@ -1118,4 +1155,173 @@ public class BigQueryTableOperations extends JdbcTableOperations {
     // Return NONE distribution as clustering is handled via table properties
     return Distributions.NONE;
   }
+
+  /**
+   * Gets table properties via BigQuery API.
+   *
+   * @param databaseName the dataset name
+   * @param tableName the table name
+   * @return map of table properties
+   * @throws IOException if API call fails
+   */
+  private Map<String, String> getTablePropertiesViaApi(String databaseName, String tableName)
+      throws IOException {
+    Bigquery bigquery = clientPool.getClient();
+    String projectId = clientPool.getProjectId();
+
+    Table table = bigquery.tables().get(projectId, databaseName, tableName).execute();
+
+    Map<String, String> properties = new HashMap<>();
+
+    // Clustering fields
+    if (table.getClustering() != null) {
+      List<String> fields = table.getClustering().getFields();
+      if (CollectionUtils.isNotEmpty(fields)) {
+        properties.put(BigQueryTablePropertiesMetadata.CLUSTERING_FIELDS, String.join(",", fields));
+      }
+    }
+
+    // Time partitioning properties
+    if (table.getTimePartitioning() != null) {
+      TimePartitioning timePartitioning = table.getTimePartitioning();
+
+      // Partition expiration days
+      if (timePartitioning.getExpirationMs() != null) {
+        long expirationMs = timePartitioning.getExpirationMs();
+        double expirationDays = expirationMs / (1000.0 * 60 * 60 * 24);
+        properties.put(
+            BigQueryTablePropertiesMetadata.PARTITION_EXPIRATION_DAYS,
+            String.valueOf(expirationDays));
+      }
+
+      // Require partition filter
+      if (timePartitioning.getRequirePartitionFilter() != null) {
+        properties.put(
+            BigQueryTablePropertiesMetadata.REQUIRE_PARTITION_FILTER,
+            String.valueOf(timePartitioning.getRequirePartitionFilter()));
+      }
+    }
+
+    // Expiration timestamp
+    if (table.getExpirationTime() != null) {
+      long expirationMs = table.getExpirationTime();
+      java.time.Instant instant = java.time.Instant.ofEpochMilli(expirationMs);
+      properties.put(
+          BigQueryTablePropertiesMetadata.EXPIRATION_TIMESTAMP,
+          instant.toString().replace("Z", "+00:00"));
+    }
+
+    // Friendly name
+    if (StringUtils.isNotBlank(table.getFriendlyName())) {
+      properties.put(BigQueryTablePropertiesMetadata.FRIENDLY_NAME, table.getFriendlyName());
+    }
+
+    // Labels
+    if (MapUtils.isNotEmpty(table.getLabels())) {
+      String labelsJson = formatLabelsFromApi(table.getLabels());
+      properties.put(BigQueryTablePropertiesMetadata.LABELS, labelsJson);
+    }
+
+    // KMS key name
+    if (table.getEncryptionConfiguration() != null) {
+      String kmsKeyName = table.getEncryptionConfiguration().getKmsKeyName();
+      if (StringUtils.isNotBlank(kmsKeyName)) {
+        properties.put(BigQueryTablePropertiesMetadata.KMS_KEY_NAME, kmsKeyName);
+      }
+    }
+
+    // Default rounding mode
+    if (StringUtils.isNotBlank(table.getDefaultRoundingMode())) {
+      properties.put(
+          BigQueryTablePropertiesMetadata.DEFAULT_ROUNDING_MODE, table.getDefaultRoundingMode());
+    }
+
+    return properties;
+  }
+
+  /**
+   * Gets table partitioning via BigQuery API.
+   *
+   * @param databaseName the dataset name
+   * @param tableName the table name
+   * @return array of partition transforms
+   * @throws IOException if API call fails
+   */
+  private Transform[] getTablePartitioningViaApi(String databaseName, String tableName)
+      throws IOException {
+    Bigquery bigquery = clientPool.getClient();
+    String projectId = clientPool.getProjectId();
+
+    Table table = bigquery.tables().get(projectId, databaseName, tableName).execute();
+
+    if (table.getTimePartitioning() == null) {
+      return Transforms.EMPTY_TRANSFORM;
+    }
+
+    TimePartitioning timePartitioning = table.getTimePartitioning();
+    String type = timePartitioning.getType();
+    String field = timePartitioning.getField();
+
+    if (field == null) {
+      // Ingestion-time partitioning (_PARTITIONTIME)
+      return Transforms.EMPTY_TRANSFORM;
+    }
+
+    // Map BigQuery partition type to Gravitino transform
+    switch (type) {
+      case "DAY":
+        return new Transform[] {Transforms.day(field)};
+      case "HOUR":
+        return new Transform[] {Transforms.hour(field)};
+      case "MONTH":
+        return new Transform[] {Transforms.month(field)};
+      case "YEAR":
+        return new Transform[] {Transforms.year(field)};
+      default:
+        throw new IllegalArgumentException(
+            String.format(
+                "Unsupported BigQuery partition type: %s. Gravitino only supports DAY, HOUR, MONTH, and YEAR partitioning.",
+                type));
+    }
+  }
+
+  /**
+   * Corrects table fields via BigQuery API.
+   *
+   * @param databaseName the dataset name
+   * @param tableName the table name
+   * @param tableBuilder the table builder to update
+   * @throws IOException if API call fails
+   */
+  private void correctTableFieldsViaApi(
+      String databaseName, String tableName, JdbcTable.Builder tableBuilder) throws IOException {
+    Bigquery bigquery = clientPool.getClient();
+    String projectId = clientPool.getProjectId();
+
+    Table table = bigquery.tables().get(projectId, databaseName, tableName).execute();
+
+    // Correct table comment/description
+    if (StringUtils.isNotBlank(table.getDescription())) {
+      tableBuilder.withComment(table.getDescription());
+    }
+  }
+
+  /**
+   * Formats labels from BigQuery API response to JSON array format.
+   *
+   * @param labels map of labels from BigQuery API
+   * @return JSON array string
+   */
+  private String formatLabelsFromApi(Map<String, String> labels) {
+    if (MapUtils.isEmpty(labels)) {
+      return "[]";
+    }
+
+    List<String> labelPairs = new ArrayList<>();
+    for (Map.Entry<String, String> entry : labels.entrySet()) {
+      labelPairs.add(String.format("{\"%s\":\"%s\"}", entry.getKey(), entry.getValue()));
+    }
+
+    return "[" + String.join(", ", labelPairs) + "]";
+  }
 }
diff --git a/docs/jdbc-bigquery-catalog.md b/docs/jdbc-bigquery-catalog.md
index 130fb0d865..569bdbc8fb 100644
--- a/docs/jdbc-bigquery-catalog.md
+++ b/docs/jdbc-bigquery-catalog.md
@@ -127,15 +127,11 @@ Refer to [Manage Relational Metadata Using Gravitino](./manage-relational-metada
 |----------------|---------------|
 | `Binary`       | `BYTES`       |
 | `Boolean`      | `BOOL`        |
-| `Byte`         | `INT64`       |
 | `Char`         | `STRING`      |
 | `Date`         | `DATE`        |
 | `Decimal`      | `NUMERIC`     |
 | `Double`       | `FLOAT64`     |
-| `Float`        | `FLOAT64`     |
-| `Integer`      | `INT64`       |
 | `Long`         | `INT64`       |
-| `Short`        | `INT64`       |
 | `String`       | `STRING`      |
 | `Time`         | `TIME`        |
 | `Timestamp`    | `DATETIME`    |
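
For reviewers, a minimal standalone sketch (not part of the patch) of the label-to-JSON
formatting that the new formatLabelsFromApi helper performs. The class name LabelFormatSketch,
the formatLabels method, and the sample labels below are illustrative only; they mirror the
patch rather than reuse it.

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

public class LabelFormatSketch {

  // Builds the same "[{"key":"value"}, ...]" shape the patch stores under the labels property.
  static String formatLabels(Map<String, String> labels) {
    if (labels == null || labels.isEmpty()) {
      return "[]";
    }
    StringJoiner joiner = new StringJoiner(", ", "[", "]");
    labels.forEach((key, value) -> joiner.add(String.format("{\"%s\":\"%s\"}", key, value)));
    return joiner.toString();
  }

  public static void main(String[] args) {
    Map<String, String> labels = new LinkedHashMap<>();
    labels.put("env", "prod");
    labels.put("team", "data");
    // Prints: [{"env":"prod"}, {"team":"data"}]
    System.out.println(formatLabels(labels));
  }
}

Note that the result is a JSON array of single-entry objects rather than one flat object, so
anything that reads the labels property back should expect that shape.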
