Copilot commented on code in PR #10560: URL: https://github.com/apache/gravitino/pull/10560#discussion_r3014030746
########## flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/store/TestGravitinoSessionCatalogStore.java: ########## @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.flink.connector.store; + +import static org.apache.gravitino.flink.connector.utils.FactoryUtils.isGravitinoManagedCatalogType; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableSet; +import java.util.Optional; +import java.util.Set; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.catalog.CatalogDescriptor; +import org.apache.flink.table.catalog.CommonCatalogOptions; +import org.apache.flink.table.catalog.GenericInMemoryCatalogStore; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class TestGravitinoSessionCatalogStore { + + /** + * A Gravitino-managed (built-in) catalog type — verified by {@link + * 
org.apache.gravitino.flink.connector.utils.FactoryUtils#isGravitinoManagedCatalogType}. + * Catalogs of this type are routed to the Gravitino-backed store. + */ + private static final String GRAVITINO_CATALOG_TYPE = "gravitino-hive"; + + /** + * A third-party (session) catalog type that Gravitino does not manage. Catalogs of this type are + * routed to the in-memory store and exist only for the lifetime of the Flink session. + */ + private static final String SESSION_CATALOG_TYPE = "hive"; + + private GravitinoCatalogStore gravitinoCatalogStore; + private GenericInMemoryCatalogStore memoryCatalogStore; + private GravitinoSessionCatalogStore sessionCatalogStore; + + @BeforeEach + void setUp() { + Assertions.assertTrue( + isGravitinoManagedCatalogType(GRAVITINO_CATALOG_TYPE), + "GRAVITINO_CATALOG_TYPE must be recognised by isGravitinoManagedCatalogType()"); + Assertions.assertFalse( + isGravitinoManagedCatalogType(SESSION_CATALOG_TYPE), + "SESSION_CATALOG_TYPE must NOT be recognised by isGravitinoManagedCatalogType()"); + gravitinoCatalogStore = mock(GravitinoCatalogStore.class); + memoryCatalogStore = mock(GenericInMemoryCatalogStore.class); + sessionCatalogStore = + new GravitinoSessionCatalogStore(gravitinoCatalogStore, memoryCatalogStore); + } + + // ------------------------------------------------------------------------- + // storeCatalog + // ------------------------------------------------------------------------- + + @Test + void testStoreCatalog_gravitinoCatalog_storesInGravitino() throws CatalogException { + CatalogDescriptor descriptor = descriptorWithType(GRAVITINO_CATALOG_TYPE); + + sessionCatalogStore.storeCatalog("gravitino-hive", descriptor); + + verify(gravitinoCatalogStore).storeCatalog("gravitino-hive", descriptor); + verify(memoryCatalogStore, never()).storeCatalog("gravitino-hive", descriptor); + } + + @Test + void testStoreCatalog_sessionCatalog_storesInMemory() throws CatalogException { + CatalogDescriptor descriptor = 
descriptorWithType(SESSION_CATALOG_TYPE); + + sessionCatalogStore.storeCatalog("hive", descriptor); + + verify(memoryCatalogStore).storeCatalog("hive", descriptor); + verify(gravitinoCatalogStore, never()).storeCatalog("hive", descriptor); + } + + @Test + void testStoreCatalog_missingCatalogType_throwsCatalogException() { + CatalogDescriptor descriptor = CatalogDescriptor.of("unknown", new Configuration()); + + Assertions.assertThrows( + CatalogException.class, () -> sessionCatalogStore.storeCatalog("unknown", descriptor)); + } + + // ------------------------------------------------------------------------- + // removeCatalog + // ------------------------------------------------------------------------- + + @Test + void testRemoveCatalog_catalogInMemory_removesFromMemory() throws CatalogException { + when(memoryCatalogStore.contains("hive")).thenReturn(true); + + sessionCatalogStore.removeCatalog("hive", false); + + verify(memoryCatalogStore).removeCatalog("hive", false); + verify(gravitinoCatalogStore, never()).removeCatalog("hive", false); + } + + @Test + void testRemoveCatalog_catalogNotInMemory_removesFromGravitino() throws CatalogException { + when(memoryCatalogStore.contains("gravitino-hive")).thenReturn(false); + + sessionCatalogStore.removeCatalog("gravitino-hive", false); + + verify(gravitinoCatalogStore).removeCatalog("gravitino-hive", false); + verify(memoryCatalogStore, never()).removeCatalog("gravitino-hive", false); + } + + // ------------------------------------------------------------------------- + // getCatalog + // ------------------------------------------------------------------------- + + @Test + void testGetCatalog_catalogInMemory_returnsFromMemory() throws CatalogException { + CatalogDescriptor expected = descriptorWithType(SESSION_CATALOG_TYPE); + when(memoryCatalogStore.getCatalog("hive")).thenReturn(Optional.of(expected)); + + Optional<CatalogDescriptor> result = sessionCatalogStore.getCatalog("hive"); + + 
Assertions.assertTrue(result.isPresent()); + Assertions.assertEquals(expected, result.get()); + verify(gravitinoCatalogStore, never()).getCatalog("hive"); + } + + @Test + void testGetCatalog_catalogNotInMemory_returnsFromGravitino() throws CatalogException { + CatalogDescriptor expected = descriptorWithType(GRAVITINO_CATALOG_TYPE); + when(memoryCatalogStore.getCatalog("gravitino-hive")).thenReturn(Optional.empty()); + when(gravitinoCatalogStore.getCatalog("gravitino-hive")).thenReturn(Optional.of(expected)); + + Optional<CatalogDescriptor> result = sessionCatalogStore.getCatalog("gravitino-hive"); + + Assertions.assertTrue(result.isPresent()); + Assertions.assertEquals(expected, result.get()); + verify(gravitinoCatalogStore).getCatalog("gravitino-hive"); + } + + @Test + void testGetCatalog_catalogInMemoryButEmpty_fallbackToGravitino() throws CatalogException { + CatalogDescriptor expected = descriptorWithType(GRAVITINO_CATALOG_TYPE); + when(memoryCatalogStore.getCatalog("gravitino-hive")).thenReturn(Optional.empty()); + when(gravitinoCatalogStore.getCatalog("gravitino-hive")).thenReturn(Optional.of(expected)); + + Optional<CatalogDescriptor> result = sessionCatalogStore.getCatalog("gravitino-hive"); + + Assertions.assertTrue(result.isPresent()); + Assertions.assertEquals(expected, result.get()); + verify(gravitinoCatalogStore).getCatalog("gravitino-hive"); + } + Review Comment: `testGetCatalog_catalogInMemoryButEmpty_fallbackToGravitino` appears to duplicate `testGetCatalog_catalogNotInMemory_returnsFromGravitino` (both stub `memoryCatalogStore.getCatalog(...)` to return `Optional.empty()` and assert fallback to Gravitino). Consider removing one or changing it to cover a distinct case to keep the test suite focused. 
```suggestion ``` ########## flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/session/FlinkSupportsSessionCatalogIT.java: ########## @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.gravitino.flink.connector.integration.test.session; + +import com.google.common.base.Preconditions; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Optional; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CatalogDescriptor; +import org.apache.flink.table.catalog.CommonCatalogOptions; +import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions; +import org.apache.gravitino.catalog.hive.HiveConstants; +import org.apache.gravitino.flink.connector.hive.GravitinoHiveCatalogFactoryOptions; +import org.apache.gravitino.flink.connector.integration.test.FlinkEnvIT; +import org.apache.gravitino.flink.connector.store.GravitinoCatalogStoreFactoryOptions; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Integration tests verifying that the {@code gravitino.supportsSessionCatalog} option correctly + * routes catalog operations to the appropriate store: + * + * <ul> + * <li>Gravitino-managed catalog types (e.g. {@code gravitino-hive}) are persisted to the + * Gravitino server via {@link + * org.apache.gravitino.flink.connector.store.GravitinoCatalogStore}. + * <li>Session-scoped catalog types (e.g. {@code generic_in_memory}) are held in memory only and + * never reach the Gravitino server. + * <li>Both stores contribute to the catalog list returned by {@code SHOW CATALOGS}. + * </ul> + * + * <p>Requires a running Hive Docker container for the Gravitino-managed catalog tests. 
+ */ +@Tag("gravitino-docker-test") +public class FlinkSupportsSessionCatalogIT extends FlinkEnvIT { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkSupportsSessionCatalogIT.class); + + private static String hiveConfDir; + + @Override + protected String getProvider() { + return "hive"; + } + + /** + * Overrides the default Flink environment to enable {@code + * gravitino.supportsSessionCatalog=true}, which wires a {@link + * org.apache.gravitino.flink.connector.store.GravitinoSessionCatalogStore} as the catalog store. + */ + @Override + protected void initFlinkEnv() { + initHiveConfDir(); + final Configuration configuration = new Configuration(); + configuration.setString( + "table.catalog-store.kind", GravitinoCatalogStoreFactoryOptions.GRAVITINO); + configuration.setString("table.catalog-store.gravitino.gravitino.metalake", GRAVITINO_METALAKE); + configuration.setString("table.catalog-store.gravitino.gravitino.uri", gravitinoUri); + configuration.setBoolean( + "table.catalog-store.gravitino.gravitino.supportsSessionCatalog", true); + EnvironmentSettings settings = + EnvironmentSettings.newInstance().withConfiguration(configuration).inBatchMode().build(); + tableEnv = TableEnvironment.create(settings); + LOG.info( + "Flink env with supportsSessionCatalog=true initialized, Gravitino uri: {}.", gravitinoUri); + } + + @BeforeAll + void sessionCatalogStartUp() { + Preconditions.checkArgument(metalake != null, "metalake should not be null"); + LOG.info("FlinkSupportsSessionCatalogIT startup complete."); + } + + @AfterAll + static void sessionCatalogStop() { + Preconditions.checkArgument(metalake != null, "metalake should not be null"); + LOG.info("FlinkSupportsSessionCatalogIT teardown complete."); + } + + /** + * A Gravitino-managed catalog type ({@code gravitino-hive}) created via {@link + * TableEnvironment#createCatalog} must be persisted to the Gravitino server. 
+ */ + @Test + public void testCreateGravitinoHiveCatalog() { + tableEnv.useCatalog(DEFAULT_CATALOG); + int numCatalogs = tableEnv.listCatalogs().length; + String catalogName = "session_it_gravitino_hive"; + + Configuration conf = new Configuration(); + conf.set(CommonCatalogOptions.CATALOG_TYPE, GravitinoHiveCatalogFactoryOptions.IDENTIFIER); + conf.set(HiveCatalogFactoryOptions.HIVE_CONF_DIR, hiveConfDir); + conf.set(GravitinoHiveCatalogFactoryOptions.HIVE_METASTORE_URIS, hiveMetastoreUri); + CatalogDescriptor descriptor = CatalogDescriptor.of(catalogName, conf); + tableEnv.createCatalog(catalogName, descriptor); + + try { + Assertions.assertTrue( + metalake.catalogExists(catalogName), + "Gravitino-managed catalog must be persisted to the Gravitino server"); + String[] catalogs = tableEnv.listCatalogs(); + Assertions.assertEquals(numCatalogs + 1, catalogs.length, "Should create a new catalog"); + Assertions.assertTrue( + Arrays.asList(catalogs).contains(catalogName), "Should contain the created catalog"); + org.apache.gravitino.Catalog gravitinoCatalog = metalake.loadCatalog(catalogName); + Assertions.assertEquals( + hiveMetastoreUri, gravitinoCatalog.properties().get(HiveConstants.METASTORE_URIS)); + } finally { + tableEnv.useCatalog(DEFAULT_CATALOG); + tableEnv.executeSql("DROP CATALOG " + catalogName); + Assertions.assertFalse(metalake.catalogExists(catalogName)); + } + } + + /** + * A Gravitino-managed catalog type ({@code gravitino-hive}) created via Flink SQL {@code CREATE + * CATALOG} must be persisted to the Gravitino server. 
+ */ + @Test + public void testCreateGravitinoHiveCatalogUsingSQL() { + tableEnv.useCatalog(DEFAULT_CATALOG); + int numCatalogs = tableEnv.listCatalogs().length; + String catalogName = "session_it_gravitino_hive_sql"; + + tableEnv.executeSql( + String.format( + "CREATE CATALOG %s WITH (" + + "'type'='%s'," + + "'hive-conf-dir'='%s'," + + "'hive.metastore.uris'='%s'" + + ")", + catalogName, + GravitinoHiveCatalogFactoryOptions.IDENTIFIER, + hiveConfDir, + hiveMetastoreUri)); + + try { + Assertions.assertTrue( + metalake.catalogExists(catalogName), + "Gravitino-managed catalog must be persisted to the Gravitino server"); + String[] catalogs = tableEnv.listCatalogs(); + Assertions.assertEquals(numCatalogs + 1, catalogs.length, "Should create a new catalog"); + Assertions.assertTrue( + Arrays.asList(catalogs).contains(catalogName), "Should contain the created catalog"); + } finally { + tableEnv.useCatalog(DEFAULT_CATALOG); + tableEnv.executeSql("DROP CATALOG " + catalogName); + Assertions.assertFalse(metalake.catalogExists(catalogName)); + } + } + + /** + * A session-scoped catalog type ({@code generic_in_memory}) created via Flink SQL must be + * accessible in Flink but must NOT be persisted to the Gravitino server. 
+ */ + @Test + public void testCreateSessionScopedCatalog() { + tableEnv.useCatalog(DEFAULT_CATALOG); + int numCatalogs = tableEnv.listCatalogs().length; + String catalogName = "session_it_memory_catalog"; + + tableEnv.executeSql( + String.format("CREATE CATALOG %s WITH ('type'='generic_in_memory')", catalogName)); + + try { + Assertions.assertFalse( + metalake.catalogExists(catalogName), + "Session-scoped catalog must NOT be persisted to the Gravitino server"); + String[] catalogs = tableEnv.listCatalogs(); + Assertions.assertEquals(numCatalogs + 1, catalogs.length, "Should create a new catalog"); + Assertions.assertTrue( + Arrays.asList(catalogs).contains(catalogName), "Should contain the created catalog"); + } finally { + tableEnv.useCatalog(DEFAULT_CATALOG); + tableEnv.executeSql("DROP CATALOG " + catalogName); + } + } + + /** + * {@code SHOW CATALOGS} must return catalogs from both the Gravitino-backed store and the + * in-memory store when {@code supportsSessionCatalog=true}. + */ + @Test + public void testListCatalogsReturnsBothStores() { + tableEnv.useCatalog(DEFAULT_CATALOG); + int numCatalogs = tableEnv.listCatalogs().length; + String gravitinoCatalogName = "session_it_list_gravitino_hive"; + String sessionCatalogName = "session_it_list_memory_catalog"; + + tableEnv.executeSql( + String.format( + "CREATE CATALOG %s WITH (" + + "'type'='%s'," + + "'hive-conf-dir'='%s'," + + "'hive.metastore.uris'='%s'" + + ")", + gravitinoCatalogName, + GravitinoHiveCatalogFactoryOptions.IDENTIFIER, + hiveConfDir, + hiveMetastoreUri)); + tableEnv.executeSql( + String.format("CREATE CATALOG %s WITH ('type'='generic_in_memory')", sessionCatalogName)); + + try { + String[] catalogs = tableEnv.listCatalogs(); + Assertions.assertEquals(numCatalogs + 2, catalogs.length, "Should have two more catalogs"); + Assertions.assertTrue( + Arrays.asList(catalogs).contains(gravitinoCatalogName), + "Gravitino-managed catalog must appear in SHOW CATALOGS"); + Assertions.assertTrue( + 
Arrays.asList(catalogs).contains(sessionCatalogName), + "Session-scoped catalog must appear in SHOW CATALOGS"); + } finally { + tableEnv.useCatalog(DEFAULT_CATALOG); + tableEnv.executeSql("DROP CATALOG " + gravitinoCatalogName); + tableEnv.executeSql("DROP CATALOG " + sessionCatalogName); + Assertions.assertFalse(metalake.catalogExists(gravitinoCatalogName)); + } + } + + /** + * Dropping a Gravitino-managed catalog via Flink SQL must remove it from the Gravitino server and + * from the Flink catalog list. + */ + @Test + public void testDropGravitinoHiveCatalog() { + tableEnv.useCatalog(DEFAULT_CATALOG); + String catalogName = "session_it_drop_gravitino_hive"; + + tableEnv.executeSql( + String.format( + "CREATE CATALOG %s WITH (" + + "'type'='%s'," + + "'hive-conf-dir'='%s'," + + "'hive.metastore.uris'='%s'" + + ")", + catalogName, + GravitinoHiveCatalogFactoryOptions.IDENTIFIER, + hiveConfDir, + hiveMetastoreUri)); + Assertions.assertTrue(metalake.catalogExists(catalogName)); + + tableEnv.executeSql("DROP CATALOG " + catalogName); + + Assertions.assertFalse( + metalake.catalogExists(catalogName), + "Dropped Gravitino-managed catalog must be removed from the Gravitino server"); + Optional<Catalog> droppedCatalog = tableEnv.getCatalog(catalogName); + Assertions.assertFalse(droppedCatalog.isPresent(), "Catalog should be dropped"); + } + + /** + * Dropping a session-scoped catalog via Flink SQL must remove it from the in-memory store without + * error, since it was never registered in Gravitino. 
+ */ + @Test + public void testDropSessionScopedCatalog() { + tableEnv.useCatalog(DEFAULT_CATALOG); + String catalogName = "session_it_drop_memory_catalog"; + + tableEnv.executeSql( + String.format("CREATE CATALOG %s WITH ('type'='generic_in_memory')", catalogName)); + Assertions.assertFalse( + metalake.catalogExists(catalogName), + "Session-scoped catalog must NOT be in Gravitino before drop"); + + tableEnv.executeSql("DROP CATALOG " + catalogName); + + Optional<Catalog> droppedCatalog = tableEnv.getCatalog(catalogName); + Assertions.assertFalse(droppedCatalog.isPresent(), "Catalog should be dropped"); + } + + private static void initHiveConfDir() { + if (hiveConfDir != null) { + return; + } + try { + java.nio.file.Path dir = java.nio.file.Files.createTempDirectory("flink-session-hive-conf"); + java.nio.file.Path hiveSite = dir.resolve("hive-site.xml"); + String hiveSiteXml = + "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" Review Comment: `initHiveConfDir()` creates a temporary directory via `Files.createTempDirectory(...)` but never deletes it. Since this runs in integration tests, consider cleaning the directory up (e.g., an explicit recursive delete in an `@AfterAll`) to avoid accumulating temp directories across repeated test runs. Note that `File.deleteOnExit` on the directory alone is not sufficient: it cannot remove a non-empty directory, so the `hive-site.xml` inside it would have to be registered for deletion as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
