Copilot commented on code in PR #10560: URL: https://github.com/apache/gravitino/pull/10560#discussion_r3007794746
########## flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/store/TestGravitinoSessionCatalogStore.java: ########## @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.flink.connector.store; + +import static org.apache.gravitino.flink.connector.utils.FactoryUtils.GRAVITINO_FACTORY_LIST; +import static org.apache.gravitino.flink.connector.utils.FactoryUtils.isBuiltInCatalog; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableSet; +import java.util.Optional; +import java.util.Set; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.catalog.CatalogDescriptor; +import org.apache.flink.table.catalog.CommonCatalogOptions; +import org.apache.flink.table.catalog.GenericInMemoryCatalogStore; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.junit.Before; +import org.junit.Test; + +public class TestGravitinoSessionCatalogStore { + + /** A catalog type that isBuiltInCatalog() recognises as Gravitino-managed. */ + private static final String BUILT_IN_TYPE = GRAVITINO_FACTORY_LIST.get(0); + + /** A catalog type that isBuiltInCatalog() does NOT recognise. */ + private static final String OTHER_TYPE = "hive"; + + private GravitinoCatalogStore gravitinoCatalogStore; + private GenericInMemoryCatalogStore memoryCatalogStore; + private GravitinoSessionCatalogStore sessionCatalogStore; + + @Before + public void setUp() { + assertTrue( + "BUILT_IN_TYPE must be recognised by isBuiltInCatalog()", isBuiltInCatalog(BUILT_IN_TYPE)); + assertFalse( + "OTHER_TYPE must NOT be recognised by isBuiltInCatalog()", isBuiltInCatalog(OTHER_TYPE)); + gravitinoCatalogStore = mock(GravitinoCatalogStore.class); + memoryCatalogStore = mock(GenericInMemoryCatalogStore.class); + sessionCatalogStore = + new GravitinoSessionCatalogStore(gravitinoCatalogStore, memoryCatalogStore); + } + + // ------------------------------------------------------------------------- + // storeCatalog + // ------------------------------------------------------------------------- + + @Test + public void testStoreCatalog_builtInType_storesInMemory() throws CatalogException { + String catalogName = "hive_catalog"; + CatalogDescriptor descriptor = descriptorWithType(BUILT_IN_TYPE); + + sessionCatalogStore.storeCatalog(catalogName, descriptor); + + verify(memoryCatalogStore).storeCatalog(catalogName, descriptor); + verify(gravitinoCatalogStore, never()).storeCatalog(catalogName, descriptor); + } + + @Test + public void testStoreCatalog_thirdPartyType_storesInGravitino() throws CatalogException { + String catalogName = "hive"; + CatalogDescriptor descriptor = descriptorWithType(OTHER_TYPE); + + sessionCatalogStore.storeCatalog(catalogName, descriptor); + + verify(gravitinoCatalogStore).storeCatalog(catalogName, descriptor); + verify(memoryCatalogStore, never()).storeCatalog(catalogName, descriptor); Review Comment: The test currently asserts that a third-party catalog type is stored in Gravitino. This is reversed from the intended behavior (third-party catalogs should be session-scoped and stored in memory only). Update this test to verify that non-built-in types are stored in the in-memory store (and not persisted to Gravitino). ```suggestion public void testStoreCatalog_builtInType_storesInGravitino() throws CatalogException { String catalogName = "hive_catalog"; CatalogDescriptor descriptor = descriptorWithType(BUILT_IN_TYPE); sessionCatalogStore.storeCatalog(catalogName, descriptor); verify(gravitinoCatalogStore).storeCatalog(catalogName, descriptor); verify(memoryCatalogStore, never()).storeCatalog(catalogName, descriptor); } @Test public void testStoreCatalog_thirdPartyType_storesInMemory() throws CatalogException { String catalogName = "hive"; CatalogDescriptor descriptor = descriptorWithType(OTHER_TYPE); sessionCatalogStore.storeCatalog(catalogName, descriptor); verify(memoryCatalogStore).storeCatalog(catalogName, descriptor); verify(gravitinoCatalogStore, never()).storeCatalog(catalogName, descriptor); ``` ########## flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoSessionCatalogStore.java: ########## @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.flink.connector.store; + +import static org.apache.gravitino.flink.connector.utils.FactoryUtils.isBuiltInCatalog; + +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; +import org.apache.flink.table.catalog.AbstractCatalogStore; +import org.apache.flink.table.catalog.CatalogDescriptor; +import org.apache.flink.table.catalog.CommonCatalogOptions; +import org.apache.flink.table.catalog.GenericInMemoryCatalogStore; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.util.Preconditions; + +/** GravitinoCatalogStore is used to store catalog information to Apache Gravitino server. */ Review Comment: The class-level JavaDoc still describes GravitinoCatalogStore rather than GravitinoSessionCatalogStore, which is misleading for readers and API users. Please update the JavaDoc to describe that this store combines an in-memory store (session-scoped) with a Gravitino-backed store and the precedence rules between them. ```suggestion /** * A catalog store that combines a session-scoped in-memory * {@link GenericInMemoryCatalogStore} with a persistent {@link GravitinoCatalogStore}. * * <p>Catalogs for built-in catalog types are stored only in the in-memory store, while all * other catalogs are stored in the Gravitino-backed store. When retrieving, listing, or * removing catalogs, entries in the in-memory store take precedence over entries in the * Gravitino-backed store. * * <p>This store is intended to be used per Flink session, keeping transient catalogs in * memory while delegating persistent catalogs to Apache Gravitino. */ ``` ########## flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStoreFactory.java: ########## @@ -43,10 +45,17 @@ /** The Factory for creating {@link GravitinoCatalogStore}. */ public class GravitinoCatalogStoreFactory implements CatalogStoreFactory { private GravitinoCatalogManager catalogManager; + private boolean supportSessionCatalog; + GenericInMemoryCatalogStore memoryCatalogStore; Review Comment: memoryCatalogStore is declared without an access modifier, making it package-private while the other fields are private. Please make it private (and keep member visibility consistent) unless there is a specific need for package access. ```suggestion private GenericInMemoryCatalogStore memoryCatalogStore; ``` ########## docs/flink-connector/flink-connector.md: ########## @@ -35,8 +35,20 @@ This capability allows users to perform federation queries, accessing data from | table.catalog-store.kind | string | generic_in_memory | The Catalog Store name, it should set to `gravitino`. | Yes | 0.6.0-incubating | | table.catalog-store.gravitino.gravitino.metalake | string | (none) | The metalake name that flink connector used to request to Gravitino. | Yes | 0.6.0-incubating | | table.catalog-store.gravitino.gravitino.uri | string | (none) | The uri of Gravitino server address. | Yes | 0.6.0-incubating | +| table.catalog-store.gravitino.gravitino.support.session.catalog | boolean | false | Whether to enable support for Flink's session catalog in the Gravitino catalog store. | No | 1.3.0 | | table.catalog-store.gravitino.gravitino.client. | string | (none) | The configuration key prefix for the Gravitino client config. | No | 1.0.0 | +When `table.catalog-store.gravitino.gravitino.support.session.catalog` is set to `true`, Gravitino uses `GravitinoSessionCatalogStore`, which combines a `GravitinoCatalogStore` (backed by the Gravitino server) with an in-memory store to support Flink's session catalog. When `false` (the default), only `GravitinoCatalogStore` is used. + +When session catalog support is enabled, the following behaviors apply: + +- **CREATE CATALOG**: Gravitino-managed catalog are persisted to the Gravitino server; non-Gravitino catalog are stored in the in-memory store only. +- **GET / USE CATALOG**: The in-memory store is checked first. If the catalog is not found there, it is retrieved from the Gravitino server. +- **DROP CATALOG**: The in-memory store is checked first. If the catalog exists there it is removed from memory; otherwise it is removed from the Gravitino server. +- **SHOW / LIST CATALOGS**: Returns the combined set of catalogs from both the in-memory store and the Gravitino server. +- **Session scope**: Catalogs stored only in memory are session-scoped and will not survive when Flink restart. Review Comment: Grammar: "will not survive when Flink restart" should be "will not survive when Flink restarts". ```suggestion - **Session scope**: Catalogs stored only in memory are session-scoped and will not survive when Flink restarts. ``` ########## docs/flink-connector/flink-connector.md: ########## @@ -35,8 +35,20 @@ This capability allows users to perform federation queries, accessing data from | table.catalog-store.kind | string | generic_in_memory | The Catalog Store name, it should set to `gravitino`. | Yes | 0.6.0-incubating | | table.catalog-store.gravitino.gravitino.metalake | string | (none) | The metalake name that flink connector used to request to Gravitino. | Yes | 0.6.0-incubating | | table.catalog-store.gravitino.gravitino.uri | string | (none) | The uri of Gravitino server address. | Yes | 0.6.0-incubating | +| table.catalog-store.gravitino.gravitino.support.session.catalog | boolean | false | Whether to enable support for Flink's session catalog in the Gravitino catalog store. | No | 1.3.0 | | table.catalog-store.gravitino.gravitino.client. | string | (none) | The configuration key prefix for the Gravitino client config. | No | 1.0.0 | +When `table.catalog-store.gravitino.gravitino.support.session.catalog` is set to `true`, Gravitino uses `GravitinoSessionCatalogStore`, which combines a `GravitinoCatalogStore` (backed by the Gravitino server) with an in-memory store to support Flink's session catalog. When `false` (the default), only `GravitinoCatalogStore` is used. + +When session catalog support is enabled, the following behaviors apply: + +- **CREATE CATALOG**: Gravitino-managed catalog are persisted to the Gravitino server; non-Gravitino catalog are stored in the in-memory store only. +- **GET / USE CATALOG**: The in-memory store is checked first. If the catalog is not found there, it is retrieved from the Gravitino server. +- **DROP CATALOG**: The in-memory store is checked first. If the catalog exists there it is removed from memory; otherwise it is removed from the Gravitino server. +- **SHOW / LIST CATALOGS**: Returns the combined set of catalogs from both the in-memory store and the Gravitino server. +- **Session scope**: Catalogs stored only in memory are session-scoped and will not survive when Flink restart. +- **Name conflict**: If a catalog with the same name exists in both stores, the in-memory entry takes precedence. Review Comment: The PR description states that SHOW/LIST CATALOGS should return the *intersection* of catalogs in memory and Gravitino, but this documentation (and the implementation/tests) describe returning the combined set (union). Please resolve this discrepancy by updating either the PR description or the docs/implementation so they match the intended behavior. ########## flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/utils/FactoryUtils.java: ########## @@ -36,6 +41,20 @@ private FactoryUtils() {} private static final Logger LOG = LoggerFactory.getLogger(FactoryUtils.class); + /** The list of Gravitino catalog factory identifiers. */ + public static final ImmutableList<String> GRAVITINO_FACTORY_LIST = + ImmutableList.<String>builder() + .add(GravitinoHiveCatalogFactoryOptions.IDENTIFIER) + .add(GravitinoIcebergCatalogFactoryOptions.IDENTIFIER) + .add(GravitinoJdbcCatalogFactoryOptions.MYSQL_IDENTIFIER) + .add(GravitinoJdbcCatalogFactoryOptions.POSTGRESQL_IDENTIFIER) + .add(GravitinoPaimonCatalogFactoryOptions.IDENTIFIER) + .build(); + + public static boolean isBuiltInCatalog(String type) { + return GRAVITINO_FACTORY_LIST.contains(type); Review Comment: isBuiltInCatalog() uses a case-sensitive contains() check. Elsewhere in the Flink connector code, catalog type matching is done case-insensitively (e.g., equalsIgnoreCase in GravitinoCatalogStore#getCatalogFactory), so a user-supplied type with different casing could be misclassified here. Consider normalizing to lower-case (or using a case-insensitive set) before checking membership. ```suggestion for (String identifier : GRAVITINO_FACTORY_LIST) { if (identifier.equalsIgnoreCase(type)) { return true; } } return false; ``` ########## docs/flink-connector/flink-connector.md: ########## @@ -35,8 +35,20 @@ This capability allows users to perform federation queries, accessing data from | table.catalog-store.kind | string | generic_in_memory | The Catalog Store name, it should set to `gravitino`. | Yes | 0.6.0-incubating | | table.catalog-store.gravitino.gravitino.metalake | string | (none) | The metalake name that flink connector used to request to Gravitino. | Yes | 0.6.0-incubating | | table.catalog-store.gravitino.gravitino.uri | string | (none) | The uri of Gravitino server address. | Yes | 0.6.0-incubating | +| table.catalog-store.gravitino.gravitino.support.session.catalog | boolean | false | Whether to enable support for Flink's session catalog in the Gravitino catalog store. | No | 1.3.0 | | table.catalog-store.gravitino.gravitino.client. | string | (none) | The configuration key prefix for the Gravitino client config. | No | 1.0.0 | +When `table.catalog-store.gravitino.gravitino.support.session.catalog` is set to `true`, Gravitino uses `GravitinoSessionCatalogStore`, which combines a `GravitinoCatalogStore` (backed by the Gravitino server) with an in-memory store to support Flink's session catalog. When `false` (the default), only `GravitinoCatalogStore` is used. + +When session catalog support is enabled, the following behaviors apply: + +- **CREATE CATALOG**: Gravitino-managed catalog are persisted to the Gravitino server; non-Gravitino catalog are stored in the in-memory store only. Review Comment: Grammar: "Gravitino-managed catalog are persisted" should use the plural "catalogs" (and keep subject/verb agreement). ```suggestion - **CREATE CATALOG**: Gravitino-managed catalogs are persisted to the Gravitino server; non-Gravitino catalogs are stored in the in-memory store only. ``` ########## flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/store/TestGravitinoSessionCatalogStore.java: ########## @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.flink.connector.store; + +import static org.apache.gravitino.flink.connector.utils.FactoryUtils.GRAVITINO_FACTORY_LIST; +import static org.apache.gravitino.flink.connector.utils.FactoryUtils.isBuiltInCatalog; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableSet; +import java.util.Optional; +import java.util.Set; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.catalog.CatalogDescriptor; +import org.apache.flink.table.catalog.CommonCatalogOptions; +import org.apache.flink.table.catalog.GenericInMemoryCatalogStore; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.junit.Before; +import org.junit.Test; + +public class TestGravitinoSessionCatalogStore { + + /** A catalog type that isBuiltInCatalog() recognises as Gravitino-managed. */ + private static final String BUILT_IN_TYPE = GRAVITINO_FACTORY_LIST.get(0); + + /** A catalog type that isBuiltInCatalog() does NOT recognise. */ + private static final String OTHER_TYPE = "hive"; + + private GravitinoCatalogStore gravitinoCatalogStore; + private GenericInMemoryCatalogStore memoryCatalogStore; + private GravitinoSessionCatalogStore sessionCatalogStore; + + @Before + public void setUp() { + assertTrue( + "BUILT_IN_TYPE must be recognised by isBuiltInCatalog()", isBuiltInCatalog(BUILT_IN_TYPE)); + assertFalse( + "OTHER_TYPE must NOT be recognised by isBuiltInCatalog()", isBuiltInCatalog(OTHER_TYPE)); + gravitinoCatalogStore = mock(GravitinoCatalogStore.class); + memoryCatalogStore = mock(GenericInMemoryCatalogStore.class); + sessionCatalogStore = + new GravitinoSessionCatalogStore(gravitinoCatalogStore, memoryCatalogStore); + } + + // ------------------------------------------------------------------------- + // storeCatalog + // ------------------------------------------------------------------------- + + @Test + public void testStoreCatalog_builtInType_storesInMemory() throws CatalogException { + String catalogName = "hive_catalog"; + CatalogDescriptor descriptor = descriptorWithType(BUILT_IN_TYPE); + + sessionCatalogStore.storeCatalog(catalogName, descriptor); + + verify(memoryCatalogStore).storeCatalog(catalogName, descriptor); + verify(gravitinoCatalogStore, never()).storeCatalog(catalogName, descriptor); + } + + @Test + public void testStoreCatalog_thirdPartyType_storesInGravitino() throws CatalogException { + String catalogName = "hive"; + CatalogDescriptor descriptor = descriptorWithType(OTHER_TYPE); + + sessionCatalogStore.storeCatalog(catalogName, descriptor); + + verify(gravitinoCatalogStore).storeCatalog(catalogName, descriptor); + verify(memoryCatalogStore, never()).storeCatalog(catalogName, descriptor); Review Comment: The test currently asserts that a built-in (Gravitino-managed) catalog type is stored in the in-memory store. This expectation contradicts the intended behavior described in the PR/docs (managed catalogs should be persisted to Gravitino). Update this test to assert that built-in types are stored via GravitinoCatalogStore instead. ```suggestion public void testStoreCatalog_builtInType_storesInGravitino() throws CatalogException { String catalogName = "hive_catalog"; CatalogDescriptor descriptor = descriptorWithType(BUILT_IN_TYPE); sessionCatalogStore.storeCatalog(catalogName, descriptor); verify(gravitinoCatalogStore).storeCatalog(catalogName, descriptor); verify(memoryCatalogStore, never()).storeCatalog(catalogName, descriptor); } @Test public void testStoreCatalog_thirdPartyType_storesInMemory() throws CatalogException { String catalogName = "hive"; CatalogDescriptor descriptor = descriptorWithType(OTHER_TYPE); sessionCatalogStore.storeCatalog(catalogName, descriptor); verify(memoryCatalogStore).storeCatalog(catalogName, descriptor); verify(gravitinoCatalogStore, never()).storeCatalog(catalogName, descriptor); ``` ########## flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStoreFactory.java: ########## @@ -43,10 +45,17 @@ /** The Factory for creating {@link GravitinoCatalogStore}. */ public class GravitinoCatalogStoreFactory implements CatalogStoreFactory { private GravitinoCatalogManager catalogManager; + private boolean supportSessionCatalog; + GenericInMemoryCatalogStore memoryCatalogStore; @Override public CatalogStore createCatalogStore() { - return new GravitinoCatalogStore(catalogManager); + GravitinoCatalogStore gravitinoCatalogStore = new GravitinoCatalogStore(catalogManager); + if (supportSessionCatalog) { + memoryCatalogStore = new GenericInMemoryCatalogStore(); + return new GravitinoSessionCatalogStore(gravitinoCatalogStore, memoryCatalogStore); + } + return gravitinoCatalogStore; Review Comment: New behavior is introduced behind GRAVITINO_SUPPORT_SESSION_CATALOG (returning GravitinoSessionCatalogStore / using an in-memory store), but there is no unit test covering that the factory returns the session store when the option is enabled and the plain GravitinoCatalogStore when disabled. Please add a focused test (similar to TestGravitinoFlinkConfig) to prevent regressions in wiring/option parsing. ########## flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoSessionCatalogStore.java: ########## @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.flink.connector.store; + +import static org.apache.gravitino.flink.connector.utils.FactoryUtils.isBuiltInCatalog; + +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; +import org.apache.flink.table.catalog.AbstractCatalogStore; +import org.apache.flink.table.catalog.CatalogDescriptor; +import org.apache.flink.table.catalog.CommonCatalogOptions; +import org.apache.flink.table.catalog.GenericInMemoryCatalogStore; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.util.Preconditions; + +/** GravitinoCatalogStore is used to store catalog information to Apache Gravitino server. */ +public class GravitinoSessionCatalogStore extends AbstractCatalogStore { + private final GenericInMemoryCatalogStore memoryCatalogStore; + private final GravitinoCatalogStore gravitinoCatalogStore; + + public GravitinoSessionCatalogStore( + GravitinoCatalogStore gravitinoCatalogStore, GenericInMemoryCatalogStore memoryCatalogStore) { + this.gravitinoCatalogStore = + Preconditions.checkNotNull(gravitinoCatalogStore, "CatalogStore cannot be null"); + this.memoryCatalogStore = + Preconditions.checkNotNull(memoryCatalogStore, "MemoryCatalogStore cannot be null"); + } + + @Override + public void storeCatalog(String catalogName, CatalogDescriptor descriptor) + throws CatalogException { + String catalogType = descriptor.getConfiguration().get(CommonCatalogOptions.CATALOG_TYPE); + if (isBuiltInCatalog(catalogType)) { + memoryCatalogStore.storeCatalog(catalogName, descriptor); + } else { + gravitinoCatalogStore.storeCatalog(catalogName, descriptor); + } + } + /** + * Removes the specified catalog. + * + * @param catalogName name of the catalog to remove + * @param ignoreIfNotExists if true, ignore when the catalog does not exist + * @throws CatalogException if the catalog cannot be removed + */ + @Override + public void removeCatalog(String catalogName, boolean ignoreIfNotExists) throws CatalogException { + if (memoryCatalogStore.contains(catalogName)) { + memoryCatalogStore.removeCatalog(catalogName, ignoreIfNotExists); + } else { + gravitinoCatalogStore.removeCatalog(catalogName, ignoreIfNotExists); + } + } + + /** + * Get a catalog by name. + * + * @param catalogName name of the catalog to retrieve + * @return the requested catalog or empty if the catalog does not exist + * @throws CatalogException throw a CatalogException when the Catalog cannot be created. + */ + @Override + public Optional<CatalogDescriptor> getCatalog(String catalogName) throws CatalogException { + if (memoryCatalogStore.contains(catalogName)) { + Optional<CatalogDescriptor> descriptor = memoryCatalogStore.getCatalog(catalogName); + if (descriptor.isPresent()) { + return descriptor; + } + } + return gravitinoCatalogStore.getCatalog(catalogName); + } + + @Override + public Set<String> listCatalogs() throws CatalogException { + Set<String> catalogs = new HashSet<>(); + catalogs.addAll(memoryCatalogStore.listCatalogs()); + try { + catalogs.addAll(gravitinoCatalogStore.listCatalogs()); + } catch (Exception e) { + throw new CatalogException("Failed to list catalog.", e); Review Comment: listCatalogs() catches a generic Exception. This can unintentionally swallow InterruptedException and clear the interrupt flag, and it also makes it harder to reason about which failures are expected here. Prefer catching CatalogException (and/or RuntimeException) explicitly; if InterruptedException is possible, re-interrupt the thread before wrapping. ```suggestion } catch (CatalogException e) { throw new CatalogException("Failed to list catalog.", e); } catch (RuntimeException e) { throw new CatalogException("Failed to list catalog.", e); ``` ########## docs/flink-connector/flink-connector.md: ########## @@ -35,8 +35,20 @@ This capability allows users to perform federation queries, accessing data from | table.catalog-store.kind | string | generic_in_memory | The Catalog Store name, it should set to `gravitino`. | Yes | 0.6.0-incubating | | table.catalog-store.gravitino.gravitino.metalake | string | (none) | The metalake name that flink connector used to request to Gravitino. | Yes | 0.6.0-incubating | | table.catalog-store.gravitino.gravitino.uri | string | (none) | The uri of Gravitino server address. | Yes | 0.6.0-incubating | +| table.catalog-store.gravitino.gravitino.support.session.catalog | boolean | false | Whether to enable support for Flink's session catalog in the Gravitino catalog store. | No | 1.3.0 | | table.catalog-store.gravitino.gravitino.client. | string | (none) | The configuration key prefix for the Gravitino client config. | No | 1.0.0 | +When `table.catalog-store.gravitino.gravitino.support.session.catalog` is set to `true`, Gravitino uses `GravitinoSessionCatalogStore`, which combines a `GravitinoCatalogStore` (backed by the Gravitino server) with an in-memory store to support Flink's session catalog. When `false` (the default), only `GravitinoCatalogStore` is used. + Review Comment: The PR description mentions a user-facing config `table.catalog-store.gravitino.gravitino.allow.third-party-connector.list`, but the docs/code introduce `table.catalog-store.gravitino.gravitino.support.session.catalog` instead. Please align the documentation (and/or implementation) with the PR description so users have a single, correct configuration path. ########## flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoSessionCatalogStore.java: ########## @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.flink.connector.store; + +import static org.apache.gravitino.flink.connector.utils.FactoryUtils.isBuiltInCatalog; + +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; +import org.apache.flink.table.catalog.AbstractCatalogStore; +import org.apache.flink.table.catalog.CatalogDescriptor; +import org.apache.flink.table.catalog.CommonCatalogOptions; +import org.apache.flink.table.catalog.GenericInMemoryCatalogStore; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.util.Preconditions; + +/** GravitinoCatalogStore is used to store catalog information to Apache Gravitino server. */ +public class GravitinoSessionCatalogStore extends AbstractCatalogStore { + private final GenericInMemoryCatalogStore memoryCatalogStore; + private final GravitinoCatalogStore gravitinoCatalogStore; + + public GravitinoSessionCatalogStore( + GravitinoCatalogStore gravitinoCatalogStore, GenericInMemoryCatalogStore memoryCatalogStore) { + this.gravitinoCatalogStore = + Preconditions.checkNotNull(gravitinoCatalogStore, "CatalogStore cannot be null"); + this.memoryCatalogStore = + Preconditions.checkNotNull(memoryCatalogStore, "MemoryCatalogStore cannot be null"); + } + + @Override + public void storeCatalog(String catalogName, CatalogDescriptor descriptor) + throws CatalogException { + String catalogType = descriptor.getConfiguration().get(CommonCatalogOptions.CATALOG_TYPE); + if (isBuiltInCatalog(catalogType)) { + memoryCatalogStore.storeCatalog(catalogName, descriptor); + } else { + gravitinoCatalogStore.storeCatalog(catalogName, descriptor); Review Comment: storeCatalog routes Gravitino-managed (built-in) catalog types to the in-memory store and routes non-built-in types to Gravitino. This is the opposite of the documented/expected behavior (Gravitino-managed catalogs should be persisted to the Gravitino server; third-party catalogs should stay session-scoped in memory). Please swap the target stores for the built-in vs third-party branch (and keep the higher-priority lookup semantics in get/remove/contains). ```suggestion gravitinoCatalogStore.storeCatalog(catalogName, descriptor); } else { memoryCatalogStore.storeCatalog(catalogName, descriptor); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
