FANNG1 commented on code in PR #10560:
URL: https://github.com/apache/gravitino/pull/10560#discussion_r3015666888


##########
flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoSessionCatalogStore.java:
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.flink.connector.store;
+
+import static 
org.apache.gravitino.flink.connector.utils.FactoryUtils.isGravitinoManagedCatalogType;
+
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.flink.table.catalog.AbstractCatalogStore;
+import org.apache.flink.table.catalog.CatalogDescriptor;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.flink.table.catalog.GenericInMemoryCatalogStore;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A catalog store that combines a session-scoped in-memory {@link 
GenericInMemoryCatalogStore} with
+ * a persistent {@link GravitinoCatalogStore}.
+ *
+ * <p>Routing is based on the catalog type declared in the {@link 
CatalogDescriptor}:
+ *
+ * <ul>
+ *   <li><b>Gravitino-managed catalog types</b> (e.g. {@code gravitino-hive}, 
{@code
+ *       gravitino-iceberg}) are persisted to the Gravitino-backed store and 
are visible across all
+ *       Flink sessions and Gravitino clients that share the same metalake.
+ *   <li><b>All other catalog types</b> (e.g. {@code generic_in_memory}, 
third-party connectors) are
+ *       stored in the session-scoped in-memory store only and are not 
persisted to Gravitino.
+ * </ul>
+ *
+ * <p>When retrieving, listing, or removing catalogs, entries in the in-memory 
store take precedence
+ * over entries in the Gravitino-backed store.
+ *
+ * <p>This store is intended to be used per Flink session, keeping transient 
catalogs in memory
+ * while delegating persistent catalogs to Apache Gravitino.
+ */
+public class GravitinoSessionCatalogStore extends AbstractCatalogStore {
+  private final GenericInMemoryCatalogStore memoryCatalogStore;
+  private final GravitinoCatalogStore gravitinoCatalogStore;
+
+  public GravitinoSessionCatalogStore(
+      GravitinoCatalogStore gravitinoCatalogStore, GenericInMemoryCatalogStore 
memoryCatalogStore) {
+    Preconditions.checkArgument(gravitinoCatalogStore != null, "CatalogStore 
cannot be null");
+    Preconditions.checkArgument(memoryCatalogStore != null, 
"MemoryCatalogStore cannot be null");
+    this.gravitinoCatalogStore = gravitinoCatalogStore;
+    this.memoryCatalogStore = memoryCatalogStore;
+  }
+
+  @Override
+  public void storeCatalog(String catalogName, CatalogDescriptor descriptor)
+      throws CatalogException {
+    String catalogType = 
descriptor.getConfiguration().get(CommonCatalogOptions.CATALOG_TYPE);
+    if (catalogType == null) {
+      throw new CatalogException(
+          String.format(
+              "Cannot store catalog '%s': '%s' is not set in the catalog 
descriptor. "
+                  + "Please specify a catalog type.",
+              catalogName, CommonCatalogOptions.CATALOG_TYPE.key()));
+    }
+    if (isGravitinoManagedCatalogType(catalogType)) {
+      gravitinoCatalogStore.storeCatalog(catalogName, descriptor);
+    } else {
+      memoryCatalogStore.storeCatalog(catalogName, descriptor);
+    }
+  }
+  /**
+   * Removes the specified catalog.
+   *

Review Comment:
   Missing blank line between `storeCatalog` and the Javadoc for 
`removeCatalog`. Please add a blank line to separate the two methods, otherwise 
`./gradlew spotlessApply` will flag this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to