Copilot commented on code in PR #10560:
URL: https://github.com/apache/gravitino/pull/10560#discussion_r3007794746


##########
flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/store/TestGravitinoSessionCatalogStore.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.flink.connector.store;
+
+import static 
org.apache.gravitino.flink.connector.utils.FactoryUtils.GRAVITINO_FACTORY_LIST;
+import static 
org.apache.gravitino.flink.connector.utils.FactoryUtils.isBuiltInCatalog;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableSet;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.catalog.CatalogDescriptor;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.flink.table.catalog.GenericInMemoryCatalogStore;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestGravitinoSessionCatalogStore {
+
+  /** A catalog type that isBuiltInCatalog() recognises as Gravitino-managed. 
*/
+  private static final String BUILT_IN_TYPE = GRAVITINO_FACTORY_LIST.get(0);
+
+  /** A catalog type that isBuiltInCatalog() does NOT recognise. */
+  private static final String OTHER_TYPE = "hive";
+
+  private GravitinoCatalogStore gravitinoCatalogStore;
+  private GenericInMemoryCatalogStore memoryCatalogStore;
+  private GravitinoSessionCatalogStore sessionCatalogStore;
+
+  @Before
+  public void setUp() {
+    assertTrue(
+        "BUILT_IN_TYPE must be recognised by isBuiltInCatalog()", 
isBuiltInCatalog(BUILT_IN_TYPE));
+    assertFalse(
+        "OTHER_TYPE must NOT be recognised by isBuiltInCatalog()", 
isBuiltInCatalog(OTHER_TYPE));
+    gravitinoCatalogStore = mock(GravitinoCatalogStore.class);
+    memoryCatalogStore = mock(GenericInMemoryCatalogStore.class);
+    sessionCatalogStore =
+        new GravitinoSessionCatalogStore(gravitinoCatalogStore, 
memoryCatalogStore);
+  }
+
+  // -------------------------------------------------------------------------
+  // storeCatalog
+  // -------------------------------------------------------------------------
+
+  @Test
+  public void testStoreCatalog_builtInType_storesInMemory() throws 
CatalogException {
+    String catalogName = "hive_catalog";
+    CatalogDescriptor descriptor = descriptorWithType(BUILT_IN_TYPE);
+
+    sessionCatalogStore.storeCatalog(catalogName, descriptor);
+
+    verify(memoryCatalogStore).storeCatalog(catalogName, descriptor);
+    verify(gravitinoCatalogStore, never()).storeCatalog(catalogName, 
descriptor);
+  }
+
+  @Test
+  public void testStoreCatalog_thirdPartyType_storesInGravitino() throws 
CatalogException {
+    String catalogName = "hive";
+    CatalogDescriptor descriptor = descriptorWithType(OTHER_TYPE);
+
+    sessionCatalogStore.storeCatalog(catalogName, descriptor);
+
+    verify(gravitinoCatalogStore).storeCatalog(catalogName, descriptor);
+    verify(memoryCatalogStore, never()).storeCatalog(catalogName, descriptor);

Review Comment:
   The test currently asserts that a third-party catalog type is stored in 
Gravitino. This is reversed from the intended behavior (third-party catalogs 
should be session-scoped and stored in memory only). Update this test to verify 
that non-built-in types are stored in the in-memory store (and not persisted to 
Gravitino).
   ```suggestion
     public void testStoreCatalog_builtInType_storesInGravitino() throws 
CatalogException {
       String catalogName = "hive_catalog";
       CatalogDescriptor descriptor = descriptorWithType(BUILT_IN_TYPE);
   
       sessionCatalogStore.storeCatalog(catalogName, descriptor);
   
       verify(gravitinoCatalogStore).storeCatalog(catalogName, descriptor);
       verify(memoryCatalogStore, never()).storeCatalog(catalogName, 
descriptor);
     }
   
     @Test
     public void testStoreCatalog_thirdPartyType_storesInMemory() throws 
CatalogException {
       String catalogName = "hive";
       CatalogDescriptor descriptor = descriptorWithType(OTHER_TYPE);
   
       sessionCatalogStore.storeCatalog(catalogName, descriptor);
   
       verify(memoryCatalogStore).storeCatalog(catalogName, descriptor);
       verify(gravitinoCatalogStore, never()).storeCatalog(catalogName, 
descriptor);
   ```



##########
flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoSessionCatalogStore.java:
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.flink.connector.store;
+
+import static 
org.apache.gravitino.flink.connector.utils.FactoryUtils.isBuiltInCatalog;
+
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.flink.table.catalog.AbstractCatalogStore;
+import org.apache.flink.table.catalog.CatalogDescriptor;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.flink.table.catalog.GenericInMemoryCatalogStore;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.util.Preconditions;
+
+/** GravitinoCatalogStore is used to store catalog information to Apache 
Gravitino server. */

Review Comment:
   The class-level JavaDoc still describes GravitinoCatalogStore rather than 
GravitinoSessionCatalogStore, which is misleading for readers and API users. 
Please update the JavaDoc to describe that this store combines an in-memory 
store (session-scoped) with a Gravitino-backed store and the precedence rules 
between them.
   ```suggestion
   /**
    * A catalog store that combines a session-scoped in-memory
    * {@link GenericInMemoryCatalogStore} with a persistent {@link 
GravitinoCatalogStore}.
    *
    * <p>Catalogs for built-in catalog types are stored only in the in-memory 
store, while all
    * other catalogs are stored in the Gravitino-backed store. When retrieving, 
listing, or
    * removing catalogs, entries in the in-memory store take precedence over 
entries in the
    * Gravitino-backed store.
    *
    * <p>This store is intended to be used per Flink session, keeping transient 
catalogs in
    * memory while delegating persistent catalogs to Apache Gravitino.
    */
   ```



##########
flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStoreFactory.java:
##########
@@ -43,10 +45,17 @@
 /** The Factory for creating {@link GravitinoCatalogStore}. */
 public class GravitinoCatalogStoreFactory implements CatalogStoreFactory {
   private GravitinoCatalogManager catalogManager;
+  private boolean supportSessionCatalog;
+  GenericInMemoryCatalogStore memoryCatalogStore;

Review Comment:
   memoryCatalogStore is declared without an access modifier, making it 
package-private while the other fields are private. Please make it private (and 
keep member visibility consistent) unless there is a specific need for package 
access.
   ```suggestion
     private GenericInMemoryCatalogStore memoryCatalogStore;
   ```



##########
docs/flink-connector/flink-connector.md:
##########
@@ -35,8 +35,20 @@ This capability allows users to perform federation queries, 
accessing data from
 | table.catalog-store.kind                         | string | 
generic_in_memory | The Catalog Store name, it should set to `gravitino`.       
         | Yes      | 0.6.0-incubating |
 | table.catalog-store.gravitino.gravitino.metalake | string | (none)           
 | The metalake name that flink connector used to request to Gravitino. | Yes   
   | 0.6.0-incubating |
 | table.catalog-store.gravitino.gravitino.uri      | string | (none)           
 | The uri of Gravitino server address.                                 | Yes   
   | 0.6.0-incubating |
+| table.catalog-store.gravitino.gravitino.support.session.catalog | boolean | 
false | Whether to enable support for Flink's session catalog in the Gravitino 
catalog store. | No | 1.3.0 |
 | table.catalog-store.gravitino.gravitino.client.  | string | (none)           
 | The configuration key prefix for the Gravitino client config.        | No    
   | 1.0.0            |
 
+When `table.catalog-store.gravitino.gravitino.support.session.catalog` is set 
to `true`, Gravitino uses `GravitinoSessionCatalogStore`, which combines a 
`GravitinoCatalogStore` (backed by the Gravitino server) with an in-memory 
store to support Flink's session catalog. When `false` (the default), only 
`GravitinoCatalogStore` is used.
+
+When session catalog support is enabled, the following behaviors apply:
+
+- **CREATE CATALOG**: Gravitino-managed catalog are persisted to the Gravitino 
server; non-Gravitino catalog are stored in the in-memory store only.
+- **GET / USE CATALOG**: The in-memory store is checked first. If the catalog 
is not found there, it is retrieved from the Gravitino server.
+- **DROP CATALOG**: The in-memory store is checked first. If the catalog 
exists there it is removed from memory; otherwise it is removed from the 
Gravitino server.
+- **SHOW / LIST CATALOGS**: Returns the combined set of catalogs from both the 
in-memory store and the Gravitino server.
+- **Session scope**: Catalogs stored only in memory are session-scoped and 
will not survive when Flink restart.

Review Comment:
   Grammar: "will not survive when Flink restart" should be "will not survive 
when Flink restarts".
   ```suggestion
   - **Session scope**: Catalogs stored only in memory are session-scoped and 
will not survive when Flink restarts.
   ```



##########
docs/flink-connector/flink-connector.md:
##########
@@ -35,8 +35,20 @@ This capability allows users to perform federation queries, 
accessing data from
 | table.catalog-store.kind                         | string | 
generic_in_memory | The Catalog Store name, it should set to `gravitino`.       
         | Yes      | 0.6.0-incubating |
 | table.catalog-store.gravitino.gravitino.metalake | string | (none)           
 | The metalake name that flink connector used to request to Gravitino. | Yes   
   | 0.6.0-incubating |
 | table.catalog-store.gravitino.gravitino.uri      | string | (none)           
 | The uri of Gravitino server address.                                 | Yes   
   | 0.6.0-incubating |
+| table.catalog-store.gravitino.gravitino.support.session.catalog | boolean | 
false | Whether to enable support for Flink's session catalog in the Gravitino 
catalog store. | No | 1.3.0 |
 | table.catalog-store.gravitino.gravitino.client.  | string | (none)           
 | The configuration key prefix for the Gravitino client config.        | No    
   | 1.0.0            |
 
+When `table.catalog-store.gravitino.gravitino.support.session.catalog` is set 
to `true`, Gravitino uses `GravitinoSessionCatalogStore`, which combines a 
`GravitinoCatalogStore` (backed by the Gravitino server) with an in-memory 
store to support Flink's session catalog. When `false` (the default), only 
`GravitinoCatalogStore` is used.
+
+When session catalog support is enabled, the following behaviors apply:
+
+- **CREATE CATALOG**: Gravitino-managed catalog are persisted to the Gravitino 
server; non-Gravitino catalog are stored in the in-memory store only.
+- **GET / USE CATALOG**: The in-memory store is checked first. If the catalog 
is not found there, it is retrieved from the Gravitino server.
+- **DROP CATALOG**: The in-memory store is checked first. If the catalog 
exists there it is removed from memory; otherwise it is removed from the 
Gravitino server.
+- **SHOW / LIST CATALOGS**: Returns the combined set of catalogs from both the 
in-memory store and the Gravitino server.
+- **Session scope**: Catalogs stored only in memory are session-scoped and 
will not survive when Flink restart.
+- **Name conflict**: If a catalog with the same name exists in both stores, 
the in-memory entry takes precedence.

Review Comment:
   The PR description states that SHOW/LIST CATALOGS should return the 
*intersection* of catalogs in memory and Gravitino, but this documentation (and 
the implementation/tests) describe returning the combined set (union). Please 
resolve this discrepancy by updating either the PR description or the 
docs/implementation so they match the intended behavior.



##########
flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/utils/FactoryUtils.java:
##########
@@ -36,6 +41,20 @@ private FactoryUtils() {}
 
   private static final Logger LOG = 
LoggerFactory.getLogger(FactoryUtils.class);
 
+  /** The list of Gravitino catalog factory identifiers. */
+  public static final ImmutableList<String> GRAVITINO_FACTORY_LIST =
+      ImmutableList.<String>builder()
+          .add(GravitinoHiveCatalogFactoryOptions.IDENTIFIER)
+          .add(GravitinoIcebergCatalogFactoryOptions.IDENTIFIER)
+          .add(GravitinoJdbcCatalogFactoryOptions.MYSQL_IDENTIFIER)
+          .add(GravitinoJdbcCatalogFactoryOptions.POSTGRESQL_IDENTIFIER)
+          .add(GravitinoPaimonCatalogFactoryOptions.IDENTIFIER)
+          .build();
+
+  public static boolean isBuiltInCatalog(String type) {
+    return GRAVITINO_FACTORY_LIST.contains(type);

Review Comment:
   isBuiltInCatalog() uses a case-sensitive contains() check. Elsewhere in the 
Flink connector code, catalog type matching is done case-insensitively (e.g., 
equalsIgnoreCase in GravitinoCatalogStore#getCatalogFactory), so a 
user-supplied type with different casing could be misclassified here. Consider 
normalizing to lower-case (or using a case-insensitive set) before checking 
membership.
   ```suggestion
       for (String identifier : GRAVITINO_FACTORY_LIST) {
         if (identifier.equalsIgnoreCase(type)) {
           return true;
         }
       }
       return false;
   ```



##########
docs/flink-connector/flink-connector.md:
##########
@@ -35,8 +35,20 @@ This capability allows users to perform federation queries, 
accessing data from
 | table.catalog-store.kind                         | string | 
generic_in_memory | The Catalog Store name, it should set to `gravitino`.       
         | Yes      | 0.6.0-incubating |
 | table.catalog-store.gravitino.gravitino.metalake | string | (none)           
 | The metalake name that flink connector used to request to Gravitino. | Yes   
   | 0.6.0-incubating |
 | table.catalog-store.gravitino.gravitino.uri      | string | (none)           
 | The uri of Gravitino server address.                                 | Yes   
   | 0.6.0-incubating |
+| table.catalog-store.gravitino.gravitino.support.session.catalog | boolean | 
false | Whether to enable support for Flink's session catalog in the Gravitino 
catalog store. | No | 1.3.0 |
 | table.catalog-store.gravitino.gravitino.client.  | string | (none)           
 | The configuration key prefix for the Gravitino client config.        | No    
   | 1.0.0            |
 
+When `table.catalog-store.gravitino.gravitino.support.session.catalog` is set 
to `true`, Gravitino uses `GravitinoSessionCatalogStore`, which combines a 
`GravitinoCatalogStore` (backed by the Gravitino server) with an in-memory 
store to support Flink's session catalog. When `false` (the default), only 
`GravitinoCatalogStore` is used.
+
+When session catalog support is enabled, the following behaviors apply:
+
+- **CREATE CATALOG**: Gravitino-managed catalog are persisted to the Gravitino 
server; non-Gravitino catalog are stored in the in-memory store only.

Review Comment:
   Grammar: "Gravitino-managed catalog are persisted" should use the plural 
"catalogs" (and keep subject/verb agreement).
   ```suggestion
   - **CREATE CATALOG**: Gravitino-managed catalogs are persisted to the 
Gravitino server; non-Gravitino catalogs are stored in the in-memory store only.
   ```



##########
flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/store/TestGravitinoSessionCatalogStore.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.flink.connector.store;
+
+import static 
org.apache.gravitino.flink.connector.utils.FactoryUtils.GRAVITINO_FACTORY_LIST;
+import static 
org.apache.gravitino.flink.connector.utils.FactoryUtils.isBuiltInCatalog;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableSet;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.catalog.CatalogDescriptor;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.flink.table.catalog.GenericInMemoryCatalogStore;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestGravitinoSessionCatalogStore {
+
+  /** A catalog type that isBuiltInCatalog() recognises as Gravitino-managed. 
*/
+  private static final String BUILT_IN_TYPE = GRAVITINO_FACTORY_LIST.get(0);
+
+  /** A catalog type that isBuiltInCatalog() does NOT recognise. */
+  private static final String OTHER_TYPE = "hive";
+
+  private GravitinoCatalogStore gravitinoCatalogStore;
+  private GenericInMemoryCatalogStore memoryCatalogStore;
+  private GravitinoSessionCatalogStore sessionCatalogStore;
+
+  @Before
+  public void setUp() {
+    assertTrue(
+        "BUILT_IN_TYPE must be recognised by isBuiltInCatalog()", 
isBuiltInCatalog(BUILT_IN_TYPE));
+    assertFalse(
+        "OTHER_TYPE must NOT be recognised by isBuiltInCatalog()", 
isBuiltInCatalog(OTHER_TYPE));
+    gravitinoCatalogStore = mock(GravitinoCatalogStore.class);
+    memoryCatalogStore = mock(GenericInMemoryCatalogStore.class);
+    sessionCatalogStore =
+        new GravitinoSessionCatalogStore(gravitinoCatalogStore, 
memoryCatalogStore);
+  }
+
+  // -------------------------------------------------------------------------
+  // storeCatalog
+  // -------------------------------------------------------------------------
+
+  @Test
+  public void testStoreCatalog_builtInType_storesInMemory() throws 
CatalogException {
+    String catalogName = "hive_catalog";
+    CatalogDescriptor descriptor = descriptorWithType(BUILT_IN_TYPE);
+
+    sessionCatalogStore.storeCatalog(catalogName, descriptor);
+
+    verify(memoryCatalogStore).storeCatalog(catalogName, descriptor);
+    verify(gravitinoCatalogStore, never()).storeCatalog(catalogName, 
descriptor);
+  }
+
+  @Test
+  public void testStoreCatalog_thirdPartyType_storesInGravitino() throws 
CatalogException {
+    String catalogName = "hive";
+    CatalogDescriptor descriptor = descriptorWithType(OTHER_TYPE);
+
+    sessionCatalogStore.storeCatalog(catalogName, descriptor);
+
+    verify(gravitinoCatalogStore).storeCatalog(catalogName, descriptor);
+    verify(memoryCatalogStore, never()).storeCatalog(catalogName, descriptor);

Review Comment:
   The test currently asserts that a built-in (Gravitino-managed) catalog type 
is stored in the in-memory store. This expectation contradicts the intended 
behavior described in the PR/docs (managed catalogs should be persisted to 
Gravitino). Update this test to assert that built-in types are stored via 
GravitinoCatalogStore instead.
   ```suggestion
     public void testStoreCatalog_builtInType_storesInGravitino() throws 
CatalogException {
       String catalogName = "hive_catalog";
       CatalogDescriptor descriptor = descriptorWithType(BUILT_IN_TYPE);
   
       sessionCatalogStore.storeCatalog(catalogName, descriptor);
   
       verify(gravitinoCatalogStore).storeCatalog(catalogName, descriptor);
       verify(memoryCatalogStore, never()).storeCatalog(catalogName, 
descriptor);
     }
   
     @Test
     public void testStoreCatalog_thirdPartyType_storesInMemory() throws 
CatalogException {
       String catalogName = "hive";
       CatalogDescriptor descriptor = descriptorWithType(OTHER_TYPE);
   
       sessionCatalogStore.storeCatalog(catalogName, descriptor);
   
       verify(memoryCatalogStore).storeCatalog(catalogName, descriptor);
       verify(gravitinoCatalogStore, never()).storeCatalog(catalogName, 
descriptor);
   ```



##########
flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStoreFactory.java:
##########
@@ -43,10 +45,17 @@
 /** The Factory for creating {@link GravitinoCatalogStore}. */
 public class GravitinoCatalogStoreFactory implements CatalogStoreFactory {
   private GravitinoCatalogManager catalogManager;
+  private boolean supportSessionCatalog;
+  GenericInMemoryCatalogStore memoryCatalogStore;
 
   @Override
   public CatalogStore createCatalogStore() {
-    return new GravitinoCatalogStore(catalogManager);
+    GravitinoCatalogStore gravitinoCatalogStore = new 
GravitinoCatalogStore(catalogManager);
+    if (supportSessionCatalog) {
+      memoryCatalogStore = new GenericInMemoryCatalogStore();
+      return new GravitinoSessionCatalogStore(gravitinoCatalogStore, 
memoryCatalogStore);
+    }
+    return gravitinoCatalogStore;

Review Comment:
   New behavior is introduced behind GRAVITINO_SUPPORT_SESSION_CATALOG 
(returning GravitinoSessionCatalogStore / using an in-memory store), but there 
is no unit test covering that the factory returns the session store when the 
option is enabled and the plain GravitinoCatalogStore when disabled. Please add 
a focused test (similar to TestGravitinoFlinkConfig) to prevent regressions in 
wiring/option parsing.



##########
flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoSessionCatalogStore.java:
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.flink.connector.store;
+
+import static 
org.apache.gravitino.flink.connector.utils.FactoryUtils.isBuiltInCatalog;
+
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.flink.table.catalog.AbstractCatalogStore;
+import org.apache.flink.table.catalog.CatalogDescriptor;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.flink.table.catalog.GenericInMemoryCatalogStore;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.util.Preconditions;
+
+/** GravitinoCatalogStore is used to store catalog information to Apache 
Gravitino server. */
+public class GravitinoSessionCatalogStore extends AbstractCatalogStore {
+  private final GenericInMemoryCatalogStore memoryCatalogStore;
+  private final GravitinoCatalogStore gravitinoCatalogStore;
+
+  public GravitinoSessionCatalogStore(
+      GravitinoCatalogStore gravitinoCatalogStore, GenericInMemoryCatalogStore 
memoryCatalogStore) {
+    this.gravitinoCatalogStore =
+        Preconditions.checkNotNull(gravitinoCatalogStore, "CatalogStore cannot 
be null");
+    this.memoryCatalogStore =
+        Preconditions.checkNotNull(memoryCatalogStore, "MemoryCatalogStore 
cannot be null");
+  }
+
+  @Override
+  public void storeCatalog(String catalogName, CatalogDescriptor descriptor)
+      throws CatalogException {
+    String catalogType = 
descriptor.getConfiguration().get(CommonCatalogOptions.CATALOG_TYPE);
+    if (isBuiltInCatalog(catalogType)) {
+      memoryCatalogStore.storeCatalog(catalogName, descriptor);
+    } else {
+      gravitinoCatalogStore.storeCatalog(catalogName, descriptor);
+    }
+  }
+  /**
+   * Removes the specified catalog.
+   *
+   * @param catalogName name of the catalog to remove
+   * @param ignoreIfNotExists if true, ignore when the catalog does not exist
+   * @throws CatalogException if the catalog cannot be removed
+   */
+  @Override
+  public void removeCatalog(String catalogName, boolean ignoreIfNotExists) 
throws CatalogException {
+    if (memoryCatalogStore.contains(catalogName)) {
+      memoryCatalogStore.removeCatalog(catalogName, ignoreIfNotExists);
+    } else {
+      gravitinoCatalogStore.removeCatalog(catalogName, ignoreIfNotExists);
+    }
+  }
+
+  /**
+   * Get a catalog by name.
+   *
+   * @param catalogName name of the catalog to retrieve
+   * @return the requested catalog or empty if the catalog does not exist
+   * @throws CatalogException throw a CatalogException when the Catalog cannot 
be created.
+   */
+  @Override
+  public Optional<CatalogDescriptor> getCatalog(String catalogName) throws 
CatalogException {
+    if (memoryCatalogStore.contains(catalogName)) {
+      Optional<CatalogDescriptor> descriptor = 
memoryCatalogStore.getCatalog(catalogName);
+      if (descriptor.isPresent()) {
+        return descriptor;
+      }
+    }
+    return gravitinoCatalogStore.getCatalog(catalogName);
+  }
+
+  @Override
+  public Set<String> listCatalogs() throws CatalogException {
+    Set<String> catalogs = new HashSet<>();
+    catalogs.addAll(memoryCatalogStore.listCatalogs());
+    try {
+      catalogs.addAll(gravitinoCatalogStore.listCatalogs());
+    } catch (Exception e) {
+      throw new CatalogException("Failed to list catalog.", e);

Review Comment:
   listCatalogs() catches a generic Exception. This can unintentionally swallow 
InterruptedException and clear the interrupt flag, and it also makes it harder 
to reason about which failures are expected here. Prefer catching 
CatalogException (and/or RuntimeException) explicitly; if InterruptedException 
is possible, re-interrupt the thread before wrapping.
   ```suggestion
       } catch (CatalogException e) {
         throw new CatalogException("Failed to list catalog.", e);
       } catch (RuntimeException e) {
         throw new CatalogException("Failed to list catalog.", e);
   ```



##########
docs/flink-connector/flink-connector.md:
##########
@@ -35,8 +35,20 @@ This capability allows users to perform federation queries, 
accessing data from
 | table.catalog-store.kind                         | string | 
generic_in_memory | The Catalog Store name, it should set to `gravitino`.       
         | Yes      | 0.6.0-incubating |
 | table.catalog-store.gravitino.gravitino.metalake | string | (none)           
 | The metalake name that flink connector used to request to Gravitino. | Yes   
   | 0.6.0-incubating |
 | table.catalog-store.gravitino.gravitino.uri      | string | (none)           
 | The uri of Gravitino server address.                                 | Yes   
   | 0.6.0-incubating |
+| table.catalog-store.gravitino.gravitino.support.session.catalog | boolean | 
false | Whether to enable support for Flink's session catalog in the Gravitino 
catalog store. | No | 1.3.0 |
 | table.catalog-store.gravitino.gravitino.client.  | string | (none)           
 | The configuration key prefix for the Gravitino client config.        | No    
   | 1.0.0            |
 
+When `table.catalog-store.gravitino.gravitino.support.session.catalog` is set 
to `true`, Gravitino uses `GravitinoSessionCatalogStore`, which combines a 
`GravitinoCatalogStore` (backed by the Gravitino server) with an in-memory 
store to support Flink's session catalog. When `false` (the default), only 
`GravitinoCatalogStore` is used.
+

Review Comment:
   The PR description mentions a user-facing config 
`table.catalog-store.gravitino.gravitino.allow.third-party-connector.list`, but 
the docs/code introduce 
`table.catalog-store.gravitino.gravitino.support.session.catalog` instead. 
Please align the documentation (and/or implementation) with the PR description 
so users have a single, correct configuration path.



##########
flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoSessionCatalogStore.java:
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.flink.connector.store;
+
+import static 
org.apache.gravitino.flink.connector.utils.FactoryUtils.isBuiltInCatalog;
+
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.flink.table.catalog.AbstractCatalogStore;
+import org.apache.flink.table.catalog.CatalogDescriptor;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.flink.table.catalog.GenericInMemoryCatalogStore;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.util.Preconditions;
+
+/** GravitinoCatalogStore is used to store catalog information to Apache 
Gravitino server. */
+public class GravitinoSessionCatalogStore extends AbstractCatalogStore {
+  private final GenericInMemoryCatalogStore memoryCatalogStore;
+  private final GravitinoCatalogStore gravitinoCatalogStore;
+
+  public GravitinoSessionCatalogStore(
+      GravitinoCatalogStore gravitinoCatalogStore, GenericInMemoryCatalogStore 
memoryCatalogStore) {
+    this.gravitinoCatalogStore =
+        Preconditions.checkNotNull(gravitinoCatalogStore, "CatalogStore cannot 
be null");
+    this.memoryCatalogStore =
+        Preconditions.checkNotNull(memoryCatalogStore, "MemoryCatalogStore 
cannot be null");
+  }
+
+  @Override
+  public void storeCatalog(String catalogName, CatalogDescriptor descriptor)
+      throws CatalogException {
+    String catalogType = 
descriptor.getConfiguration().get(CommonCatalogOptions.CATALOG_TYPE);
+    if (isBuiltInCatalog(catalogType)) {
+      memoryCatalogStore.storeCatalog(catalogName, descriptor);
+    } else {
+      gravitinoCatalogStore.storeCatalog(catalogName, descriptor);

Review Comment:
   storeCatalog routes Gravitino-managed (built-in) catalog types to the 
in-memory store and routes non-built-in types to Gravitino. This is the 
opposite of the documented/expected behavior (Gravitino-managed catalogs should 
be persisted to the Gravitino server; third-party catalogs should stay 
session-scoped in memory). Please swap the target stores for the built-in vs 
third-party branch (and keep the higher-priority lookup semantics in 
get/remove/contains).
   ```suggestion
         gravitinoCatalogStore.storeCatalog(catalogName, descriptor);
       } else {
         memoryCatalogStore.storeCatalog(catalogName, descriptor);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to