This is an automated email from the ASF dual-hosted git repository. yuqi4733 pushed a commit to branch internal-main in repository https://gitbox.apache.org/repos/asf/gravitino.git
commit 30a35a0bce46e885a8b0c19e1bc60f336739f453 Author: geyanggang <[email protected]> AuthorDate: Tue Jan 13 16:23:19 2026 +0800 [#94]feat(bigquery-catalog): Add integration test for BigQuery catalog. (#103) * Add integration test for BQ catalog. * user document update. * Add BigQuery user document. * Fix bugs. * Fix bugs. --- catalogs/catalog-jdbc-bigquery/build.gradle.kts | 1 + .../integration/CatalogBigQueryCrossRegionIT.java | 584 ++++++++++++++++ .../bigquery/integration/CatalogBigQueryIT.java | 768 +++++++++++++++++++++ .../integration/CatalogBigQueryPerformanceIT.java | 758 ++++++++++++++++++++ .../src/test/resources/log4j2.properties | 57 ++ docs/jdbc-bigquery-catalog.md | 317 +++++++++ 6 files changed, 2485 insertions(+) diff --git a/catalogs/catalog-jdbc-bigquery/build.gradle.kts b/catalogs/catalog-jdbc-bigquery/build.gradle.kts index 0311d0f5ff..728cf7fafa 100644 --- a/catalogs/catalog-jdbc-bigquery/build.gradle.kts +++ b/catalogs/catalog-jdbc-bigquery/build.gradle.kts @@ -123,6 +123,7 @@ dependencies { testImplementation(libs.junit.jupiter.params) testImplementation(libs.testcontainers) testImplementation(libs.mockito.core) + testImplementation(libs.awaitility) val simbaJdbcDriver = files( simbaExtractDir.asFileTree.matching { diff --git a/catalogs/catalog-jdbc-bigquery/src/test/java/org/apache/gravitino/catalog/bigquery/integration/CatalogBigQueryCrossRegionIT.java b/catalogs/catalog-jdbc-bigquery/src/test/java/org/apache/gravitino/catalog/bigquery/integration/CatalogBigQueryCrossRegionIT.java new file mode 100644 index 0000000000..c40dd1890d --- /dev/null +++ b/catalogs/catalog-jdbc-bigquery/src/test/java/org/apache/gravitino/catalog/bigquery/integration/CatalogBigQueryCrossRegionIT.java @@ -0,0 +1,584 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.catalog.bigquery.integration; + +import static org.apache.gravitino.catalog.bigquery.BigQueryCatalogPropertiesMetadata.PROJECT_ID; +import static org.awaitility.Awaitility.await; + +import com.google.common.collect.Maps; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.commons.lang3.StringUtils; +import org.apache.gravitino.Catalog; +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.Namespace; +import org.apache.gravitino.Schema; +import org.apache.gravitino.catalog.jdbc.config.JdbcConfig; +import org.apache.gravitino.client.GravitinoMetalake; +import org.apache.gravitino.integration.test.util.BaseIT; +import org.apache.gravitino.integration.test.util.GravitinoITUtils; +import org.apache.gravitino.rel.Column; +import org.apache.gravitino.rel.Table; +import org.apache.gravitino.rel.TableCatalog; +import org.apache.gravitino.rel.expressions.distributions.Distributions; +import org.apache.gravitino.rel.expressions.sorts.SortOrder; +import org.apache.gravitino.rel.expressions.transforms.Transforms; +import org.apache.gravitino.rel.types.Types; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import 
org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.TestInstance.Lifecycle; +import org.junit.jupiter.api.condition.EnabledIf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Cross-region integration tests for BigQuery catalog. + * + * <p>This test class focuses on testing BigQuery's cross-region capabilities, including: + * + * <ul> + * <li>Cross-region dataset listing and operations + * <li>Multi-region dataset creation and management + * <li>Cross-region table operations + * <li>Performance testing across regions + * </ul> + * + * <p>Requires the same environment variables as CatalogBigQueryIT plus: + * + * <ul> + * <li>BIGQUERY_MULTI_REGION_TEST - Set to "true" to enable multi-region tests + * </ul> + */ +@TestInstance(Lifecycle.PER_CLASS) +@EnabledIf("isCrossRegionTestConfigured") +public class CatalogBigQueryCrossRegionIT extends BaseIT { + + private static final Logger LOG = LoggerFactory.getLogger(CatalogBigQueryCrossRegionIT.class); + private static final String provider = "jdbc-bigquery"; + + // Environment variable names + private static final String ENV_PROJECT_ID = "BIGQUERY_PROJECT_ID"; + private static final String ENV_KEY_PATH = "BIGQUERY_SERVICE_ACCOUNT_KEY_PATH"; + private static final String ENV_SERVICE_ACCOUNT_EMAIL = "BIGQUERY_SERVICE_ACCOUNT_EMAIL"; + private static final String ENV_MULTI_REGION_TEST = "BIGQUERY_MULTI_REGION_TEST"; + + // Test regions + private static final List<String> TEST_REGIONS = Arrays.asList("US", "EU", "asia-east1"); + + // Test configuration + private String projectId; + private String keyPath; + private String serviceAccountEmail; + + // Test identifiers + public String metalakeName = GravitinoITUtils.genRandomName("bigquery_cross_region_metalake"); + public String catalogName = GravitinoITUtils.genRandomName("bigquery_cross_region_catalog"); + + // Track created 
resources to clean up + private final Set<String> createdSchemas = ConcurrentHashMap.newKeySet(); + private final Set<NameIdentifier> createdTables = ConcurrentHashMap.newKeySet(); + + private GravitinoMetalake metalake; + protected Catalog catalog; + + /** + * Check if BigQuery cross-region testing is configured. + * + * @return true if all required environment variables are set and multi-region test is enabled + */ + static boolean isCrossRegionTestConfigured() { + return StringUtils.isNotBlank(System.getenv(ENV_PROJECT_ID)) + && StringUtils.isNotBlank(System.getenv(ENV_KEY_PATH)) + && "true".equalsIgnoreCase(System.getenv(ENV_MULTI_REGION_TEST)); + } + + @BeforeAll + public void startup() { + // Load configuration from environment variables + projectId = System.getenv(ENV_PROJECT_ID); + keyPath = System.getenv(ENV_KEY_PATH); + serviceAccountEmail = System.getenv(ENV_SERVICE_ACCOUNT_EMAIL); + boolean multiRegionTestEnabled = "true".equalsIgnoreCase(System.getenv(ENV_MULTI_REGION_TEST)); + + LOG.info("Starting BigQuery cross-region integration tests with project: {}", projectId); + LOG.info("Multi-region test enabled: {}", multiRegionTestEnabled); + + createMetalake(); + catalog = createCatalog(catalogName); + } + + @AfterAll + public void stop() { + // Clean up only resources created by this test + cleanupTestResources(); + + // Do NOT delete catalog or metalake - they may be shared + LOG.info("Test cleanup completed. 
Catalog and metalake are preserved."); + } + + @AfterEach + public void cleanup() { + // Clean up resources created in this test method + cleanupTestResources(); + } + + private void cleanupTestResources() { + LOG.info( + "Cleaning up test resources: {} schemas, {} tables", + createdSchemas.size(), + createdTables.size()); + + // First, clean up all tables created by tests + Set<NameIdentifier> tablesToDelete = new HashSet<>(createdTables); + for (NameIdentifier tableId : tablesToDelete) { + try { + LOG.debug("Dropping table: {}", tableId); + catalog.asTableCatalog().dropTable(tableId); + createdTables.remove(tableId); + } catch (Exception e) { + LOG.warn("Failed to drop table {}: {}", tableId, e.getMessage()); + } + } + + // Then, clean up all schemas created by tests + Set<String> schemasToDelete = new HashSet<>(createdSchemas); + for (String schemaName : schemasToDelete) { + try { + LOG.debug("Dropping schema: {}", schemaName); + catalog.asSchemas().dropSchema(schemaName, true); + createdSchemas.remove(schemaName); + } catch (Exception e) { + LOG.warn("Failed to drop schema {}: {}", schemaName, e.getMessage()); + } + } + + // Wait for BigQuery operations to complete + await() + .atMost(Duration.ofSeconds(10)) + .pollInterval(Duration.ofMillis(500)) + .until(() -> true); // BigQuery operations are eventually consistent + } + + private void createMetalake() { + try { + // Check if metalake already exists + GravitinoMetalake[] existingMetalakes = client.listMetalakes(); + for (GravitinoMetalake existing : existingMetalakes) { + if (existing.name().equals(metalakeName)) { + LOG.info("Using existing metalake: {}", metalakeName); + metalake = client.loadMetalake(metalakeName); + return; + } + } + + // Create new metalake if it doesn't exist + client.createMetalake(metalakeName, "Cross-region test metalake", Collections.emptyMap()); + metalake = client.loadMetalake(metalakeName); + Assertions.assertEquals(metalakeName, metalake.name()); + LOG.info("Created new 
metalake: {}", metalakeName); + } catch (Exception e) { + LOG.error("Failed to create or load metalake: {}", e.getMessage()); + throw new RuntimeException("Failed to setup metalake", e); + } + } + + private Catalog createCatalog(String catalogName) { + try { + // Check if catalog already exists in the metalake + String[] existingCatalogs = metalake.listCatalogs(); + for (String existing : existingCatalogs) { + if (existing.equals(catalogName)) { + LOG.info("Using existing catalog: {}", catalogName); + return metalake.loadCatalog(catalogName); + } + } + + // Create new catalog if it doesn't exist + Map<String, String> catalogProperties = Maps.newHashMap(); + + // Required properties + catalogProperties.put(PROJECT_ID, projectId); + catalogProperties.put(JdbcConfig.PASSWORD.getKey(), keyPath); // Key file path + catalogProperties.put( + JdbcConfig.JDBC_DRIVER.getKey(), "com.simba.googlebigquery.jdbc42.Driver"); + + // Optional properties + if (StringUtils.isNotBlank(serviceAccountEmail)) { + catalogProperties.put(JdbcConfig.USERNAME.getKey(), serviceAccountEmail); + } + + Catalog createdCatalog = + metalake.createCatalog( + catalogName, + Catalog.Type.RELATIONAL, + provider, + "Cross-region test catalog", + catalogProperties); + Catalog loadCatalog = metalake.loadCatalog(catalogName); + Assertions.assertEquals(createdCatalog, loadCatalog); + + LOG.info("Created new catalog: {}", catalogName); + return loadCatalog; + } catch (Exception e) { + LOG.error("Failed to create or load catalog: {}", e.getMessage()); + throw new RuntimeException("Failed to setup catalog", e); + } + } + + private Column[] createTestColumns() { + return new Column[] { + Column.of("id", Types.LongType.get(), "ID column"), + Column.of("name", Types.StringType.get(), "Name column"), + Column.of("created_at", Types.TimestampType.withoutTimeZone(), "Creation timestamp") + }; + } + + @Test + void testCrossRegionSchemaListing() { + LOG.info("Testing cross-region schema listing"); + + // Create schemas 
in different regions + Map<String, String> createdTestSchemas = Maps.newHashMap(); + + for (String region : TEST_REGIONS) { + String schemaName = + GravitinoITUtils.genRandomName("bigquery_cross_region_test_" + region.toLowerCase()); + Map<String, String> properties = Maps.newHashMap(); + properties.put("location", region); + properties.put("description", "Cross-region test schema in " + region); + + try { + catalog.asSchemas().createSchema(schemaName, "Test schema in " + region, properties); + createdTestSchemas.put(region, schemaName); + createdSchemas.add(schemaName); // Track for cleanup + LOG.info("Created schema {} in region {}", schemaName, region); + } catch (Exception e) { + LOG.warn("Failed to create schema in region {}: {}", region, e.getMessage()); + } + } + + // List all schemas and verify cross-region visibility + String[] allSchemas = catalog.asSchemas().listSchemas(); + Set<String> schemaSet = new HashSet<>(Arrays.asList(allSchemas)); + + LOG.info("Total schemas found: {}", allSchemas.length); + + // Verify that schemas from different regions are visible + for (Map.Entry<String, String> entry : createdTestSchemas.entrySet()) { + String region = entry.getKey(); + String schemaName = entry.getValue(); + + Assertions.assertTrue( + schemaSet.contains(schemaName), + "Schema " + schemaName + " from region " + region + " should be visible"); + + // Load schema and verify properties + Schema loadedSchema = catalog.asSchemas().loadSchema(schemaName); + Assertions.assertEquals(schemaName, loadedSchema.name()); + Assertions.assertEquals(region, loadedSchema.properties().get("location")); + } + + LOG.info("Cross-region schema listing test completed successfully"); + } + + @Test + void testCrossRegionTableOperations() { + LOG.info("Testing cross-region table operations"); + + // Create a schema in US region + String usSchemaName = GravitinoITUtils.genRandomName("bigquery_cross_region_us_test"); + Map<String, String> usProperties = Maps.newHashMap(); + 
usProperties.put("location", "US"); + catalog.asSchemas().createSchema(usSchemaName, "US test schema", usProperties); + createdSchemas.add(usSchemaName); // Track for cleanup + + // Create a schema in EU region (if supported) + String euSchemaName = GravitinoITUtils.genRandomName("bigquery_cross_region_eu_test"); + Map<String, String> euProperties = Maps.newHashMap(); + euProperties.put("location", "EU"); + + boolean euSchemaCreated = false; + try { + catalog.asSchemas().createSchema(euSchemaName, "EU test schema", euProperties); + createdSchemas.add(euSchemaName); // Track for cleanup + euSchemaCreated = true; + LOG.info("Successfully created EU schema: {}", euSchemaName); + } catch (Exception e) { + LOG.warn("Failed to create EU schema, continuing with US only: {}", e.getMessage()); + } + + // Create tables in US schema + String usTableName = GravitinoITUtils.genRandomName("us_table"); + NameIdentifier usTableId = NameIdentifier.of(usSchemaName, usTableName); + + TableCatalog tableCatalog = catalog.asTableCatalog(); + tableCatalog.createTable( + usTableId, + createTestColumns(), + "US table", + Collections.emptyMap(), + Transforms.EMPTY_TRANSFORM, + Distributions.NONE, + new SortOrder[0]); + createdTables.add(usTableId); // Track for cleanup + + // Create table in EU schema if available + if (euSchemaCreated) { + String euTableName = GravitinoITUtils.genRandomName("eu_table"); + NameIdentifier euTableId = NameIdentifier.of(euSchemaName, euTableName); + + tableCatalog.createTable( + euTableId, + createTestColumns(), + "EU table", + Collections.emptyMap(), + Transforms.EMPTY_TRANSFORM, + Distributions.NONE, + new SortOrder[0]); + createdTables.add(euTableId); // Track for cleanup + + // Verify both tables are accessible + Table usTable = tableCatalog.loadTable(usTableId); + Table euTable = tableCatalog.loadTable(euTableId); + + Assertions.assertEquals(usTableName, usTable.name()); + Assertions.assertEquals(euTableName, euTable.name()); + + LOG.info("Successfully 
created and accessed tables in both US and EU regions"); + } + + // List tables across regions + NameIdentifier[] usTables = tableCatalog.listTables(Namespace.of(usSchemaName)); + Assertions.assertTrue(usTables.length > 0); + + if (euSchemaCreated) { + NameIdentifier[] euTables = tableCatalog.listTables(Namespace.of(euSchemaName)); + Assertions.assertTrue(euTables.length > 0); + } + + LOG.info("Cross-region table operations test completed successfully"); + } + + @Test + void testMultiRegionPerformanceComparison() { + LOG.info("Testing multi-region performance comparison"); + + Map<String, Long> regionPerformance = Maps.newHashMap(); + + for (String region : TEST_REGIONS) { + try { + String schemaName = + GravitinoITUtils.genRandomName("bigquery_multi_region_perf_" + region.toLowerCase()); + Map<String, String> properties = Maps.newHashMap(); + properties.put("location", region); + + // Measure schema creation time + long startTime = System.currentTimeMillis(); + catalog.asSchemas().createSchema(schemaName, "Performance test schema", properties); + long schemaCreationTime = System.currentTimeMillis() - startTime; + createdSchemas.add(schemaName); // Track for cleanup + + // Measure table creation time + String tableName = GravitinoITUtils.genRandomName("perf_table"); + NameIdentifier tableId = NameIdentifier.of(schemaName, tableName); + + startTime = System.currentTimeMillis(); + catalog + .asTableCatalog() + .createTable( + tableId, + createTestColumns(), + "Performance test table", + Collections.emptyMap(), + Transforms.EMPTY_TRANSFORM, + Distributions.NONE, + new SortOrder[0]); + long tableCreationTime = System.currentTimeMillis() - startTime; + createdTables.add(tableId); // Track for cleanup + + // Measure table loading time + startTime = System.currentTimeMillis(); + Table loadedTable = catalog.asTableCatalog().loadTable(tableId); + long tableLoadTime = System.currentTimeMillis() - startTime; + + Assertions.assertEquals(tableName, loadedTable.name()); + + long 
totalTime = schemaCreationTime + tableCreationTime + tableLoadTime; + regionPerformance.put(region, totalTime); + + LOG.info( + "Region {} performance: Schema creation: {}ms, Table creation: {}ms, Table load: {}ms, Total: {}ms", + region, + schemaCreationTime, + tableCreationTime, + tableLoadTime, + totalTime); + + } catch (Exception e) { + LOG.warn("Performance test failed for region {}: {}", region, e.getMessage()); + regionPerformance.put(region, -1L); // Mark as failed + } + } + + // Report performance comparison + LOG.info("Multi-region performance comparison:"); + regionPerformance.forEach( + (region, time) -> { + if (time > 0) { + LOG.info(" {}: {}ms", region, time); + } else { + LOG.info(" {}: FAILED", region); + } + }); + + // Verify at least one region worked + boolean anyRegionWorked = regionPerformance.values().stream().anyMatch(time -> time > 0); + Assertions.assertTrue(anyRegionWorked, "At least one region should work"); + + LOG.info("Multi-region performance comparison completed"); + } + + @Test + void testCrossRegionDatasetProperties() { + LOG.info("Testing cross-region dataset properties"); + + Map<String, String> testRegions = Maps.newHashMap(); + testRegions.put("US", "United States"); + testRegions.put("EU", "Europe"); + + for (Map.Entry<String, String> entry : testRegions.entrySet()) { + String region = entry.getKey(); + String description = entry.getValue(); + + try { + String schemaName = + GravitinoITUtils.genRandomName("bigquery_cross_region_props_" + region.toLowerCase()); + + Map<String, String> properties = Maps.newHashMap(); + properties.put("location", region); + properties.put("description", "Test dataset in " + description); + properties.put("friendly_name", "Cross-region test dataset"); + + // Create schema with properties + Schema createdSchema = + catalog + .asSchemas() + .createSchema(schemaName, "Cross-region properties test", properties); + createdSchemas.add(schemaName); // Track for cleanup + + 
Assertions.assertEquals(schemaName, createdSchema.name()); + + // Load schema and verify properties + Schema loadedSchema = catalog.asSchemas().loadSchema(schemaName); + Assertions.assertEquals(schemaName, loadedSchema.name()); + Assertions.assertEquals(region, loadedSchema.properties().get("location")); + + LOG.info("Successfully tested properties for region: {}", region); + + } catch (Exception e) { + LOG.warn("Properties test failed for region {}: {}", region, e.getMessage()); + } + } + + LOG.info("Cross-region dataset properties test completed"); + } + + @Test + void testCrossRegionConcurrentOperations() throws InterruptedException { + LOG.info("Testing cross-region concurrent operations"); + + int numThreads = Math.min(3, TEST_REGIONS.size()); // Limit to 3 threads max + Thread[] threads = new Thread[numThreads]; + + for (int i = 0; i < numThreads; i++) { + final String region = TEST_REGIONS.get(i); + final int threadId = i; + + threads[i] = + new Thread( + () -> { + try { + String schemaName = + GravitinoITUtils.genRandomName( + "bigquery_cross_region_concurrent_" + region.toLowerCase()); + String tableName = GravitinoITUtils.genRandomName("concurrent_table"); + + Map<String, String> properties = Maps.newHashMap(); + properties.put("location", region); + + // Create schema + catalog + .asSchemas() + .createSchema(schemaName, "Concurrent test schema", properties); + createdSchemas.add(schemaName); // Track for cleanup + + // Create table + NameIdentifier tableId = NameIdentifier.of(schemaName, tableName); + catalog + .asTableCatalog() + .createTable( + tableId, + createTestColumns(), + "Concurrent test table", + Collections.emptyMap(), + Transforms.EMPTY_TRANSFORM, + Distributions.NONE, + new SortOrder[0]); + createdTables.add(tableId); // Track for cleanup + + // Load table + Table loadedTable = catalog.asTableCatalog().loadTable(tableId); + Assertions.assertEquals(tableName, loadedTable.name()); + + LOG.info("Thread {} completed operations for region {}", 
threadId, region); + + } catch (Exception e) { + LOG.error( + "Concurrent operation failed for region {} in thread {}", + region, + threadId, + e); + // Don't throw exception to avoid failing other threads + } + }); + } + + // Start all threads + for (Thread thread : threads) { + thread.start(); + } + + // Wait for all threads to complete + for (Thread thread : threads) { + thread.join(60000); // 60 second timeout per thread + } + + LOG.info("Cross-region concurrent operations test completed"); + } +} diff --git a/catalogs/catalog-jdbc-bigquery/src/test/java/org/apache/gravitino/catalog/bigquery/integration/CatalogBigQueryIT.java b/catalogs/catalog-jdbc-bigquery/src/test/java/org/apache/gravitino/catalog/bigquery/integration/CatalogBigQueryIT.java new file mode 100644 index 0000000000..2cc1b920ac --- /dev/null +++ b/catalogs/catalog-jdbc-bigquery/src/test/java/org/apache/gravitino/catalog/bigquery/integration/CatalogBigQueryIT.java @@ -0,0 +1,768 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.gravitino.catalog.bigquery.integration; + +import static org.apache.gravitino.catalog.bigquery.BigQueryCatalogPropertiesMetadata.PROJECT_ID; +import static org.apache.gravitino.rel.Column.DEFAULT_VALUE_OF_CURRENT_TIMESTAMP; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import org.apache.commons.lang3.StringUtils; +import org.apache.gravitino.Catalog; +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.Namespace; +import org.apache.gravitino.Schema; +import org.apache.gravitino.SupportsSchemas; +import org.apache.gravitino.catalog.jdbc.config.JdbcConfig; +import org.apache.gravitino.client.GravitinoMetalake; +import org.apache.gravitino.exceptions.NoSuchSchemaException; +import org.apache.gravitino.exceptions.SchemaAlreadyExistsException; +import org.apache.gravitino.integration.test.util.BaseIT; +import org.apache.gravitino.integration.test.util.GravitinoITUtils; +import org.apache.gravitino.rel.Column; +import org.apache.gravitino.rel.Table; +import org.apache.gravitino.rel.TableCatalog; +import org.apache.gravitino.rel.TableChange; +import org.apache.gravitino.rel.expressions.distributions.Distributions; +import org.apache.gravitino.rel.expressions.literals.Literals; +import org.apache.gravitino.rel.expressions.sorts.SortOrder; +import org.apache.gravitino.rel.expressions.transforms.Transform; +import org.apache.gravitino.rel.expressions.transforms.Transforms; +import org.apache.gravitino.rel.types.Types; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; 
+import org.junit.jupiter.api.TestInstance.Lifecycle; +import org.junit.jupiter.api.condition.EnabledIf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@TestInstance(Lifecycle.PER_CLASS) +@EnabledIf("isBigQueryConfigured") +public class CatalogBigQueryIT extends BaseIT { + + private static final Logger LOG = LoggerFactory.getLogger(CatalogBigQueryIT.class); + private static final String provider = "jdbc-bigquery"; + + private static final String ENV_PROJECT_ID = "BIGQUERY_PROJECT_ID"; + private static final String ENV_KEY_PATH = "BIGQUERY_SERVICE_ACCOUNT_KEY_PATH"; + private static final String ENV_SERVICE_ACCOUNT_EMAIL = "BIGQUERY_SERVICE_ACCOUNT_EMAIL"; + private static final String ENV_DATASET_LOCATION = "BIGQUERY_DATASET_LOCATION"; + + private String projectId; + private String keyPath; + private String serviceAccountEmail; + private String datasetLocation; + + public String metalakeName = GravitinoITUtils.genRandomName("bigquery_it_metalake"); + public String catalogName = GravitinoITUtils.genRandomName("bigquery_it_catalog"); + public String schemaName = GravitinoITUtils.genRandomName("bigquery_it_schema"); + public String tableName = GravitinoITUtils.genRandomName("bigquery_it_table"); + public String alertTableName = "alert_table_name"; + public String table_comment = "table_comment"; + public String schema_comment = "schema_comment"; + + public String BIGQUERY_COL_NAME1 = "bigquery_col_name1"; + public String BIGQUERY_COL_NAME2 = "bigquery_col_name2"; + public String BIGQUERY_COL_NAME3 = "bigquery_col_name3"; + public String BIGQUERY_COL_NAME4 = "bigquery_col_name4"; + public String BIGQUERY_COL_NAME5 = "bigquery_col_name5"; + + private GravitinoMetalake metalake; + protected Catalog catalog; + + static boolean isBigQueryConfigured() { + return StringUtils.isNotBlank(System.getenv(ENV_PROJECT_ID)) + && StringUtils.isNotBlank(System.getenv(ENV_KEY_PATH)); + } + + @BeforeAll + public void startup() { + projectId = 
System.getenv(ENV_PROJECT_ID); + keyPath = System.getenv(ENV_KEY_PATH); + serviceAccountEmail = System.getenv(ENV_SERVICE_ACCOUNT_EMAIL); + datasetLocation = System.getenv(ENV_DATASET_LOCATION); + + if (StringUtils.isBlank(datasetLocation)) { + datasetLocation = "US"; + } + + LOG.info("Starting BigQuery integration tests with project: {}", projectId); + LOG.info("Dataset location: {}", datasetLocation); + + createMetalake(); + catalog = createCatalog(catalogName); + createSchema(catalog, schemaName); + } + + @AfterAll + public void stop() { + clearTableAndSchema(); + LOG.info( + "BigQuery integration tests completed. Catalog and metalake retained for other tests."); + } + + @AfterEach + public void resetSchema() { + clearTableAndSchema(); + createSchema(catalog, schemaName); + } + + private void clearTableAndSchema() { + try { + NameIdentifier[] nameIdentifiers = + catalog.asTableCatalog().listTables(Namespace.of(schemaName)); + for (NameIdentifier nameIdentifier : nameIdentifiers) { + catalog.asTableCatalog().dropTable(nameIdentifier); + } + } catch (Exception e) { + LOG.warn("Error clearing tables", e); + } + } + + private void createMetalake() { + client.createMetalake(metalakeName, "comment", Collections.emptyMap()); + GravitinoMetalake loadMetalake = client.loadMetalake(metalakeName); + Assertions.assertEquals(metalakeName, loadMetalake.name()); + metalake = loadMetalake; + } + + private Catalog createCatalog(String catalogName) { + Map<String, String> catalogProperties = Maps.newHashMap(); + + catalogProperties.put(PROJECT_ID, projectId); + catalogProperties.put(JdbcConfig.PASSWORD.getKey(), keyPath); + catalogProperties.put( + JdbcConfig.JDBC_DRIVER.getKey(), "com.simba.googlebigquery.jdbc42.Driver"); + + if (StringUtils.isNotBlank(serviceAccountEmail)) { + catalogProperties.put(JdbcConfig.USERNAME.getKey(), serviceAccountEmail); + } + + Catalog createdCatalog = + metalake.createCatalog( + catalogName, Catalog.Type.RELATIONAL, provider, "comment", 
catalogProperties); + Catalog loadCatalog = metalake.loadCatalog(catalogName); + Assertions.assertEquals(createdCatalog, loadCatalog); + + return loadCatalog; + } + + private void createSchema(Catalog catalog, String schemaName) { + Map<String, String> prop = Maps.newHashMap(); + prop.put("location", datasetLocation); + + try { + Schema loadSchema = catalog.asSchemas().loadSchema(schemaName); + if (loadSchema != null) { + LOG.info("Schema {} already exists, skipping creation", schemaName); + return; + } + } catch (NoSuchSchemaException e) { + Schema createdSchema = catalog.asSchemas().createSchema(schemaName, schema_comment, prop); + Schema loadSchema = catalog.asSchemas().loadSchema(schemaName); + Assertions.assertEquals(createdSchema.name(), loadSchema.name()); + prop.forEach( + (key, value) -> Assertions.assertEquals(loadSchema.properties().get(key), value)); + } + } + + private Column[] createColumns() { + Column col1 = Column.of(BIGQUERY_COL_NAME1, Types.LongType.get(), "col_1_comment"); + Column col2 = Column.of(BIGQUERY_COL_NAME2, Types.DateType.get(), "col_2_comment"); + Column col3 = Column.of(BIGQUERY_COL_NAME3, Types.StringType.get(), "col_3_comment"); + + return new Column[] {col1, col2, col3}; + } + + private Column[] createColumnsWithDefaultValue() { + return new Column[] { + Column.of( + BIGQUERY_COL_NAME1, + Types.DoubleType.get(), + "col_1_comment", + false, + false, + Literals.of("1.23", Types.DoubleType.get())), + Column.of( + BIGQUERY_COL_NAME2, + Types.TimestampType.withoutTimeZone(), + "col_2_comment", + false, + false, + DEFAULT_VALUE_OF_CURRENT_TIMESTAMP), + Column.of( + BIGQUERY_COL_NAME3, Types.StringType.get(), "col_3_comment", true, false, Literals.NULL), + Column.of( + BIGQUERY_COL_NAME4, + Types.LongType.get(), + "col_4_comment", + false, + false, + Literals.of("1000", Types.LongType.get())), + Column.of( + BIGQUERY_COL_NAME5, + Types.DecimalType.of(10, 2), + "col_5_comment", + true, + false, + Literals.of("1.23", 
Types.DecimalType.of(10, 2))) + }; + } + + private Map<String, String> createProperties() { + Map<String, String> properties = Maps.newHashMap(); + properties.put("description", "Test table for BigQuery integration"); + return properties; + } + + @Test + void testTestConnection() { + Map<String, String> catalogProperties = Maps.newHashMap(); + catalogProperties.put(PROJECT_ID, projectId); + catalogProperties.put(JdbcConfig.PASSWORD.getKey(), "/invalid/path/to/key.json"); + catalogProperties.put(JdbcConfig.USERNAME.getKey(), "[email protected]"); + catalogProperties.put( + JdbcConfig.JDBC_DRIVER.getKey(), "com.simba.googlebigquery.jdbc42.Driver"); + + Exception exception = + assertThrows( + RuntimeException.class, + () -> + metalake.testConnection( + GravitinoITUtils.genRandomName("bigquery_it_catalog"), + Catalog.Type.RELATIONAL, + provider, + "comment", + catalogProperties)); + Assertions.assertTrue( + exception.getMessage().contains("Error creating datasource") + || exception.getMessage().contains("Failed to create BigQuery API client") + || exception.getMessage().contains("Connection failed")); + } + + @Test + void testOperationBigQuerySchema() { + SupportsSchemas schemas = catalog.asSchemas(); + + String[] nameIdentifiers = schemas.listSchemas(); + Set<String> schemaNames = Sets.newHashSet(nameIdentifiers); + Assertions.assertTrue(schemaNames.contains(schemaName.toLowerCase())); + + String testSchemaName = GravitinoITUtils.genRandomName("test_schema_1").toLowerCase(); + Map<String, String> properties = Maps.newHashMap(); + properties.put("location", datasetLocation); + + Schema createdSchema = schemas.createSchema(testSchemaName, schema_comment, properties); + Assertions.assertNotNull(createdSchema); + Assertions.assertEquals(testSchemaName, createdSchema.name()); + + nameIdentifiers = schemas.listSchemas(); + schemaNames = Sets.newHashSet(nameIdentifiers); + Assertions.assertTrue(schemaNames.contains(testSchemaName)); + + Map<String, String> emptyMap = 
Collections.emptyMap(); + Assertions.assertThrows( + SchemaAlreadyExistsException.class, + () -> schemas.createSchema(testSchemaName, schema_comment, emptyMap)); + + schemas.dropSchema(testSchemaName, false); + Assertions.assertThrows(NoSuchSchemaException.class, () -> schemas.loadSchema(testSchemaName)); + + nameIdentifiers = schemas.listSchemas(); + schemaNames = Sets.newHashSet(nameIdentifiers); + Assertions.assertFalse(schemaNames.contains(testSchemaName)); + Assertions.assertFalse(schemas.dropSchema("no_exits", false)); + + TableCatalog tableCatalog = catalog.asTableCatalog(); + NameIdentifier table = NameIdentifier.of(testSchemaName, "test_table"); + Assertions.assertThrows( + NoSuchSchemaException.class, + () -> + tableCatalog.createTable( + table, + createColumns(), + table_comment, + createProperties(), + null, + Distributions.NONE, + null)); + + Assertions.assertFalse(schemas.dropSchema(testSchemaName, true)); + Assertions.assertFalse(schemas.dropSchema(testSchemaName, false)); + Assertions.assertFalse(tableCatalog.dropTable(table)); + } + + @Test + void testCreateAndLoadBigQueryTable() { + Column[] columns = createColumns(); + NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName); + + final SortOrder[] sortOrders = new SortOrder[0]; + Transform[] partitioning = Transforms.EMPTY_TRANSFORM; + Map<String, String> properties = createProperties(); + + TableCatalog tableCatalog = catalog.asTableCatalog(); + Table createdTable = + tableCatalog.createTable( + tableIdentifier, + columns, + table_comment, + properties, + partitioning, + Distributions.NONE, + sortOrders); + + Assertions.assertNotNull(createdTable); + Assertions.assertEquals(tableName, createdTable.name()); + + Table loadTable = tableCatalog.loadTable(tableIdentifier); + Assertions.assertEquals(tableName, loadTable.name()); + + Map<String, String> resultProp = loadTable.properties(); + Assertions.assertNotNull(resultProp); + + Assertions.assertEquals(loadTable.columns().length, 
columns.length); + for (int i = 0; i < columns.length; i++) { + Assertions.assertEquals(columns[i].name(), loadTable.columns()[i].name()); + Assertions.assertEquals(columns[i].dataType(), loadTable.columns()[i].dataType()); + } + } + + @Test + void testCreateTableWithPartitioning() { + Column[] columns = { + Column.of("id", Types.LongType.get(), "ID column"), + Column.of("created_date", Types.DateType.get(), "Creation date"), + Column.of("name", Types.StringType.get(), "Name column") + }; + + String partitionedTableName = GravitinoITUtils.genRandomName("partitioned_table"); + NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, partitionedTableName); + + Transform[] partitioning = new Transform[] {Transforms.day(new String[] {"created_date"})}; + + Map<String, String> properties = Maps.newHashMap(); + properties.put("description", "Partitioned table test"); + + TableCatalog tableCatalog = catalog.asTableCatalog(); + Table createdTable = + tableCatalog.createTable( + tableIdentifier, + columns, + "Partitioned table", + properties, + partitioning, + Distributions.NONE, + new SortOrder[0]); + + Assertions.assertEquals(partitionedTableName, createdTable.name()); + + Table loadedTable = tableCatalog.loadTable(tableIdentifier); + Assertions.assertEquals(partitionedTableName, loadedTable.name()); + Assertions.assertEquals(3, loadedTable.columns().length); + } + + @Test + void testCreateTableWithClustering() { + Column[] columns = { + Column.of("id", Types.LongType.get(), "ID column"), + Column.of("category", Types.StringType.get(), "Category column"), + Column.of("subcategory", Types.StringType.get(), "Subcategory column"), + Column.of("amount", Types.DecimalType.of(10, 2), "Amount column") + }; + + String clusteredTableName = GravitinoITUtils.genRandomName("clustered_table"); + NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, clusteredTableName); + + Map<String, String> properties = Maps.newHashMap(); + properties.put("description", "Clustered 
table test"); + properties.put("clustering_fields", "category,subcategory"); + + TableCatalog tableCatalog = catalog.asTableCatalog(); + Table createdTable = + tableCatalog.createTable( + tableIdentifier, + columns, + "Clustered table", + properties, + Transforms.EMPTY_TRANSFORM, + Distributions.NONE, + new SortOrder[0]); + + Assertions.assertEquals(clusteredTableName, createdTable.name()); + + Table loadedTable = tableCatalog.loadTable(tableIdentifier); + Assertions.assertEquals(clusteredTableName, loadedTable.name()); + Assertions.assertEquals(4, loadedTable.columns().length); + } + + @Test + void testAlterAndDropBigQueryTable() { + Column[] columns = createColumns(); + catalog + .asTableCatalog() + .createTable( + NameIdentifier.of(schemaName, tableName), columns, table_comment, createProperties()); + + catalog + .asTableCatalog() + .alterTable(NameIdentifier.of(schemaName, tableName), TableChange.rename(alertTableName)); + + catalog + .asTableCatalog() + .alterTable( + NameIdentifier.of(schemaName, alertTableName), + TableChange.updateComment(table_comment + "_new")); + + catalog + .asTableCatalog() + .alterTable( + NameIdentifier.of(schemaName, alertTableName), + TableChange.addColumn(new String[] {"col_4"}, Types.StringType.get())); + + catalog + .asTableCatalog() + .alterTable( + NameIdentifier.of(schemaName, alertTableName), + TableChange.renameColumn(new String[] {BIGQUERY_COL_NAME2}, "col_2_new")); + + Table table = catalog.asTableCatalog().loadTable(NameIdentifier.of(schemaName, alertTableName)); + Assertions.assertEquals(alertTableName, table.name()); + + Assertions.assertEquals(BIGQUERY_COL_NAME1, table.columns()[0].name()); + Assertions.assertEquals(Types.LongType.get(), table.columns()[0].dataType()); + + Assertions.assertEquals("col_2_new", table.columns()[1].name()); + Assertions.assertEquals(Types.DateType.get(), table.columns()[1].dataType()); + + Assertions.assertEquals(BIGQUERY_COL_NAME3, table.columns()[2].name()); + 
Assertions.assertEquals(Types.StringType.get(), table.columns()[2].dataType()); + + Assertions.assertEquals("col_4", table.columns()[3].name()); + Assertions.assertEquals(Types.StringType.get(), table.columns()[3].dataType()); + + Assertions.assertNotNull(table.auditInfo()); + + Assertions.assertDoesNotThrow( + () -> { + catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, alertTableName)); + }); + } + + @Test + void testColumnDefaultValue() { + Column[] columns = createColumnsWithDefaultValue(); + catalog + .asTableCatalog() + .createTable(NameIdentifier.of(schemaName, tableName), columns, null, ImmutableMap.of()); + + Table loadedTable = + catalog.asTableCatalog().loadTable(NameIdentifier.of(schemaName, tableName)); + + Assertions.assertEquals(5, loadedTable.columns().length); + + for (int i = 0; i < loadedTable.columns().length; i++) { + Assertions.assertNotNull(loadedTable.columns()[i]); + Assertions.assertEquals(columns[i].name(), loadedTable.columns()[i].name()); + Assertions.assertEquals(columns[i].dataType(), loadedTable.columns()[i].dataType()); + } + } + + @Test + void testDropBigQueryDataset() { + String testSchemaName = GravitinoITUtils.genRandomName("bigquery_schema").toLowerCase(); + String testTableName = GravitinoITUtils.genRandomName("bigquery_table").toLowerCase(); + + Map<String, String> properties = Maps.newHashMap(); + properties.put("location", datasetLocation); + + catalog.asSchemas().createSchema(testSchemaName, null, properties); + + catalog + .asTableCatalog() + .createTable( + NameIdentifier.of(testSchemaName, testTableName), + createColumns(), + "Created by Gravitino client", + ImmutableMap.<String, String>builder().build()); + + Throwable excep = + Assertions.assertThrows( + RuntimeException.class, () -> catalog.asSchemas().dropSchema(testSchemaName, false)); + + String errorMessage = excep.getMessage().toLowerCase(); + Assertions.assertTrue( + errorMessage.contains("cascade") + || errorMessage.contains("delete") + || 
errorMessage.contains("dataset") + || errorMessage.contains("non-empty") + || errorMessage.contains("not empty")); + + catalog.asSchemas().loadSchema(testSchemaName); + + catalog.asSchemas().dropSchema(testSchemaName, true); + + SupportsSchemas schemas = catalog.asSchemas(); + Assertions.assertThrows(NoSuchSchemaException.class, () -> schemas.loadSchema(testSchemaName)); + } + + @Test + void testBigQuerySpecialTableName() { + Map<String, String> properties = createProperties(); + TableCatalog tableCatalog = catalog.asTableCatalog(); + + String t1_name = "t112"; + Column t1_col = Column.of(t1_name, Types.LongType.get(), "id", false, false, null); + Column[] columns = {t1_col}; + + NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, t1_name); + tableCatalog.createTable( + tableIdentifier, + columns, + table_comment, + properties, + Transforms.EMPTY_TRANSFORM, + Distributions.NONE, + new SortOrder[0]); + + String t2_name = "t212"; + Column t2_col = Column.of(t2_name, Types.LongType.get(), "id", false, false, null); + columns = new Column[] {t2_col}; + tableIdentifier = NameIdentifier.of(schemaName, t2_name); + tableCatalog.createTable( + tableIdentifier, + columns, + table_comment, + properties, + Transforms.EMPTY_TRANSFORM, + Distributions.NONE, + new SortOrder[0]); + + String t3_name = "t_12"; + Column t3_col = Column.of(t3_name, Types.LongType.get(), "id", false, false, null); + columns = new Column[] {t3_col}; + tableIdentifier = NameIdentifier.of(schemaName, t3_name); + tableCatalog.createTable( + tableIdentifier, + columns, + table_comment, + properties, + Transforms.EMPTY_TRANSFORM, + Distributions.NONE, + new SortOrder[0]); + + String t4_name = "_1__"; + Column t4_col = Column.of(t4_name, Types.LongType.get(), "id", false, false, null); + columns = new Column[] {t4_col}; + tableIdentifier = NameIdentifier.of(schemaName, t4_name); + tableCatalog.createTable( + tableIdentifier, + columns, + table_comment, + properties, + Transforms.EMPTY_TRANSFORM, + 
Distributions.NONE, + new SortOrder[0]); + + Table t1 = tableCatalog.loadTable(NameIdentifier.of(schemaName, t1_name)); + Assertions.assertTrue( + Arrays.stream(t1.columns()).anyMatch(c -> Objects.equals(c.name(), "t112"))); + + Table t2 = tableCatalog.loadTable(NameIdentifier.of(schemaName, t2_name)); + Assertions.assertTrue( + Arrays.stream(t2.columns()).anyMatch(c -> Objects.equals(c.name(), "t212"))); + + Table t3 = tableCatalog.loadTable(NameIdentifier.of(schemaName, t3_name)); + Assertions.assertTrue( + Arrays.stream(t3.columns()).anyMatch(c -> Objects.equals(c.name(), "t_12"))); + + Table t4 = tableCatalog.loadTable(NameIdentifier.of(schemaName, t4_name)); + Assertions.assertTrue( + Arrays.stream(t4.columns()).anyMatch(c -> Objects.equals(c.name(), "_1__"))); + } + + @Test + void testBackQuoteTable() { + Column col1 = Column.of("create", Types.LongType.get(), "id", false, false, null); + Column col2 = Column.of("delete", Types.LongType.get(), "yes", false, false, null); + Column col3 = Column.of("show", Types.DateType.get(), "comment", false, false, null); + Column col4 = Column.of("status", Types.StringType.get(), "code", false, false, null); + Column[] newColumns = new Column[] {col1, col2, col3, col4}; + + TableCatalog tableCatalog = catalog.asTableCatalog(); + NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, "test_table_with_keywords"); + + Assertions.assertDoesNotThrow( + () -> + tableCatalog.createTable( + tableIdentifier, + newColumns, + table_comment, + Collections.emptyMap(), + Transforms.EMPTY_TRANSFORM, + Distributions.NONE, + new SortOrder[0])); + + Assertions.assertDoesNotThrow(() -> tableCatalog.loadTable(tableIdentifier)); + + Assertions.assertDoesNotThrow( + () -> + tableCatalog.alterTable( + tableIdentifier, + TableChange.addColumn( + new String[] {"int"}, + Types.StringType.get(), + TableChange.ColumnPosition.after("status")))); + + Assertions.assertDoesNotThrow( + () -> + tableCatalog.alterTable( + tableIdentifier, 
TableChange.deleteColumn(new String[] {"create"}, true))); + + Assertions.assertDoesNotThrow( + () -> + tableCatalog.alterTable( + tableIdentifier, TableChange.renameColumn(new String[] {"delete"}, "varchar"))); + + Assertions.assertDoesNotThrow(() -> tableCatalog.dropTable(tableIdentifier)); + } + + @Test + void testCrossRegionOperations() { + SupportsSchemas schemas = catalog.asSchemas(); + + String[] schemaNames = schemas.listSchemas(); + Assertions.assertTrue(schemaNames.length > 0); + + String testSchemaName = GravitinoITUtils.genRandomName("cross_region_test"); + Map<String, String> properties = Maps.newHashMap(); + properties.put("location", "EU"); + + try { + schemas.createSchema(testSchemaName, "Cross-region test schema", properties); + + Schema loadedSchema = schemas.loadSchema(testSchemaName); + Assertions.assertEquals(testSchemaName, loadedSchema.name()); + + schemas.dropSchema(testSchemaName, false); + } catch (Exception e) { + LOG.warn("Cross-region test failed, this might be expected: {}", e.getMessage()); + } + } + + @Test + void testConcurrentOperations() throws InterruptedException { + int numThreads = 3; + Thread[] threads = new Thread[numThreads]; + + for (int i = 0; i < numThreads; i++) { + final int threadId = i; + threads[i] = + new Thread( + () -> { + try { + String threadTableName = + GravitinoITUtils.genRandomName("concurrent_table_" + threadId); + NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, threadTableName); + + catalog + .asTableCatalog() + .createTable( + tableIdentifier, + createColumns(), + "Concurrent test table " + threadId, + createProperties(), + Transforms.EMPTY_TRANSFORM, + Distributions.NONE, + new SortOrder[0]); + + Table loadedTable = catalog.asTableCatalog().loadTable(tableIdentifier); + Assertions.assertEquals(threadTableName, loadedTable.name()); + + catalog.asTableCatalog().dropTable(tableIdentifier); + + } catch (Exception e) { + LOG.error("Concurrent operation failed in thread {}", threadId, e); + 
throw new RuntimeException(e); + } + }); + } + + for (Thread thread : threads) { + thread.start(); + } + + for (Thread thread : threads) { + thread.join(30000); + } + + LOG.info("Concurrent operations test completed successfully"); + } + + @Test + void testPerformanceOperations() { + long startTime, endTime; + + startTime = System.currentTimeMillis(); + String[] schemas = catalog.asSchemas().listSchemas(); + endTime = System.currentTimeMillis(); + LOG.info("Listed {} schemas in {} ms", schemas.length, endTime - startTime); + + startTime = System.currentTimeMillis(); + NameIdentifier[] tables = catalog.asTableCatalog().listTables(Namespace.of(schemaName)); + endTime = System.currentTimeMillis(); + LOG.info("Listed {} tables in {} ms", tables.length, endTime - startTime); + + String perfTableName = GravitinoITUtils.genRandomName("perf_table"); + NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, perfTableName); + + startTime = System.currentTimeMillis(); + catalog + .asTableCatalog() + .createTable( + tableIdentifier, + createColumns(), + "Performance test table", + createProperties(), + Transforms.EMPTY_TRANSFORM, + Distributions.NONE, + new SortOrder[0]); + endTime = System.currentTimeMillis(); + LOG.info("Created table in {} ms", endTime - startTime); + + startTime = System.currentTimeMillis(); + Table loadedTable = catalog.asTableCatalog().loadTable(tableIdentifier); + endTime = System.currentTimeMillis(); + LOG.info("Loaded table in {} ms", endTime - startTime); + Assertions.assertEquals(perfTableName, loadedTable.name()); + + catalog.asTableCatalog().dropTable(tableIdentifier); + } +} diff --git a/catalogs/catalog-jdbc-bigquery/src/test/java/org/apache/gravitino/catalog/bigquery/integration/CatalogBigQueryPerformanceIT.java b/catalogs/catalog-jdbc-bigquery/src/test/java/org/apache/gravitino/catalog/bigquery/integration/CatalogBigQueryPerformanceIT.java new file mode 100644 index 0000000000..74d177ee58 --- /dev/null +++ 
b/catalogs/catalog-jdbc-bigquery/src/test/java/org/apache/gravitino/catalog/bigquery/integration/CatalogBigQueryPerformanceIT.java @@ -0,0 +1,758 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.catalog.bigquery.integration; + +import static org.apache.gravitino.catalog.bigquery.BigQueryCatalogPropertiesMetadata.PROJECT_ID; +import static org.awaitility.Awaitility.await; + +import com.google.common.collect.Maps; +import java.time.Duration; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.commons.lang3.StringUtils; +import org.apache.gravitino.Catalog; +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.Namespace; +import org.apache.gravitino.Schema; +import org.apache.gravitino.catalog.jdbc.config.JdbcConfig; +import org.apache.gravitino.client.GravitinoMetalake; +import org.apache.gravitino.integration.test.util.BaseIT; +import 
org.apache.gravitino.integration.test.util.GravitinoITUtils; +import org.apache.gravitino.rel.Column; +import org.apache.gravitino.rel.Table; +import org.apache.gravitino.rel.TableCatalog; +import org.apache.gravitino.rel.expressions.distributions.Distributions; +import org.apache.gravitino.rel.expressions.sorts.SortOrder; +import org.apache.gravitino.rel.expressions.transforms.Transforms; +import org.apache.gravitino.rel.types.Types; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.TestInstance.Lifecycle; +import org.junit.jupiter.api.condition.EnabledIf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Performance integration tests for BigQuery catalog. + * + * <p>This test class focuses on performance testing of BigQuery catalog operations, including: + * + * <ul> + * <li>Bulk operations performance + * <li>Concurrent operations throughput + * <li>Large-scale metadata operations + * <li>Connection pooling efficiency + * </ul> + * + * <p>Requires the same environment variables as CatalogBigQueryIT plus: + * + * <ul> + * <li>BIGQUERY_PERFORMANCE_TEST - Set to "true" to enable performance tests + * </ul> + */ +@TestInstance(Lifecycle.PER_CLASS) +@EnabledIf("isPerformanceTestConfigured") +public class CatalogBigQueryPerformanceIT extends BaseIT { + + private static final Logger LOG = LoggerFactory.getLogger(CatalogBigQueryPerformanceIT.class); + private static final String provider = "jdbc-bigquery"; + + // Environment variable names + private static final String ENV_PROJECT_ID = "BIGQUERY_PROJECT_ID"; + private static final String ENV_KEY_PATH = "BIGQUERY_SERVICE_ACCOUNT_KEY_PATH"; + private static final String ENV_SERVICE_ACCOUNT_EMAIL = "BIGQUERY_SERVICE_ACCOUNT_EMAIL"; + private static final String ENV_PERFORMANCE_TEST = 
"BIGQUERY_PERFORMANCE_TEST"; + + // Performance test configuration + private static final int BULK_OPERATION_COUNT = 20; + private static final int CONCURRENT_THREAD_COUNT = 5; + private static final int LARGE_SCALE_COUNT = 30; + + // Performance thresholds + private static final long TABLE_LOADING_THRESHOLD_MS = 3000; + private static final long LISTING_THRESHOLD_MS = 10000; + private static final long TOTAL_OPERATION_THRESHOLD_MS = 300000; + + // Test configuration + private String projectId; + private String keyPath; + private String serviceAccountEmail; + + // Test identifiers + public String metalakeName = GravitinoITUtils.genRandomName("bigquery_perf_metalake"); + public String catalogName = GravitinoITUtils.genRandomName("bigquery_perf_catalog"); + public String schemaName = GravitinoITUtils.genRandomName("bigquery_perf_schema"); + + private GravitinoMetalake metalake; + protected Catalog catalog; + + /** + * Check if BigQuery performance testing is configured. + * + * @return true if all required environment variables are set and performance test is enabled + */ + static boolean isPerformanceTestConfigured() { + return StringUtils.isNotBlank(System.getenv(ENV_PROJECT_ID)) + && StringUtils.isNotBlank(System.getenv(ENV_KEY_PATH)) + && "true".equalsIgnoreCase(System.getenv(ENV_PERFORMANCE_TEST)); + } + + @BeforeAll + public void startup() { + // Load configuration from environment variables + projectId = System.getenv(ENV_PROJECT_ID); + keyPath = System.getenv(ENV_KEY_PATH); + serviceAccountEmail = System.getenv(ENV_SERVICE_ACCOUNT_EMAIL); + + LOG.info("Starting BigQuery performance integration tests with project: {}", projectId); + + createMetalake(); + catalog = createCatalog(catalogName); + createSchema(catalog, schemaName); + } + + @AfterAll + public void stop() { + try { + cleanupAllTestData(); + + if (metalake != null) { + try { + client.disableMetalake(metalakeName); + client.dropMetalake(metalakeName); + LOG.info("Dropped test metalake: {}", 
metalakeName); + } catch (Exception e) { + LOG.warn("Failed to drop metalake: {}, error: {}", metalakeName, e.getMessage()); + } + } + } catch (Exception e) { + LOG.warn("Error during stop cleanup: {}", e.getMessage()); + } + } + + @AfterEach + public void cleanup() { + cleanupTestData(); + } + + private void cleanupAllTestData() { + LOG.info("Starting comprehensive test data cleanup"); + + try { + if (catalog != null) { + cleanupTestSchemas(); + + cleanupTestTablesInSchema(schemaName); + + dropSchemaIfExists(schemaName); + } + } catch (Exception e) { + LOG.warn("Error during comprehensive cleanup: {}", e.getMessage()); + } + + LOG.info("Completed test data cleanup"); + } + + private void cleanupTestData() { + try { + if (catalog != null) { + cleanupTestTablesInSchema(schemaName); + + cleanupTestSchemasExcludingMain(); + } + } catch (Exception e) { + LOG.warn("Error during test data cleanup: {}", e.getMessage()); + } + } + + private void cleanupTestTablesInSchema(String schemaName) { + try { + NameIdentifier[] tables = catalog.asTableCatalog().listTables(Namespace.of(schemaName)); + int droppedCount = 0; + for (NameIdentifier table : tables) { + if (isTestTable(table.name())) { + try { + catalog.asTableCatalog().dropTable(table); + droppedCount++; + } catch (Exception e) { + LOG.warn("Failed to drop test table {}.{}", schemaName, table.name()); + } + } + } + if (droppedCount > 0) { + LOG.info("Dropped {} test tables from schema: {}", droppedCount, schemaName); + } + } catch (Exception e) { + LOG.warn("Failed to list tables in schema {}: {}", schemaName, e.getMessage()); + } + } + + private void cleanupTestSchemas() { + try { + String[] schemas = catalog.asSchemas().listSchemas(); + for (String schema : schemas) { + if (isTestSchema(schema)) { + cleanupTestTablesInSchema(schema); + + dropSchemaIfExists(schema); + } + } + } catch (Exception e) { + LOG.warn("Failed to list schemas during cleanup: {}", e.getMessage()); + } + } + + private void 
cleanupTestSchemasExcludingMain() { + try { + String[] schemas = catalog.asSchemas().listSchemas(); + for (String schema : schemas) { + if (isTestSchema(schema) && !schema.equals(schemaName)) { + cleanupTestTablesInSchema(schema); + + dropSchemaIfExists(schema); + } + } + } catch (Exception e) { + LOG.warn("Failed to list schemas during cleanup: {}", e.getMessage()); + } + } + + private void dropSchemaIfExists(String schemaName) { + try { + catalog.asSchemas().dropSchema(schemaName, true); + LOG.info("Dropped schema: {}", schemaName); + } catch (Exception e) { + LOG.warn("Failed to drop schema {}: {}", schemaName, e.getMessage()); + } + } + + private boolean isTestTable(String tableName) { + return tableName.startsWith("perf_test_") + || tableName.startsWith("bulk_test_") + || tableName.startsWith("test_concurrent_"); + } + + private boolean isTestSchema(String schemaName) { + return schemaName.startsWith("perf_test_") + || schemaName.startsWith("bulk_test_") + || schemaName.startsWith("bigquery_perf_"); + } + + private void createMetalake() { + try { + GravitinoMetalake existingMetalake = client.loadMetalake(metalakeName); + if (existingMetalake != null) { + client.disableMetalake(metalakeName); + client.dropMetalake(metalakeName); + LOG.info("Deleted existing metalake: {}", metalakeName); + } + } catch (Exception e) { + // Metalake doesn't exist, continue with creation + } + + client.createMetalake(metalakeName, "Performance test metalake", Collections.emptyMap()); + GravitinoMetalake loadMetalake = client.loadMetalake(metalakeName); + Assertions.assertEquals(metalakeName, loadMetalake.name()); + + metalake = loadMetalake; + LOG.info("Created metalake: {}", metalakeName); + } + + private Catalog createCatalog(String catalogName) { + Map<String, String> catalogProperties = Maps.newHashMap(); + + // Required properties + catalogProperties.put(PROJECT_ID, projectId); + catalogProperties.put(JdbcConfig.PASSWORD.getKey(), keyPath); // Key file path + 
catalogProperties.put( + JdbcConfig.JDBC_DRIVER.getKey(), "com.simba.googlebigquery.jdbc42.Driver"); + + // Optional properties + if (StringUtils.isNotBlank(serviceAccountEmail)) { + catalogProperties.put(JdbcConfig.USERNAME.getKey(), serviceAccountEmail); + } + + catalogProperties.put(JdbcConfig.POOL_MIN_SIZE.getKey(), "2"); + catalogProperties.put(JdbcConfig.POOL_MAX_SIZE.getKey(), "10"); + + Catalog createdCatalog = + metalake.createCatalog( + catalogName, + Catalog.Type.RELATIONAL, + provider, + "Performance test catalog", + catalogProperties); + Catalog loadCatalog = metalake.loadCatalog(catalogName); + Assertions.assertEquals(createdCatalog, loadCatalog); + + LOG.info("Created catalog: {}", catalogName); + return loadCatalog; + } + + private void createSchema(Catalog catalog, String schemaName) { + Map<String, String> prop = Maps.newHashMap(); + prop.put("location", "US"); + prop.put("description", "Performance test schema"); + + Schema createdSchema = catalog.asSchemas().createSchema(schemaName, "Performance test", prop); + Schema loadSchema = catalog.asSchemas().loadSchema(schemaName); + Assertions.assertEquals(createdSchema.name(), loadSchema.name()); + + LOG.info("Created schema: {}", schemaName); + } + + private Column[] createTestColumns() { + return new Column[] { + Column.of("id", Types.LongType.get(), "ID column"), + Column.of("name", Types.StringType.get(), "Name column"), + Column.of("value", Types.DoubleType.get(), "Value column"), + Column.of("created_at", Types.TimestampType.withoutTimeZone(), "Creation timestamp"), + Column.of("is_active", Types.BooleanType.get(), "Active flag") + }; + } + + @Test + void testBulkTableCreationPerformance() { + LOG.info("Testing bulk table creation performance with {} tables", BULK_OPERATION_COUNT); + + TableCatalog tableCatalog = catalog.asTableCatalog(); + long startTime = System.currentTimeMillis(); + + // Create multiple tables + for (int i = 0; i < BULK_OPERATION_COUNT; i++) { + String tableName = 
String.format("bulk_test_table_%03d", i); + NameIdentifier tableId = NameIdentifier.of(schemaName, tableName); + + try { + tableCatalog.createTable( + tableId, + createTestColumns(), + "Bulk test table " + i, + Collections.emptyMap(), + Transforms.EMPTY_TRANSFORM, + Distributions.NONE, + new SortOrder[0]); + + if ((i + 1) % 5 == 0) { + LOG.info("Created {} tables", i + 1); + } + } catch (Exception e) { + LOG.warn("Failed to create table {}: {}", tableName, e.getMessage()); + } + } + + long endTime = System.currentTimeMillis(); + long totalTime = endTime - startTime; + double avgTime = totalTime > 0 ? (double) totalTime / BULK_OPERATION_COUNT : 0; + + LOG.info( + "Bulk table creation completed: attempted {} tables in {}ms (avg: {:.2f}ms per table)", + BULK_OPERATION_COUNT, + totalTime, + avgTime); + + // Verify tables were created + try { + NameIdentifier[] tables = tableCatalog.listTables(Namespace.of(schemaName)); + long bulkTestTables = + java.util.Arrays.stream(tables) + .filter(table -> table.name().startsWith("bulk_test_table_")) + .count(); + + LOG.info("Successfully created {} bulk test tables", bulkTestTables); + + LOG.info( + "Performance metrics - Average creation time: {:.2f}ms, Total time: {}ms", + avgTime, + totalTime); + + if (totalTime > TOTAL_OPERATION_THRESHOLD_MS) { + LOG.warn( + "Total bulk creation time {}ms exceeds threshold {}ms", + totalTime, + TOTAL_OPERATION_THRESHOLD_MS); + } + } catch (Exception e) { + LOG.warn("Failed to verify created tables: {}", e.getMessage()); + } + } + + @Test + void testBulkTableLoadingPerformance() { + LOG.info("Testing bulk table loading performance"); + + // First create some tables + TableCatalog tableCatalog = catalog.asTableCatalog(); + int tableCount = Math.min(BULK_OPERATION_COUNT, 10); + + cleanupTestTablesInSchema(schemaName); + + for (int i = 0; i < tableCount; i++) { + String tableName = String.format("perf_test_load_table_%03d", i); + NameIdentifier tableId = NameIdentifier.of(schemaName, tableName); 
+ + try { + tableCatalog.createTable( + tableId, + createTestColumns(), + "Performance load test table " + i, + Collections.emptyMap(), + Transforms.EMPTY_TRANSFORM, + Distributions.NONE, + new SortOrder[0]); + } catch (Exception e) { + LOG.warn("Failed to create table {} for loading test: {}", tableName, e.getMessage()); + } + } + + LOG.info("Created tables for loading test"); + + // Wait for BigQuery operations to complete + await() + .atMost(Duration.ofSeconds(10)) + .pollInterval(Duration.ofMillis(500)) + .until(() -> true); // BigQuery operations are eventually consistent + + // Now test loading performance + long startTime = System.currentTimeMillis(); + int successfulLoads = 0; + + for (int i = 0; i < tableCount; i++) { + String tableName = String.format("perf_test_load_table_%03d", i); + NameIdentifier tableId = NameIdentifier.of(schemaName, tableName); + + try { + Table loadedTable = tableCatalog.loadTable(tableId); + Assertions.assertEquals(tableName, loadedTable.name()); + successfulLoads++; + } catch (Exception e) { + LOG.warn("Failed to load table {}: {}", tableName, e.getMessage()); + } + } + + long endTime = System.currentTimeMillis(); + long totalTime = endTime - startTime; + double avgTime = successfulLoads > 0 ? 
(double) totalTime / successfulLoads : 0; + + LOG.info( + "Bulk table loading completed: {} successful loads out of {} attempts in {}ms (avg: {:.2f}ms per table)", + successfulLoads, + tableCount, + totalTime, + avgTime); + + if (successfulLoads > 0) { + LOG.info( + "Performance metrics - Average loading time: {:.2f}ms, Total time: {}ms", + avgTime, + totalTime); + + if (avgTime > TABLE_LOADING_THRESHOLD_MS) { + LOG.warn( + "Average table loading time {:.2f}ms exceeds typical threshold {}ms", + avgTime, + TABLE_LOADING_THRESHOLD_MS); + } + } + + Assertions.assertTrue(successfulLoads > 0, "Should successfully load at least one table"); + } + + @Test + void testConcurrentTableOperationsPerformance() throws InterruptedException { + LOG.info( + "Testing concurrent table operations performance with {} threads", CONCURRENT_THREAD_COUNT); + + ExecutorService executor = Executors.newFixedThreadPool(CONCURRENT_THREAD_COUNT); + CountDownLatch latch = new CountDownLatch(CONCURRENT_THREAD_COUNT); + AtomicInteger successCount = new AtomicInteger(0); + AtomicInteger errorCount = new AtomicInteger(0); + AtomicLong totalTime = new AtomicLong(0); + + long startTime = System.currentTimeMillis(); + + for (int i = 0; i < CONCURRENT_THREAD_COUNT; i++) { + final int threadId = i; + executor.submit( + () -> { + try { + long threadStartTime = System.currentTimeMillis(); + + String tableName = String.format("perf_test_concurrent_%03d", threadId); + NameIdentifier tableId = NameIdentifier.of(schemaName, tableName); + + try { + catalog.asTableCatalog().loadTable(tableId); + catalog.asTableCatalog().dropTable(tableId); + } catch (Exception e) { + // Table doesn't exist, that's expected + } + + // Create table + catalog + .asTableCatalog() + .createTable( + tableId, + createTestColumns(), + "Concurrent test table " + threadId, + Collections.emptyMap(), + Transforms.EMPTY_TRANSFORM, + Distributions.NONE, + new SortOrder[0]); + + // Load table + Table loadedTable = 
catalog.asTableCatalog().loadTable(tableId); + Assertions.assertEquals(tableName, loadedTable.name()); + + long threadEndTime = System.currentTimeMillis(); + totalTime.addAndGet(threadEndTime - threadStartTime); + successCount.incrementAndGet(); + + } catch (Exception e) { + LOG.error("Concurrent operation failed in thread {}: {}", threadId, e.getMessage()); + errorCount.incrementAndGet(); + } finally { + latch.countDown(); + } + }); + } + + // Wait for all threads to complete + boolean completed = latch.await(10, TimeUnit.MINUTES); + executor.shutdown(); + + long endTime = System.currentTimeMillis(); + long wallClockTime = endTime - startTime; + + LOG.info( + "Concurrent operations completed: {} successful, {} errors in {}ms wall-clock time", + successCount.get(), + errorCount.get(), + wallClockTime); + + if (CONCURRENT_THREAD_COUNT > 0) { + double avgThreadTime = totalTime.get() / (double) CONCURRENT_THREAD_COUNT; + LOG.info("Average thread execution time: {:.2f}ms", avgThreadTime); + } + + Assertions.assertTrue(completed, "All concurrent operations should complete within timeout"); + Assertions.assertTrue( + successCount.get() > 0, "At least some concurrent operations should succeed"); + } + + @Test + void testLargeScaleSchemaListingPerformance() { + LOG.info("Testing large-scale schema listing performance"); + + // Create multiple schemas + int schemaCount = Math.min(LARGE_SCALE_COUNT, 10); + int createdCount = 0; + + for (int i = 0; i < schemaCount; i++) { + String testSchemaName = String.format("perf_test_schema_%03d", i); + + try { + catalog.asSchemas().loadSchema(testSchemaName); + continue; + } catch (Exception e) { + // Schema doesn't exist, continue to create + } + + Map<String, String> properties = Maps.newHashMap(); + properties.put("location", "US"); + properties.put("description", "Performance test schema " + i); + + try { + catalog.asSchemas().createSchema(testSchemaName, "Perf test schema", properties); + createdCount++; + LOG.info("Created schema 
{}", testSchemaName); + } catch (Exception e) { + LOG.warn("Failed to create schema {}: {}", testSchemaName, e.getMessage()); + } + } + + LOG.info("Created {} schemas for listing test", createdCount); + + // Wait for BigQuery operations to complete + await() + .atMost(Duration.ofSeconds(15)) + .pollInterval(Duration.ofMillis(500)) + .until(() -> true); // BigQuery operations are eventually consistent + + // Test listing performance multiple times + int iterations = 3; + long totalTime = 0; + + for (int i = 0; i < iterations; i++) { + long startTime = System.currentTimeMillis(); + String[] schemas = catalog.asSchemas().listSchemas(); + long endTime = System.currentTimeMillis(); + + totalTime += (endTime - startTime); + + if (i == 0) { + LOG.info("First schema listing returned {} schemas", schemas.length); + } + } + + double avgTime = (double) totalTime / iterations; + LOG.info("Schema listing performance: {} iterations, avg time: {:.2f}ms", iterations, avgTime); + + if (avgTime > LISTING_THRESHOLD_MS) { + LOG.warn( + "Average schema listing time {:.2f}ms exceeds typical threshold {}ms", + avgTime, + LISTING_THRESHOLD_MS); + } + } + + @Test + void testLargeScaleTableListingPerformance() { + LOG.info("Testing large-scale table listing performance"); + + // Create multiple tables + TableCatalog tableCatalog = catalog.asTableCatalog(); + int tableCount = Math.min(LARGE_SCALE_COUNT, 20); + + cleanupTestTablesInSchema(schemaName); + + long creationStartTime = System.currentTimeMillis(); + int createdCount = 0; + + for (int i = 0; i < tableCount; i++) { + String tableName = String.format("perf_test_list_table_%03d", i); + NameIdentifier tableId = NameIdentifier.of(schemaName, tableName); + + try { + tableCatalog.createTable( + tableId, + createTestColumns(), + "Performance list test table " + i, + Collections.emptyMap(), + Transforms.EMPTY_TRANSFORM, + Distributions.NONE, + new SortOrder[0]); + createdCount++; + + if ((i + 1) % 5 == 0) { + LOG.info("Created {} tables for 
listing test", i + 1);
+        }
+      } catch (Exception e) {
+        LOG.warn("Failed to create table {}: {}", tableName, e.getMessage());
+      }
+    }
+
+    long creationEndTime = System.currentTimeMillis();
+    LOG.info(
+        "Created {} tables for listing test in {}ms",
+        createdCount,
+        creationEndTime - creationStartTime);
+
+    // Wait for BigQuery operations to complete
+    // NOTE(review): until(() -> true) passes on the first poll, so this is only
+    // a brief pause rather than a real readiness check.
+    await()
+        .atMost(Duration.ofSeconds(15))
+        .pollInterval(Duration.ofMillis(500))
+        .until(() -> true); // BigQuery operations are eventually consistent
+
+    // Test listing performance multiple times
+    int iterations = 3;
+    long totalTime = 0;
+
+    for (int i = 0; i < iterations; i++) {
+      long startTime = System.currentTimeMillis();
+      NameIdentifier[] tables = tableCatalog.listTables(Namespace.of(schemaName));
+      long endTime = System.currentTimeMillis();
+
+      totalTime += (endTime - startTime);
+
+      if (i == 0) {
+        LOG.info("First table listing returned {} tables", tables.length);
+      }
+    }
+
+    double avgTime = (double) totalTime / iterations;
+    // SLF4J only substitutes plain "{}"; "{:.2f}" would be printed literally,
+    // so pre-format the fractional average.
+    LOG.info(
+        "Table listing performance: {} iterations, avg time: {}ms",
+        iterations,
+        String.format("%.2f", avgTime));
+
+    if (avgTime > LISTING_THRESHOLD_MS) {
+      LOG.warn(
+          "Average table listing time {}ms exceeds typical threshold {}ms",
+          String.format("%.2f", avgTime),
+          LISTING_THRESHOLD_MS);
+    }
+  }
+
+  /**
+   * Issues a rapid mix of schema-list, table-list and schema-load calls to exercise JDBC
+   * connection reuse, logging total and per-operation latency.
+   */
+  @Test
+  void testConnectionPoolingEfficiency() {
+    LOG.info("Testing connection pooling efficiency");
+
+    // Perform rapid sequential operations to test connection reuse
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    int operationCount = 10;
+
+    long startTime = System.currentTimeMillis();
+
+    for (int i = 0; i < operationCount; i++) {
+      // Mix of different operations to test connection pooling
+      if (i % 3 == 0) {
+        // Schema listing
+        try {
+          catalog.asSchemas().listSchemas();
+        } catch (Exception e) {
+          LOG.warn("Schema listing operation failed: {}", e.getMessage());
+        }
+      } else if (i % 3 == 1) {
+        // Table listing
+        try {
+          tableCatalog.listTables(Namespace.of(schemaName));
+        } catch (Exception e) {
+
LOG.warn("Table listing operation failed: {}", e.getMessage()); + } + } else { + // Schema loading + try { + catalog.asSchemas().loadSchema(schemaName); + } catch (Exception e) { + LOG.warn("Schema loading operation failed: {}", e.getMessage()); + } + } + } + + long endTime = System.currentTimeMillis(); + long totalTime = endTime - startTime; + double avgTime = (double) totalTime / operationCount; + + LOG.info( + "Connection pooling test: {} operations in {}ms (avg: {:.2f}ms per operation)", + operationCount, + totalTime, + avgTime); + + LOG.info("Connection pooling performance - Total: {}ms, Average: {:.2f}ms", totalTime, avgTime); + } +} diff --git a/catalogs/catalog-jdbc-bigquery/src/test/resources/log4j2.properties b/catalogs/catalog-jdbc-bigquery/src/test/resources/log4j2.properties new file mode 100644 index 0000000000..82dc473e7e --- /dev/null +++ b/catalogs/catalog-jdbc-bigquery/src/test/resources/log4j2.properties @@ -0,0 +1,57 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+#
+
+# Log4j2 configuration for BigQuery catalog tests

+# Root logger level
+rootLogger.level = INFO
+rootLogger.appenderRefs = stdout
+rootLogger.appenderRef.stdout.ref = STDOUT
+
+# Console appender
+appender.console.type = Console
+appender.console.name = STDOUT
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} [%t] %-5level %logger{36} - %msg%n
+
+# BigQuery catalog specific logging
+logger.bigquery.name = org.apache.gravitino.catalog.bigquery
+logger.bigquery.level = DEBUG
+logger.bigquery.additivity = false
+logger.bigquery.appenderRefs = stdout
+logger.bigquery.appenderRef.stdout.ref = STDOUT
+
+# Integration test logging (package is ...bigquery.integration, without a
+# trailing ".test" — the IT classes live directly in the integration package)
+logger.integration.name = org.apache.gravitino.catalog.bigquery.integration
+logger.integration.level = INFO
+logger.integration.additivity = false
+logger.integration.appenderRefs = stdout
+logger.integration.appenderRef.stdout.ref = STDOUT
+
+# Google API client logging (reduce noise)
+logger.google.name = com.google
+logger.google.level = WARN
+
+# HTTP transport logging (reduce noise)
+logger.http.name = com.google.api.client.http
+logger.http.level = WARN
+
+# BigQuery API logging
+logger.bigqueryapi.name = com.google.api.services.bigquery
+logger.bigqueryapi.level = INFO
\ No newline at end of file
diff --git a/docs/jdbc-bigquery-catalog.md b/docs/jdbc-bigquery-catalog.md
new file mode 100644
index 0000000000..c2553b5234
--- /dev/null
+++ b/docs/jdbc-bigquery-catalog.md
@@ -0,0 +1,317 @@
+---
+title: "BigQuery catalog"
+slug: /jdbc-bigquery-catalog
+keywords:
+- jdbc
+- BigQuery
+- metadata
+license: "This software is licensed under the Apache License version 2."
+---
+
+import Tabs from '@theme/Tabs';
+import TabItem from '@theme/TabItem';
+
+## Introduction
+
+Apache Gravitino provides the ability to manage Google BigQuery metadata through JDBC connection.
+
+:::caution
+Gravitino saves some system information in schema and table comments, like `(From Gravitino, DO NOT EDIT: gravitino.v1.uid1078334182909406185)`, please don't change or remove this message.
+:::
+
+## Catalog
+
+### Catalog capabilities
+
+- Gravitino catalog corresponds to the BigQuery project.
+- Supports metadata management of Google BigQuery.
+- Supports DDL operations for BigQuery datasets and tables.
+- Supports table partitioning and clustering.
+- Supports [column default value](./manage-relational-metadata-using-gravitino.md#table-column-default-value).
+
+### Catalog properties
+
+You can pass to a BigQuery data source any property that isn't defined by Gravitino by adding `gravitino.bypass.` prefix as a catalog property. For example, catalog property `gravitino.bypass.maxWaitMillis` will pass `maxWaitMillis` to the data source property.
+
+Check the relevant data source configuration in [data source properties](https://commons.apache.org/proper/commons-dbcp/configuration.html).
+
+If you use a JDBC catalog, you must provide `jdbc-url`, `jdbc-driver`, `jdbc-user` and `jdbc-password` to catalog properties.
+Besides the [common catalog properties](./gravitino-server-config.md#apache-gravitino-catalog-properties-configuration), the BigQuery catalog has the following properties: + +| Configuration item | Description | Default value | Required | Since version | +|-------------------------|---------------------------------------------------------------------------------------------------|--------------------------------------------------------------|----------|---------------| +| `project-id` | Google Cloud Project ID | (none) | Yes | 1.2.0 | +| `jdbc-url` | JDBC connection URL, can be auto-generated or manually specified | `jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443` | Yes | 1.2.0 | +| `jdbc-driver` | JDBC driver class name | `com.simba.googlebigquery.jdbc42.Driver` | Yes | 1.2.0 | +| `jdbc-user` | Service account email | (none) | Yes | 1.2.0 | +| `jdbc-password` | Service account key file path | (none) | Yes | 1.2.0 | +| `jdbc.pool.min-size` | The minimum number of connections in the pool. `2` by default. | `2` | No | 1.2.0 | +| `jdbc.pool.max-size` | The maximum number of connections in the pool. `10` by default. | `10` | No | 1.2.0 | +| `jdbc.pool.max-wait-ms` | The maximum Duration that the pool will wait for a connection to be returned. `30000` by default. 
| `30000` | No | 1.2.0 | +| `proxy-host` | Proxy server hostname or IP address | (none) | No | 1.2.0 | +| `proxy-port` | Proxy server port number | (none) | No | 1.2.0 | +| `proxy-username` | Proxy authentication username | (none) | No | 1.2.0 | +| `proxy-password` | Proxy authentication password | (none) | No | 1.2.0 | + +:::note +When using a proxy that requires credentials (`proxy-username` and `proxy-password`), the following JVM arguments must be used: +``` +-Djdk.http.auth.tunneling.disabledSchemes= +-Djdk.http.auth.proxying.disabledSchemes= +``` +These arguments disable the default security restrictions on HTTP authentication schemes for tunneling and proxying, allowing authentication to work properly through the proxy. +::: + + + +### Catalog operations + +Refer to [Manage Relational Metadata Using Gravitino](./manage-relational-metadata-using-gravitino.md#catalog-operations) for more details. + +## Schema + +### Schema capabilities + +- Gravitino's schema concept corresponds to the BigQuery dataset. +- Supports creating schema with properties like location, description, labels, etc. +- Supports dropping schema. +- Supports cascade dropping schema. 
+ +### Schema properties + +| Property Name | Type | Description | Supported | +|--------------------------------------|-------------------------------|-------------------------------------------------------------------------|-----------| +| `default_kms_key_name` | STRING | Default Cloud KMS encryption key (not verified) | Yes | +| `default_partition_expiration_days` | FLOAT64 | Default partition expiration days | Yes | +| `default_rounding_mode` | STRING | Default rounding mode: `ROUND_HALF_AWAY_FROM_ZERO` or `ROUND_HALF_EVEN` | Yes | +| `default_table_expiration_days` | FLOAT64 | Default table expiration days | Yes | +| `failover_reservation` | STRING | Failover reservation | No | +| `friendly_name` | STRING | Friendly name | Yes | +| `is_case_insensitive` | BOOL | Case insensitive | Yes | +| `is_primary` | BOOLEAN | Whether it's the primary replica | No | +| `primary_replica` | STRING | Primary replica name | No | +| `storage_billing_model` | STRING | Storage billing model: `PHYSICAL` or `LOGICAL` | Yes | +| `tags` | ARRAY<STRUCT<STRING, STRING>> | IAM tags array | Yes | +| `collate` | STRING | Collation schema | No | + +### Schema operations + +Refer to [Manage Relational Metadata Using Gravitino](./manage-relational-metadata-using-gravitino.md#schema-operations) for more details. + +## Table + +### Table capabilities + +- Gravitino's table concept corresponds to the BigQuery table. +- Supports DDL operations for BigQuery tables. +- Supports table partitioning and clustering. +- Supports [column default value](./manage-relational-metadata-using-gravitino.md#table-column-default-value). 
+ +### Table column types + +**Basic Types Supported by Web UI**: + +| Gravitino Type | BigQuery Type | +|----------------|---------------| +| `Binary` | `BYTES` | +| `Boolean` | `BOOL` | +| `Byte` | `INT64` | +| `Char` | `STRING` | +| `Date` | `DATE` | +| `Decimal` | `NUMERIC` | +| `Double` | `FLOAT64` | +| `Float` | `FLOAT64` | +| `Integer` | `INT64` | +| `Long` | `INT64` | +| `Short` | `INT64` | +| `String` | `STRING` | +| `Time` | `TIME` | +| `Timestamp` | `DATETIME` | +| `Timestamp_tz` | `TIMESTAMP` | +| `VarChar` | `STRING` | + +**Complex Types Supported by API** (Not supported in Web UI): + +| Gravitino Type| BigQuery Type | +|---------------|---------------| +| `ARRAY<T>` | `ARRAY<T>` | +| `STRUCT<...>` | `STRUCT<...>` | +| `Geography` | `GEOGRAPHY` | +| `Json` | `JSON` | +| `Range<T>` | `RANGE<T>` | + +:::info +BigQuery doesn't support Gravitino `Fixed` `IntervalDay` `IntervalYear` `Union` `UUID` type. +Meanwhile, the data types other than listed above are mapped to Gravitino **[External Type](./manage-relational-metadata-using-gravitino.md#external-type)** that represents an unresolvable data type. + +Unsupported types will be optimized in future versions. 
The following types are recommended to use `string` type as a workaround: +- INTERVAL +- BIGNUMERIC (Note: BIGNUMERIC precision above 38 is truncated to fit Gravitino's DecimalType limits) +::: + +### Table properties + +| Property Name | Description | Type | Example Value | Supported | +|---------------------------------|-------------------------------------------|---------|----------------------------------------------------|-----------| +| `description` | Table description | String | `"Customer data table"` | Yes | +| `friendly_name` | Friendly name | String | `"Customer Data"` | Yes | +| `expiration_timestamp` | Table expiration timestamp | String | `"2025-12-31T23:59:59Z"` | Yes | +| `partition_expiration_days` | Partition expiration days | Float64 | `"365.5"` | Yes | +| `require_partition_filter` | Whether partition filter is required | Boolean | `"true"` | Yes | +| `clustering_fields` | Clustering fields, comma-separated, max 4 | String | `"country,city,category"` | Yes | +| `labels` | Table labels, JSON array format | String | `"[{\"env\":\"prod\"}, {\"team\":\"data\"}]"` | Yes | +| `kms_key_name` | Cloud KMS encryption key | String | `"projects/p/locations/l/keyRings/r/cryptoKeys/k"` | Yes | +| `default_rounding_mode` | Default rounding mode | String | `"ROUND_HALF_EVEN"` | Yes | +| `enable_change_history` | Enable change history | Boolean | `"true"` | Yes | +| `enable_fine_grained_mutations` | Enable fine-grained mutations | Boolean | `"true"` | Yes | +| `tags` | IAM tags | String | `"[{\"key\":\"value\"}]"` | Yes | +| `max_staleness` | Maximum staleness | String | `"INTERVAL 4 HOUR"` | No | +| `storage_uri` | Storage URI for managed tables | String | `"gs://my-bucket/tables/"` | No | +| `file_format` | File format for managed tables | String | `"PARQUET"` | No | +| `table_format` | Table format for managed tables | String | `"ICEBERG"` | No | + +### Table partitioning + +The BigQuery catalog supports partitioned tables. 
Users can create partitioned tables with specific partitioning attributes through the API. + +| Gravitino Partition Transform | BigQuery Partition Strategy | +|---------------------------------|----------------------------------------------------------------------------------------------------| +| `Transforms.day("column")` | Daily partitioning based on DATE/TIMESTAMP/DATETIME column | +| `Transforms.hour("column")` | Hourly partitioning based on TIMESTAMP/DATETIME column | +| `Transforms.month("column")` | Monthly partitioning based on DATE/TIMESTAMP/DATETIME column | +| `Transforms.year("column")` | Yearly partitioning based on DATE/TIMESTAMP/DATETIME column | +| `Transforms.identity("column")` | Direct partitioning (only DATE/TIMESTAMP/DATETIME, integer range partitioning not yet implemented) | + +:::note +Integer range partitioning is not implemented in the current version. +::: + +### Table operations + +Refer to [Manage Relational Metadata Using Gravitino](./manage-relational-metadata-using-gravitino.md#table-operations) for more details. 
+ +#### Create table examples + +<Tabs groupId='language' queryString> +<TabItem value="json" label="Json"> + +```json +{ + "name": "user_events", + "comment": "User events table with partitioning and clustering", + "columns": [ + { + "name": "user_id", + "type": "string", + "nullable": false, + "comment": "User ID" + }, + { + "name": "event_timestamp", + "type": "timestamp_tz", + "nullable": false, + "comment": "Event timestamp" + }, + { + "name": "event_type", + "type": "string", + "nullable": true, + "comment": "Event type" + }, + { + "name": "event_data", + "type": "json", + "nullable": true, + "comment": "Event data in JSON format" + } + ], + "partitioning": [ + { + "strategy": "day", + "fieldName": ["event_timestamp"] + } + ], + "properties": { + "clustering_fields": "user_id,event_type", + "partition_expiration_days": "365", + "require_partition_filter": "true" + } +} +``` + +</TabItem> +<TabItem value="java" label="Java"> + +```java +Column[] columns = new Column[] { + Column.of("user_id", Types.StringType.get(), "User ID", false, false, null), + Column.of("event_timestamp", Types.TimestampType.withTimeZone(), "Event timestamp", false, false, null), + Column.of("event_type", Types.StringType.get(), "Event type", true, false, null), + Column.of("event_data", Types.ExternalType.of("JSON"), "Event data in JSON format", true, false, null) +}; + +Transform[] partitioning = new Transform[] { + Transforms.day("event_timestamp") +}; + +Map<String, String> properties = ImmutableMap.of( + "clustering_fields", "user_id,event_type", + "partition_expiration_days", "365", + "require_partition_filter", "true" +); + +Table table = catalog.asTableCatalog() + .createTable( + NameIdentifier.of("schema_name", "user_events"), + columns, + "User events table with partitioning and clustering", + properties, + partitioning + ); +``` + +</TabItem> +</Tabs> + +#### Alter table operations + +Gravitino supports these table alteration operations: + +- `RenameTable` +- `UpdateComment` 
+- `AddColumn`
+- `DeleteColumn`
+- `RenameColumn`
+- `UpdateColumnType`
+- `UpdateColumnPosition`
+- `UpdateColumnNullability`
+- `UpdateColumnComment`
+- `UpdateColumnDefaultValue`
+- `SetProperty`
+
+:::info
+- You cannot submit the `RenameTable` operation at the same time as other operations.
+- If you change a nullable column to be non-nullable, there may be compatibility issues.
+:::
+
+## Limitations and Considerations
+
+### Current Limitations
+
+1. **Feature Limitations**
+   - Integer range partitioning not implemented
+   - External tables, views, table clones not supported
+   - Dataset properties like `failover_reservation`, `is_primary`, `primary_replica`, `collate` not supported
+   - Web UI does not support table partitioning and clustering (API only)
+   - Web UI does not support complex data types (ARRAY, STRUCT, GEOGRAPHY, JSON) (API only)
+   - BIGNUMERIC precision above (38,9) has precision loss; INTERVAL type currently not supported
+   - Unsupported data types display as "external" in Web UI
+
+2. **Performance Considerations**
+   - Table metadata loading uses JDBC which may be slower than native API calls
+
+### Recommended Usage
+
+- **Simple Scenarios**: Use Web UI to create basic tables and schemas
+- **Advanced Scenarios**: Use API for complex configurations (supports BigQuery native data types for table creation)
+- **Production Environment**: Recommend using API for batch operations
\ No newline at end of file
