This is an automated email from the ASF dual-hosted git repository.
diqiu50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new f0571fe875 [#9492] improvement(trino-connector): Add support skip
catalog (#9493)
f0571fe875 is described below
commit f0571fe875c8e560b5c66950f668f3ad7c8c6d50
Author: qbhan <[email protected]>
AuthorDate: Wed Dec 24 09:47:31 2025 +0800
[#9492] improvement(trino-connector): Add support skip catalog (#9493)
### What changes were proposed in this pull request?
Add config `gravitino.trino.skip-catalog-patterns` which defines a
comma-separated list of catalog name regex patterns that should be
excluded from loading.
### Why are the changes needed?
Fix: #9492
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
local tests
Co-authored-by: Yuhui <[email protected]>
---
docs/trino-connector/configuration.md | 1 +
.../gravitino/trino/connector/GravitinoConfig.java | 48 ++++++++
.../connector/catalog/CatalogConnectorManager.java | 36 ++++--
.../system/table/GravitinoSystemTableCatalog.java | 3 +
.../trino/connector/TestGravitinoConfig.java | 51 ++++++++
.../TestGravitinoConnectorWithSkipCatalog.java | 128 +++++++++++++++++++++
6 files changed, 258 insertions(+), 9 deletions(-)
diff --git a/docs/trino-connector/configuration.md
b/docs/trino-connector/configuration.md
index 19a5051033..6d407a84e9 100644
--- a/docs/trino-connector/configuration.md
+++ b/docs/trino-connector/configuration.md
@@ -15,6 +15,7 @@ license: "This software is licensed under the Apache License
version 2."
| gravitino.metadata.refresh-interval-seconds | integer | 10
| The `gravitino.metadata.refresh-interval-seconds` defines the interval in
seconds to refresh metadata from Gravitino server, the default value is 10
seconds.
| No | 0.9.0 |
| gravitino.trino.skip-version-validation | boolean | false
| The `gravitino.trino.skip-version-validation` defines whether skip Trino
version validation or not. Note that Gravitino only supports Trino which
version between 435 and 439, other versions of Trino have not undergone
thorough testing, so there may be compatiablity problem if true.
| No | 1.0.0 |
| gravitino.client. | string | (none)
| The configuration key prefix for the Gravitino client config.
| No
| 1.0.0 |
+| gravitino.trino.skip-catalog-patterns | string | (none)
| The `gravitino.trino.skip-catalog-patterns` defines a comma-separated list
of catalog name regex patterns that should be excluded from loading. A catalog
is skipped only when its whole name matches one of the patterns. For example,
`test_.*, .*_tmp` excludes all catalogs starting with `test_` or ending with
`_tmp`.
| No | 1.2.0 |
To configure the Gravitino client, use properties prefixed with
`gravitino.client.`. These properties will directly passed to the Gravitino
client.
diff --git
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConfig.java
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConfig.java
index 4361dfb586..70d69ead10 100644
---
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConfig.java
+++
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConfig.java
@@ -18,6 +18,9 @@
*/
package org.apache.gravitino.trino.connector;
+import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
+
+import com.google.common.base.Splitter;
import io.trino.spi.TrinoException;
import java.io.FileInputStream;
import java.io.IOException;
@@ -27,6 +30,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
@@ -66,6 +70,7 @@ public class GravitinoConfig {
private static final Map<String, ConfigEntry> CONFIG_DEFINITIONS = new
HashMap<>();
private final Map<String, String> config;
+ private final List<Pattern> skipCatalogPatternList;
// Gravitino config entity
private static final ConfigEntry GRAVITINO_URI =
@@ -132,6 +137,13 @@ public class GravitinoConfig {
private static final ConfigEntry GRAVITINO_CLIENT_CONFIG_PREFIX =
new ConfigEntry("gravitino.client.", "The config prefix for Grivitino
client", "", false);
+ private static final ConfigEntry GRAVITINO_TRINO_SKIP_CATALOG_PATTERNS =
+ new ConfigEntry(
+ "gravitino.trino.skip-catalog-patterns",
+ "The property to specify a comma-separated list of catalog name
regex patterns that should be excluded from loading.",
+ "",
+ false);
+
/**
* Constructs a new GravitinoConfig with the specified configuration.
*
@@ -153,6 +165,14 @@ public class GravitinoConfig {
GravitinoErrorCode.GRAVITINO_MISSING_CONFIG,
"Incomplete Dynamic catalog connector config");
}
+ try {
+ skipCatalogPatternList = initSkipCatalogPatterns();
+ } catch (Exception e) {
+ throw new TrinoException(
+ NOT_SUPPORTED,
+ "Config `gravitino.trino.skip-catalog-patterns` is invalid because
it contains an illegal regular expression",
+ e);
+ }
}
/**
@@ -329,6 +349,34 @@ public class GravitinoConfig {
GRAVITINO_TRINO_SKIP_VERSION_VALIDATION.defaultValue));
}
+  /**
+   * Builds the catalog skip list from the {@code gravitino.trino.skip-catalog-patterns} setting.
+   *
+   * <p>The raw value is split on commas; every piece is trimmed, empty pieces are dropped, and
+   * the remaining pieces are compiled as regular expressions in their original order.
+   *
+   * @return the compiled patterns; empty when the setting is absent or has no non-blank entries
+   * @throws java.util.regex.PatternSyntaxException if a piece is not a valid regular expression
+   */
+  private List<Pattern> initSkipCatalogPatterns() {
+    String rawPatterns =
+        config.getOrDefault(
+            GRAVITINO_TRINO_SKIP_CATALOG_PATTERNS.key,
+            GRAVITINO_TRINO_SKIP_CATALOG_PATTERNS.defaultValue);
+    Splitter commaSplitter = Splitter.on(',').trimResults().omitEmptyStrings();
+    return commaSplitter
+        .splitToStream(rawPatterns)
+        .map(regex -> Pattern.compile(regex))
+        .collect(Collectors.toList());
+  }
+
+ /**
+ * Returns the compiled catalog-name patterns that were parsed from the
+ * `gravitino.trino.skip-catalog-patterns` setting when this config was constructed.
+ * Catalogs whose names match one of these patterns are meant to be excluded from loading.
+ *
+ * @return the stored list of compiled skip patterns (the same list instance on every call)
+ */
+ public List<Pattern> getSkipCatalogPatterns() {
+ return skipCatalogPatternList;
+ }
+
static class ConfigEntry {
final String key;
diff --git
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorManager.java
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorManager.java
index ec0826d94e..e1b8a7e54d 100644
---
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorManager.java
+++
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorManager.java
@@ -33,6 +33,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.gravitino.Catalog;
@@ -193,23 +194,23 @@ public class CatalogConnectorManager {
}
private void loadCatalogs(GravitinoMetalake metalake) {
- String[] catalogNames;
+ List<String> catalogNames;
try {
- catalogNames = metalake.listCatalogs();
+ catalogNames =
+ Arrays.stream(metalake.listCatalogs())
+ .filter(id -> !skipCatalog(getTrinoCatalogName(metalake.name(),
id)))
+ .collect(Collectors.toList());
} catch (Exception e) {
LOG.error("Failed to list catalogs in metalake {}.", metalake.name(), e);
return;
}
- LOG.debug(
- "Load metalake {}'s catalogs. catalogs: {}.",
- metalake.name(),
- Arrays.toString(catalogNames));
+ LOG.debug("Load metalake {}'s catalogs. catalogs: {}.", metalake.name(),
catalogNames);
// Delete those catalogs that have been deleted in Gravitino server
Set<String> catalogNameStrings =
- Arrays.stream(catalogNames)
- .map(id -> config.singleMetalakeMode() ? id :
getTrinoCatalogName(metalake.name(), id))
+ catalogNames.stream()
+ .map(id -> getTrinoCatalogName(metalake.name(), id))
.collect(Collectors.toSet());
for (Map.Entry<String, CatalogConnectorContext> entry :
catalogConnectors.entrySet()) {
@@ -226,7 +227,7 @@ public class CatalogConnectorManager {
}
// Load new catalogs belows to the metalake.
- Arrays.stream(catalogNames)
+ catalogNames.stream()
.forEach(
(String catalogName) -> {
try {
@@ -415,4 +416,21 @@ public class CatalogConnectorManager {
public GravitinoMetalake getMetalake(String metalake) {
return metalakes.computeIfAbsent(metalake, this::retrieveMetalake);
}
+
+ /**
+ * Whether skip loading catalog or not
+ *
+ * @param catalogName catalog name
+ * @return whether skip loading catalog or not
+ */
+ public boolean skipCatalog(String catalogName) {
+ for (Pattern pattern : config.getSkipCatalogPatterns()) {
+ if (pattern.matcher(catalogName).matches()) {
+ LOG.debug(
+ "Skip catalog {} with config
`gravitino.trino.skip-catalog-patterns`.", catalogName);
+ return true;
+ }
+ }
+ return false;
+ }
}
diff --git
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/system/table/GravitinoSystemTableCatalog.java
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/system/table/GravitinoSystemTableCatalog.java
index 19aedef5fe..aac74a4762 100644
---
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/system/table/GravitinoSystemTableCatalog.java
+++
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/system/table/GravitinoSystemTableCatalog.java
@@ -76,6 +76,9 @@ public class GravitinoSystemTableCatalog extends
GravitinoSystemTable {
GravitinoMetalake metalake =
catalogConnectorManager.getMetalake(metalakeName);
Catalog[] catalogs = metalake.listCatalogsInfo();
for (Catalog catalog : catalogs) {
+ if (catalogConnectorManager.skipCatalog(catalog.name())) {
+ continue;
+ }
if (catalog.type() == Catalog.Type.RELATIONAL) {
gravitinoCatalogs.add(new GravitinoCatalog(metalakeName,
catalog));
}
diff --git
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConfig.java
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConfig.java
index 473fe1f24d..e4a8c9e5fe 100644
---
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConfig.java
+++
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConfig.java
@@ -20,11 +20,14 @@ package org.apache.gravitino.trino.connector;
import static
org.apache.gravitino.trino.connector.GravitinoErrorCode.GRAVITINO_MISSING_CONFIG;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
import static org.junit.jupiter.api.Assertions.assertTrue;
import com.google.common.collect.ImmutableMap;
import io.trino.spi.TrinoException;
import java.util.Map;
+import java.util.regex.Pattern;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.jupiter.api.Test;
@@ -112,4 +115,52 @@ public class TestGravitinoConfig {
assertEquals(clientConfig.get("gravitino.client.socketTimeoutMs"),
"10000");
assertEquals(clientConfig.get("gravitino.client.connectionTimeoutMs"),
"20000");
}
+
+ @Test
+ public void testGravitinoConfigWithSkipCatalogPatterns() {
+ String gravitinoUrl = "http://127.0.0.1:8000";
+ String metalake = "user_001";
+ ImmutableMap<String, String> configMap =
+ ImmutableMap.of("gravitino.uri", gravitinoUrl, "gravitino.metalake",
metalake);
+ GravitinoConfig config = new GravitinoConfig(configMap);
+
+ assertFalse(skipCatalog("test_catalog", config));
+
+ ImmutableMap<String, String> configMapWithSkipCatalogList =
+ ImmutableMap.of(
+ "gravitino.uri",
+ gravitinoUrl,
+ "gravitino.metalake",
+ metalake,
+ "gravitino.trino.skip-catalog-patterns",
+ "test_.*, test1\\.c.*");
+ GravitinoConfig configWithSkipCatalogPatterns =
+ new GravitinoConfig(configMapWithSkipCatalogList);
+ assertTrue(skipCatalog("test_catalog", configWithSkipCatalogPatterns));
+ assertTrue(skipCatalog("test1.catalog", configWithSkipCatalogPatterns));
+ assertFalse(skipCatalog("test1_catalog", configWithSkipCatalogPatterns));
+ assertFalse(skipCatalog("test2_catalog", configWithSkipCatalogPatterns));
+
+ ImmutableMap<String, String> configMapWithInvalidSkipCatalogList =
+ ImmutableMap.of(
+ "gravitino.uri",
+ gravitinoUrl,
+ "gravitino.metalake",
+ metalake,
+ "gravitino.trino.skip-catalog-patterns",
+ "test_.*, (abc");
+ assertThrowsExactly(
+ TrinoException.class,
+ () -> new GravitinoConfig(configMapWithInvalidSkipCatalogList),
+ "Config `gravitino.trino.skip-catalog-patterns` is invalid because it
contains an illegal regular expression");
+ }
+
+  /** Returns true when any skip pattern held by {@code config} matches the whole catalog name. */
+  private static boolean skipCatalog(String catalogName, GravitinoConfig config) {
+    return config.getSkipCatalogPatterns().stream()
+        .anyMatch(skipPattern -> skipPattern.matcher(catalogName).matches());
+  }
}
diff --git
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorWithSkipCatalog.java
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorWithSkipCatalog.java
new file mode 100644
index 0000000000..c3ad8f63c4
--- /dev/null
+++
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorWithSkipCatalog.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.trino.connector;
+
+import static io.trino.testing.TestingSession.testSessionBuilder;
+import static org.apache.gravitino.Catalog.Type.RELATIONAL;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import io.trino.Session;
+import io.trino.plugin.memory.MemoryPlugin;
+import io.trino.testing.AbstractTestQueryFramework;
+import io.trino.testing.DistributedQueryRunner;
+import io.trino.testing.MaterializedResult;
+import io.trino.testing.QueryRunner;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.gravitino.client.GravitinoAdminClient;
+import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.trino.connector.catalog.CatalogConnectorManager;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Checks that catalogs matching {@code gravitino.trino.skip-catalog-patterns} are hidden from
+ * both {@code show catalogs} and the {@code gravitino.system.catalog} table.
+ *
+ * <p>The mock metalake {@code test1} holds the catalogs {@code a1}, {@code b1} and {@code b2};
+ * the connector is configured with the patterns {@code a.*, b1}.
+ */
+public class TestGravitinoConnectorWithSkipCatalog extends AbstractTestQueryFramework {
+
+  GravitinoMockServer server;
+
+  /**
+   * Starts a single-node query runner with the Gravitino plugin installed and the skip patterns
+   * configured, then waits until the connector manager reports at least one catalog.
+   *
+   * @return the ready query runner
+   * @throws Exception declared by the overridden method; failures in the setup below are
+   *     rethrown as {@link RuntimeException}
+   */
+  @Override
+  protected QueryRunner createQueryRunner() throws Exception {
+    GravitinoAdminClient gravitinoClient = initGravitinoMockServer();
+    Session session = testSessionBuilder().setCatalog("gravitino").build();
+
+    try {
+      DistributedQueryRunner queryRunner =
+          DistributedQueryRunner.builder(session).setNodeCount(1).build();
+
+      TestGravitinoPlugin gravitinoPlugin = new TestGravitinoPlugin(gravitinoClient);
+      queryRunner.installPlugin(gravitinoPlugin);
+
+      {
+        // create a gravitino connector with single metalake
+        HashMap<String, String> properties = new HashMap<>();
+        properties.put("gravitino.metalake", "test1");
+        properties.put("gravitino.uri", "http://127.0.0.1:8090");
+        properties.put("gravitino.trino.skip-catalog-patterns", "a.*, b1");
+        properties.put(
+            "catalog.config-dir", queryRunner.getCoordinator().getBaseDataDir().toString());
+        properties.put("discovery.uri", queryRunner.getCoordinator().getBaseUrl().toString());
+        queryRunner.createCatalog("gravitino", "gravitino", properties);
+      }
+
+      GravitinoConnectorPluginManager.instance(this.getClass().getClassLoader())
+          .installPlugin("memory", new MemoryPlugin());
+      CatalogConnectorManager catalogConnectorManager =
+          gravitinoPlugin.getCatalogConnectorManager();
+      server.setCatalogConnectorManager(catalogConnectorManager);
+
+      // Wait for the catalog to be created, polling every second for up to 30 seconds.
+      Awaitility.await()
+          .atMost(30, TimeUnit.SECONDS)
+          .pollInterval(1, TimeUnit.SECONDS)
+          .until(() -> !catalogConnectorManager.getCatalogs().isEmpty());
+      return queryRunner;
+
+    } catch (Exception e) {
+      throw new RuntimeException("Create query runner failed", e);
+    }
+  }
+
+  /** {@code show catalogs} must not list the catalogs excluded by the skip patterns. */
+  @Test
+  public void testShowCatalogsFilteredBySkipPatterns() throws Exception {
+    MaterializedResult actualResult = computeActual("show catalogs");
+    // JUnit 4 assertEquals takes (expected, actual).
+    assertEquals(3, actualResult.getMaterializedRows().size());
+    List<String> catalogs = firstColumn(actualResult);
+    assertFalse(catalogs.contains("a1"));
+    assertFalse(catalogs.contains("b1"));
+    assertTrue(catalogs.contains("b2"));
+  }
+
+  /** The system catalog table must only report the catalog that no skip pattern matches. */
+  @Test
+  public void testSystemTableQueryFilteredBySkipPatterns() throws Exception {
+    MaterializedResult actualResult = computeActual("select * from gravitino.system.catalog");
+    // JUnit 4 assertEquals takes (expected, actual).
+    assertEquals(1, actualResult.getMaterializedRows().size());
+    List<String> catalogs = firstColumn(actualResult);
+    assertFalse(catalogs.contains("a1"));
+    assertFalse(catalogs.contains("b1"));
+    assertTrue(catalogs.contains("b2"));
+  }
+
+  /** Collects the first field of every result row as a string. */
+  private static List<String> firstColumn(MaterializedResult result) {
+    return result.getMaterializedRows().stream()
+        .map(row -> (String) row.getField(0))
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Creates the mock server, registers it for closing after the class, and seeds metalake
+   * {@code test1} with the relational catalogs {@code a1}, {@code b1} and {@code b2}.
+   *
+   * @return the admin client backed by the mock server
+   */
+  private GravitinoAdminClient initGravitinoMockServer() {
+    GravitinoMockServer gravitinoMockServer = new GravitinoMockServer();
+    server = closeAfterClass(gravitinoMockServer);
+    GravitinoAdminClient gravitinoClient = server.createGravitinoClient();
+
+    gravitinoClient.createMetalake("test1", "", Collections.emptyMap());
+    GravitinoMetalake metalake = gravitinoClient.loadMetalake("test1");
+    metalake.createCatalog("a1", RELATIONAL, "", "", Collections.emptyMap());
+    metalake.createCatalog("b1", RELATIONAL, "", "", Collections.emptyMap());
+    metalake.createCatalog("b2", RELATIONAL, "", "", Collections.emptyMap());
+    return gravitinoClient;
+  }
+}