This is an automated email from the ASF dual-hosted git repository.

fanng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 84c9bc9ff1 [#7340] fix(flink): Allows to call the method 
`GravitinoCatalogManager.create` repeatedly in Flink SQL Gateway (#9191)
84c9bc9ff1 is described below

commit 84c9bc9ff141b86b135b3ed420d213d57a5d1e05
Author: juiceyang <[email protected]>
AuthorDate: Tue Nov 25 19:00:35 2025 +0800

    [#7340] fix(flink): Allows to call the method 
`GravitinoCatalogManager.create` repeatedly in Flink SQL Gateway (#9191)
    
    ### What changes were proposed in this pull request?
    
    Allow repeated calls to GravitinoCatalogManager.create to support
    creating multiple Flink SQL Gateway sessions.
    
    ### Why are the changes needed?
    
    GravitinoCatalogManager.create throws error on repeated calls during
    Flink SQL Gateway session creation.
    
    ```
    Caused by: java.lang.IllegalStateException: Should not create duplicate 
GravitinoCatalogManager
        at 
org.apache.gravitino.shaded.com.google.common.base.Preconditions.checkState(Preconditions.java:512)
        at 
org.apache.gravitino.flink.connector.catalog.GravitinoCatalogManager.create(GravitinoCatalogManager.java:59)
        at 
org.apache.gravitino.flink.connector.store.GravitinoCatalogStoreFactory.open(GravitinoCatalogStoreFactory.java:66)
        at 
org.apache.flink.table.gateway.service.context.SessionContext.buildCatalogManager(SessionContext.java:333)
        at 
org.apache.flink.table.gateway.service.context.SessionContext.initializeSessionState(SessionContext.java:294)
        at 
org.apache.flink.table.gateway.service.context.SessionContext.create(SessionContext.java:260)
        at 
org.apache.flink.table.gateway.service.session.SessionManagerImpl.openSession(SessionManagerImpl.java:154)
        at 
org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.openSession(SqlGatewayServiceImpl.java:69)
        ... 49 more
    ```
    
    Fix: #7340
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Local test passed: No errors encountered when opening sessions
    repeatedly.
    Added an integration test case.
---
 flink-connector/flink/build.gradle.kts             |   1 +
 .../connector/catalog/GravitinoCatalogManager.java |  59 ++++++++-
 .../test/catalog/GravitinoCatalogManagerIT.java    | 140 +++++++++++++++++++++
 3 files changed, 196 insertions(+), 4 deletions(-)

diff --git a/flink-connector/flink/build.gradle.kts 
b/flink-connector/flink/build.gradle.kts
index ae6f806f47..9e5659fb0c 100644
--- a/flink-connector/flink/build.gradle.kts
+++ b/flink-connector/flink/build.gradle.kts
@@ -103,6 +103,7 @@ dependencies {
   
testImplementation("org.apache.flink:flink-connector-hive_$scalaVersion:$flinkVersion")
   testImplementation("org.apache.flink:flink-table-common:$flinkVersion")
   testImplementation("org.apache.flink:flink-table-api-java:$flinkVersion")
+  testImplementation("org.apache.flink:flink-sql-gateway:$flinkVersion")
   
testImplementation("org.apache.paimon:paimon-flink-$flinkMajorVersion:$paimonVersion")
 
   testImplementation(libs.hive2.exec) {
diff --git 
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/GravitinoCatalogManager.java
 
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/GravitinoCatalogManager.java
index af5214c17f..0735be2519 100644
--- 
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/GravitinoCatalogManager.java
+++ 
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/GravitinoCatalogManager.java
@@ -19,6 +19,7 @@
 package org.apache.gravitino.flink.connector.catalog;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.google.common.collect.Sets;
 import java.util.Arrays;
 import java.util.Map;
@@ -31,6 +32,7 @@ import org.slf4j.LoggerFactory;
 
 /** GravitinoCatalogManager is used to retrieve catalogs from Apache Gravitino 
server. */
 public class GravitinoCatalogManager {
+
   private static final Logger LOG = 
LoggerFactory.getLogger(GravitinoCatalogManager.class);
   private static GravitinoCatalogManager gravitinoCatalogManager;
 
@@ -38,8 +40,20 @@ public class GravitinoCatalogManager {
   private final GravitinoMetalake metalake;
   private final GravitinoAdminClient gravitinoClient;
 
+  private final String gravitinoUri;
+  private final String metalakeName;
+  private final Map<String, String> gravitinoClientConfig;
+
   private GravitinoCatalogManager(
       String gravitinoUri, String metalakeName, Map<String, String> 
gravitinoClientConfig) {
+    Preconditions.checkArgument(
+        !Strings.isNullOrEmpty(gravitinoUri), "Gravitino uri cannot be null or 
empty");
+    Preconditions.checkArgument(
+        !Strings.isNullOrEmpty(metalakeName), "MetalakeName cannot be null or 
empty");
+    Preconditions.checkNotNull(gravitinoClientConfig, "GravitinoClientConfig 
cannot be null");
+    this.gravitinoUri = gravitinoUri;
+    this.metalakeName = metalakeName;
+    this.gravitinoClientConfig = gravitinoClientConfig;
     this.gravitinoClient =
         
GravitinoAdminClient.builder(gravitinoUri).withClientConfig(gravitinoClientConfig).build();
     this.metalake = gravitinoClient.loadMetalake(metalakeName);
@@ -56,10 +70,18 @@ public class GravitinoCatalogManager {
    */
   public static GravitinoCatalogManager create(
       String gravitinoUri, String metalakeName, Map<String, String> 
gravitinoClientConfig) {
-    Preconditions.checkState(
-        gravitinoCatalogManager == null, "Should not create duplicate 
GravitinoCatalogManager");
-    gravitinoCatalogManager =
-        new GravitinoCatalogManager(gravitinoUri, metalakeName, 
gravitinoClientConfig);
+    if (gravitinoCatalogManager == null) {
+      gravitinoCatalogManager =
+          new GravitinoCatalogManager(gravitinoUri, metalakeName, 
gravitinoClientConfig);
+    } else {
+      Preconditions.checkState(
+          checkEqual(gravitinoUri, metalakeName, gravitinoClientConfig),
+          String.format(
+              "Creating GravitinoCatalogManager with different configuration 
is not supported. "
+                  + "Current singleton %s. "
+                  + "Creating with gravitinoUri=%s, metalakeName=%s, 
gravitinoClientConfig=%s",
+              gravitinoCatalogManager, gravitinoUri, metalakeName, 
gravitinoClientConfig));
+    }
     return gravitinoCatalogManager;
   }
 
@@ -155,4 +177,33 @@ public class GravitinoCatalogManager {
   public boolean contains(String catalogName) {
     return metalake.catalogExists(catalogName);
   }
+
+  @Override
+  public String toString() {
+    return "GravitinoCatalogManager{"
+        + "gravitinoUri='"
+        + gravitinoUri
+        + '\''
+        + ", metalakeName='"
+        + metalakeName
+        + '\''
+        + ", gravitinoClientConfig="
+        + gravitinoClientConfig
+        + '}';
+  }
+
+  /**
+   * Check whether the parameters are the same as the configuration of the 
GravitinoCatalogManager
+   * static variable.
+   *
+   * @param gravitinoUri Gravitino server uri
+   * @param metalakeName Metalake name
+   * @param gravitinoClientConfig Gravitino client properties map
+   */
+  private static boolean checkEqual(
+      String gravitinoUri, String metalakeName, Map<String, String> 
gravitinoClientConfig) {
+    return gravitinoCatalogManager.gravitinoUri.equals(gravitinoUri)
+        && gravitinoCatalogManager.metalakeName.equals(metalakeName)
+        && 
gravitinoCatalogManager.gravitinoClientConfig.equals(gravitinoClientConfig);
+  }
 }
diff --git 
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/catalog/GravitinoCatalogManagerIT.java
 
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/catalog/GravitinoCatalogManagerIT.java
new file mode 100644
index 0000000000..0383f79c49
--- /dev/null
+++ 
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/catalog/GravitinoCatalogManagerIT.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.flink.connector.integration.test.catalog;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.util.Collections;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.gateway.SqlGateway;
+import org.apache.flink.table.gateway.service.context.DefaultContext;
+import org.apache.flink.table.gateway.service.session.SessionManager;
+import org.apache.gravitino.client.GravitinoMetalake;
+import 
org.apache.gravitino.flink.connector.store.GravitinoCatalogStoreFactoryOptions;
+import org.apache.gravitino.integration.test.util.BaseIT;
+import org.apache.gravitino.rest.RESTUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GravitinoCatalogManagerIT extends BaseIT {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(GravitinoCatalogManagerIT.class);
+
+  protected static final String GRAVITINO_METALAKE = "flink";
+
+  protected static GravitinoMetalake metalake;
+
+  protected static TableEnvironment tableEnv;
+
+  protected static SqlGateway sqlGateway;
+
+  private static String gravitinoUri = "http://127.0.0.1:8090";;
+
+  private static String sqlGatewayHost = "localhost";
+
+  private static int sqlGatewayPort;
+
+  private static String sqlGatewayRestUri;
+
+  @BeforeAll
+  void startUp() throws Exception {
+    // Start Gravitino server
+    super.startIntegrationTest();
+    initGravitinoEnv();
+    initMetalake();
+    initFlinkEnv();
+    LOG.info("Startup Flink env successfully, Gravitino uri: {}.", 
gravitinoUri);
+  }
+
+  @AfterAll
+  void stop() throws Exception {
+    stopFlinkEnv();
+    super.stopIntegrationTest();
+    LOG.info("Stop Flink env successfully.");
+  }
+
+  private void initGravitinoEnv() {
+    // Gravitino server is already started by AbstractIT, just construct 
gravitinoUri
+    int gravitinoPort = getGravitinoServerPort();
+    gravitinoUri = String.format("http://127.0.0.1:%d";, gravitinoPort);
+  }
+
+  private void initMetalake() {
+    metalake = client.createMetalake(GRAVITINO_METALAKE, "", 
Collections.emptyMap());
+  }
+
+  private static void initFlinkEnv() throws Exception {
+    sqlGatewayPort = RESTUtils.findAvailablePort(3000, 4000);
+    sqlGatewayRestUri = String.format("http://%s:%d";, sqlGatewayHost, 
sqlGatewayPort);
+
+    final Configuration configuration = new Configuration();
+    configuration.setString(
+        "table.catalog-store.kind", 
GravitinoCatalogStoreFactoryOptions.GRAVITINO);
+    
configuration.setString("table.catalog-store.gravitino.gravitino.metalake", 
GRAVITINO_METALAKE);
+    configuration.setString("table.catalog-store.gravitino.gravitino.uri", 
gravitinoUri);
+    configuration.setString("sql-gateway.endpoint.rest.address", 
sqlGatewayHost);
+    configuration.setInteger("sql-gateway.endpoint.rest.port", sqlGatewayPort);
+    EnvironmentSettings.Builder builder =
+        EnvironmentSettings.newInstance().withConfiguration(configuration);
+    tableEnv = TableEnvironment.create(builder.inBatchMode().build());
+    DefaultContext defaultContext = new DefaultContext(configuration, 
Collections.emptyList());
+    sqlGateway =
+        new SqlGateway(defaultContext.getFlinkConfig(), 
SessionManager.create(defaultContext));
+    sqlGateway.start();
+  }
+
+  private static void stopFlinkEnv() {
+    if (tableEnv != null) {
+      try {
+        TableEnvironmentImpl env = (TableEnvironmentImpl) tableEnv;
+        env.getCatalogManager().close();
+        sqlGateway.stop();
+      } catch (Exception e) {
+        LOG.error("Close Flink environment failed", e);
+      }
+    }
+  }
+
+  private HttpResponse<String> sendOpenSessionRequest() throws IOException, 
InterruptedException {
+    String urlString = String.format("%s/v1/sessions", sqlGatewayRestUri);
+    HttpClient client = HttpClient.newHttpClient();
+    HttpRequest request =
+        HttpRequest.newBuilder()
+            .uri(URI.create(urlString))
+            .POST(HttpRequest.BodyPublishers.noBody())
+            .build();
+    return client.send(request, HttpResponse.BodyHandlers.ofString());
+  }
+
+  @Test
+  public void testCreateGravitinoCatalogManager() throws IOException, 
InterruptedException {
+    Assertions.assertEquals(200, this.sendOpenSessionRequest().statusCode());
+    Assertions.assertEquals(200, this.sendOpenSessionRequest().statusCode());
+  }
+}

Reply via email to