This is an automated email from the ASF dual-hosted git repository.
fanng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 84c9bc9ff1 [#7340] fix(flink): Allows to call the method
`GravitinoCatalogManager.create` repeatedly in Flink SQL Gateway (#9191)
84c9bc9ff1 is described below
commit 84c9bc9ff141b86b135b3ed420d213d57a5d1e05
Author: juiceyang <[email protected]>
AuthorDate: Tue Nov 25 19:00:35 2025 +0800
[#7340] fix(flink): Allows to call the method
`GravitinoCatalogManager.create` repeatedly in Flink SQL Gateway (#9191)
### What changes were proposed in this pull request?
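Allow repeated calls to `GravitinoCatalogManager.create` so that multiple
Flink SQL Gateway sessions can be created. A repeated call with the same
Gravitino URI, metalake name, and client configuration returns the existing
singleton; a call with a different configuration is rejected with an
`IllegalStateException`.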
### Why are the changes needed?
`GravitinoCatalogManager.create` throws an `IllegalStateException` when it is
called more than once, which happens as soon as the Flink SQL Gateway opens a
second session:
```
Caused by: java.lang.IllegalStateException: Should not create duplicate
GravitinoCatalogManager
at
org.apache.gravitino.shaded.com.google.common.base.Preconditions.checkState(Preconditions.java:512)
at
org.apache.gravitino.flink.connector.catalog.GravitinoCatalogManager.create(GravitinoCatalogManager.java:59)
at
org.apache.gravitino.flink.connector.store.GravitinoCatalogStoreFactory.open(GravitinoCatalogStoreFactory.java:66)
at
org.apache.flink.table.gateway.service.context.SessionContext.buildCatalogManager(SessionContext.java:333)
at
org.apache.flink.table.gateway.service.context.SessionContext.initializeSessionState(SessionContext.java:294)
at
org.apache.flink.table.gateway.service.context.SessionContext.create(SessionContext.java:260)
at
org.apache.flink.table.gateway.service.session.SessionManagerImpl.openSession(SessionManagerImpl.java:154)
at
org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.openSession(SqlGatewayServiceImpl.java:69)
... 49 more
```
Fix: #7340
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Tested locally: opening sessions repeatedly no longer raises any error.
Added an integration test (`GravitinoCatalogManagerIT`) that opens two SQL
Gateway sessions in a row and expects HTTP 200 for both.
---
flink-connector/flink/build.gradle.kts | 1 +
.../connector/catalog/GravitinoCatalogManager.java | 59 ++++++++-
.../test/catalog/GravitinoCatalogManagerIT.java | 140 +++++++++++++++++++++
3 files changed, 196 insertions(+), 4 deletions(-)
diff --git a/flink-connector/flink/build.gradle.kts
b/flink-connector/flink/build.gradle.kts
index ae6f806f47..9e5659fb0c 100644
--- a/flink-connector/flink/build.gradle.kts
+++ b/flink-connector/flink/build.gradle.kts
@@ -103,6 +103,7 @@ dependencies {
testImplementation("org.apache.flink:flink-connector-hive_$scalaVersion:$flinkVersion")
testImplementation("org.apache.flink:flink-table-common:$flinkVersion")
testImplementation("org.apache.flink:flink-table-api-java:$flinkVersion")
+ testImplementation("org.apache.flink:flink-sql-gateway:$flinkVersion")
testImplementation("org.apache.paimon:paimon-flink-$flinkMajorVersion:$paimonVersion")
testImplementation(libs.hive2.exec) {
diff --git
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/GravitinoCatalogManager.java
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/GravitinoCatalogManager.java
index af5214c17f..0735be2519 100644
---
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/GravitinoCatalogManager.java
+++
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/GravitinoCatalogManager.java
@@ -19,6 +19,7 @@
package org.apache.gravitino.flink.connector.catalog;
import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
import com.google.common.collect.Sets;
import java.util.Arrays;
import java.util.Map;
@@ -31,6 +32,7 @@ import org.slf4j.LoggerFactory;
/** GravitinoCatalogManager is used to retrieve catalogs from Apache Gravitino
server. */
public class GravitinoCatalogManager {
+
private static final Logger LOG =
LoggerFactory.getLogger(GravitinoCatalogManager.class);
private static GravitinoCatalogManager gravitinoCatalogManager;
@@ -38,8 +40,20 @@ public class GravitinoCatalogManager {
private final GravitinoMetalake metalake;
private final GravitinoAdminClient gravitinoClient;
+ private final String gravitinoUri;
+ private final String metalakeName;
+ private final Map<String, String> gravitinoClientConfig;
+
private GravitinoCatalogManager(
String gravitinoUri, String metalakeName, Map<String, String>
gravitinoClientConfig) {
+ Preconditions.checkArgument(
+ !Strings.isNullOrEmpty(gravitinoUri), "Gravitino uri cannot be null or
empty");
+ Preconditions.checkArgument(
+ !Strings.isNullOrEmpty(metalakeName), "MetalakeName cannot be null or
empty");
+ Preconditions.checkNotNull(gravitinoClientConfig, "GravitinoClientConfig
cannot be null");
+ this.gravitinoUri = gravitinoUri;
+ this.metalakeName = metalakeName;
+ this.gravitinoClientConfig = gravitinoClientConfig;
this.gravitinoClient =
GravitinoAdminClient.builder(gravitinoUri).withClientConfig(gravitinoClientConfig).build();
this.metalake = gravitinoClient.loadMetalake(metalakeName);
@@ -56,10 +70,18 @@ public class GravitinoCatalogManager {
*/
public static GravitinoCatalogManager create(
    String gravitinoUri, String metalakeName, Map<String, String> gravitinoClientConfig) {
-   Preconditions.checkState(
-       gravitinoCatalogManager == null, "Should not create duplicate GravitinoCatalogManager");
-   gravitinoCatalogManager =
-       new GravitinoCatalogManager(gravitinoUri, metalakeName, gravitinoClientConfig);
+   // NOTE(review): this check-then-assign on a static field is not synchronized. If callers
+   // can open sessions concurrently, two managers could be built - confirm whether the
+   // caller serializes session creation.
+   if (gravitinoCatalogManager == null) {
+     // First call: build the singleton from the supplied settings.
+     gravitinoCatalogManager =
+         new GravitinoCatalogManager(gravitinoUri, metalakeName, gravitinoClientConfig);
+   } else {
+     // Repeated call: reuse the singleton, but only for an identical configuration.
+     // The template overload of checkState formats the message only when the check fails,
+     // so the common "same configuration" path does not build a string it throws away.
+     Preconditions.checkState(
+         checkEqual(gravitinoUri, metalakeName, gravitinoClientConfig),
+         "Creating GravitinoCatalogManager with different configuration is not supported. "
+             + "Current singleton %s. "
+             + "Creating with gravitinoUri=%s, metalakeName=%s, gravitinoClientConfig=%s",
+         gravitinoCatalogManager,
+         gravitinoUri,
+         metalakeName,
+         gravitinoClientConfig);
+   }
  return gravitinoCatalogManager;
}
@@ -155,4 +177,33 @@ public class GravitinoCatalogManager {
public boolean contains(String catalogName) {
// Delegates the existence check to the loaded metalake.
return metalake.catalogExists(catalogName);
}
+
+ @Override
+ public String toString() {
+ return "GravitinoCatalogManager{"
+ + "gravitinoUri='"
+ + gravitinoUri
+ + '\''
+ + ", metalakeName='"
+ + metalakeName
+ + '\''
+ + ", gravitinoClientConfig="
+ + gravitinoClientConfig
+ + '}';
+ }
+
+ /**
+  * Compares the supplied settings with the ones held by the static GravitinoCatalogManager
+  * instance.
+  *
+  * @param gravitinoUri Gravitino server uri to compare
+  * @param metalakeName Metalake name to compare
+  * @param gravitinoClientConfig Gravitino client properties map to compare
+  * @return true only when the uri, the metalake name and the client properties all equal the
+  *     values stored in the static instance
+  */
+ private static boolean checkEqual(
+     String gravitinoUri, String metalakeName, Map<String, String> gravitinoClientConfig) {
+   GravitinoCatalogManager existing = gravitinoCatalogManager;
+   if (!existing.gravitinoUri.equals(gravitinoUri)) {
+     return false;
+   }
+   if (!existing.metalakeName.equals(metalakeName)) {
+     return false;
+   }
+   return existing.gravitinoClientConfig.equals(gravitinoClientConfig);
+ }
}
diff --git
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/catalog/GravitinoCatalogManagerIT.java
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/catalog/GravitinoCatalogManagerIT.java
new file mode 100644
index 0000000000..0383f79c49
--- /dev/null
+++
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/catalog/GravitinoCatalogManagerIT.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.flink.connector.integration.test.catalog;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.util.Collections;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.gateway.SqlGateway;
+import org.apache.flink.table.gateway.service.context.DefaultContext;
+import org.apache.flink.table.gateway.service.session.SessionManager;
+import org.apache.gravitino.client.GravitinoMetalake;
+import
org.apache.gravitino.flink.connector.store.GravitinoCatalogStoreFactoryOptions;
+import org.apache.gravitino.integration.test.util.BaseIT;
+import org.apache.gravitino.rest.RESTUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GravitinoCatalogManagerIT extends BaseIT {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(GravitinoCatalogManagerIT.class);
+
+ protected static final String GRAVITINO_METALAKE = "flink";
+
+ protected static GravitinoMetalake metalake;
+
+ protected static TableEnvironment tableEnv;
+
+ protected static SqlGateway sqlGateway;
+
+ private static String gravitinoUri = "http://127.0.0.1:8090";
+
+ private static String sqlGatewayHost = "localhost";
+
+ private static int sqlGatewayPort;
+
+ private static String sqlGatewayRestUri;
+
+ @BeforeAll
+ void startUp() throws Exception {
+ // Start Gravitino server
+ super.startIntegrationTest();
+ initGravitinoEnv();
+ initMetalake();
+ initFlinkEnv();
+ LOG.info("Startup Flink env successfully, Gravitino uri: {}.",
gravitinoUri);
+ }
+
+ @AfterAll
+ void stop() throws Exception {
+ stopFlinkEnv();
+ super.stopIntegrationTest();
+ LOG.info("Stop Flink env successfully.");
+ }
+
+ private void initGravitinoEnv() {
+ // Gravitino server is already started by AbstractIT, just construct
gravitinoUri
+ int gravitinoPort = getGravitinoServerPort();
+ gravitinoUri = String.format("http://127.0.0.1:%d", gravitinoPort);
+ }
+
+ private void initMetalake() {
+ metalake = client.createMetalake(GRAVITINO_METALAKE, "",
Collections.emptyMap());
+ }
+
+ private static void initFlinkEnv() throws Exception {
+ sqlGatewayPort = RESTUtils.findAvailablePort(3000, 4000);
+ sqlGatewayRestUri = String.format("http://%s:%d", sqlGatewayHost,
sqlGatewayPort);
+
+ final Configuration configuration = new Configuration();
+ configuration.setString(
+ "table.catalog-store.kind",
GravitinoCatalogStoreFactoryOptions.GRAVITINO);
+
configuration.setString("table.catalog-store.gravitino.gravitino.metalake",
GRAVITINO_METALAKE);
+ configuration.setString("table.catalog-store.gravitino.gravitino.uri",
gravitinoUri);
+ configuration.setString("sql-gateway.endpoint.rest.address",
sqlGatewayHost);
+ configuration.setInteger("sql-gateway.endpoint.rest.port", sqlGatewayPort);
+ EnvironmentSettings.Builder builder =
+ EnvironmentSettings.newInstance().withConfiguration(configuration);
+ tableEnv = TableEnvironment.create(builder.inBatchMode().build());
+ DefaultContext defaultContext = new DefaultContext(configuration,
Collections.emptyList());
+ sqlGateway =
+ new SqlGateway(defaultContext.getFlinkConfig(),
SessionManager.create(defaultContext));
+ sqlGateway.start();
+ }
+
+ private static void stopFlinkEnv() {
+ if (tableEnv != null) {
+ try {
+ TableEnvironmentImpl env = (TableEnvironmentImpl) tableEnv;
+ env.getCatalogManager().close();
+ sqlGateway.stop();
+ } catch (Exception e) {
+ LOG.error("Close Flink environment failed", e);
+ }
+ }
+ }
+
+ private HttpResponse<String> sendOpenSessionRequest() throws IOException,
InterruptedException {
+ String urlString = String.format("%s/v1/sessions", sqlGatewayRestUri);
+ HttpClient client = HttpClient.newHttpClient();
+ HttpRequest request =
+ HttpRequest.newBuilder()
+ .uri(URI.create(urlString))
+ .POST(HttpRequest.BodyPublishers.noBody())
+ .build();
+ return client.send(request, HttpResponse.BodyHandlers.ofString());
+ }
+
+ @Test
+ public void testCreateGravitinoCatalogManager() throws IOException,
InterruptedException {
+ Assertions.assertEquals(200, this.sendOpenSessionRequest().statusCode());
+ Assertions.assertEquals(200, this.sendOpenSessionRequest().statusCode());
+ }
+}