Copilot commented on code in PR #10560:
URL: https://github.com/apache/gravitino/pull/10560#discussion_r2995010436
##########
flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStoreFactoryOptions.java:
##########
@@ -40,7 +40,11 @@ private GravitinoCatalogStoreFactoryOptions() {}
.stringType()
.noDefaultValue()
.withDescription("The name of Gravitino metalake");
-
+ public static final ConfigOption<String> GRAVITINO_ALLOW_THIRD_PARTY_CONNECTOR_LIST_CONFIG =
+ ConfigOptions.key("gravitino.allow.third-party-connector.list")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("List of allow third-party connector of Gravitino, separated by commas");
Review Comment:
The new config option description is grammatically incorrect ("List of allow
third-party connector of Gravitino") and a bit unclear. Consider rewording to
something like "Comma-separated list of allowed third-party catalog types
(handled by Flink's in-memory catalog store)" to better match how the option is
used.
```suggestion
.withDescription(
"Comma-separated list of allowed third-party catalog types "
+ "(handled by Flink's in-memory catalog store)");
```
##########
flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/store/TestGravitinoFlinkConfig.java:
##########
@@ -72,4 +73,48 @@ private Map<String, String> extractGrivitinoClientConfig(Configuration configura
ReadableConfig options = factoryHelper.getOptions();
return extractClientConfig(options);
}
+
+ @Test
+ void testGravitinoCatalogStoreFactory() {
+ final Configuration configuration = new Configuration();
+ configuration.setString(
+ "table.catalog-store.kind", GravitinoCatalogStoreFactoryOptions.GRAVITINO);
+ configuration.setString("table.catalog-store.gravitino.gravitino.metalake", "flink");
+ configuration.setString("table.catalog-store.gravitino.gravitino.uri", "http://127.0.0.1:8090");
+ configuration.setString(
+ "table.catalog-store.gravitino.gravitino.allow.third-party-connector.list",
+ GravitinoIcebergCatalogFactoryOptions.IDENTIFIER);
+
+ CatalogStoreFactory.Context context =
+ TableFactoryUtil.buildCatalogStoreFactoryContext(
+ configuration, this.getClass().getClassLoader());
+ GravitinoCatalogStoreFactory factory = new GravitinoCatalogStoreFactory();
+ Exception e =
+ Assertions.assertThrows(IllegalArgumentException.class, () -> factory.open(context));
+ Assertions.assertTrue(
+ e.getMessage()
+ .contains(
+ "The allowed third party connectors [gravitino-iceberg] should not contain Gravitino connectors"));
+ }
+
+ @Test
+ void testGravitinoCatalogStoreFactoryWithAllowedType() {
+ final Configuration configuration = new Configuration();
+ configuration.setString(
+ "table.catalog-store.kind", GravitinoCatalogStoreFactoryOptions.GRAVITINO);
+ configuration.setString("table.catalog-store.gravitino.gravitino.metalake", "flink");
+ configuration.setString("table.catalog-store.gravitino.gravitino.uri", "http://127.0.0.1:8090");
+ configuration.setString(
+ "table.catalog-store.gravitino.gravitino.allow.third-party-connector.list",
+ "jdbc");
+
+ CatalogStoreFactory.Context context =
+ TableFactoryUtil.buildCatalogStoreFactoryContext(
+ configuration, this.getClass().getClassLoader());
+ GravitinoCatalogStoreFactory factory = new GravitinoCatalogStoreFactory();
+
+ // It will throw exception because the Gravitino server is not running.
+ // But it should pass the validation.
+ Exception e = Assertions.assertThrows(Exception.class, () -> factory.open(context));
+ Assertions.assertFalse(e.getMessage().contains("The allowed third party connectors"));
Review Comment:
This test relies on the Gravitino server *not* running at `127.0.0.1:8090`
and asserts a generic `Exception`. That makes the unit test potentially
flaky/non-deterministic (it could pass/fail depending on the environment).
Prefer isolating validation from connectivity (e.g., mock
`GravitinoCatalogManager.create(...)`, or force a deterministic failure and
assert its type/message while still proving the third-party list validation
passed).
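One way to make this deterministic is to pull the allow-list check into a pure helper that can be tested without opening the factory. The following is a minimal sketch using only the JDK; `ThirdPartyConnectorValidator`, `parse`, and `validate` are hypothetical names, and the identifier values other than `gravitino-iceberg` (which appears in the expected message above) are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Hypothetical helper: validates the allow-list without any network access. */
final class ThirdPartyConnectorValidator {
  // Assumed identifier values, for illustration only.
  static final Set<String> GRAVITINO_IDENTIFIERS =
      Set.of("gravitino-hive", "gravitino-iceberg", "gravitino-paimon");

  private ThirdPartyConnectorValidator() {}

  /** Splits a comma-separated option value into trimmed, non-empty entries. */
  static List<String> parse(String raw) {
    List<String> result = new ArrayList<>();
    if (raw == null) {
      return result;
    }
    for (String part : raw.split(",")) {
      String trimmed = part.trim();
      if (!trimmed.isEmpty()) {
        result.add(trimmed);
      }
    }
    return result;
  }

  /** Throws if the allow-list names any Gravitino-managed connector. */
  static void validate(List<String> allowed) {
    for (String connector : allowed) {
      if (GRAVITINO_IDENTIFIERS.contains(connector)) {
        throw new IllegalArgumentException(
            "The allowed third party connectors "
                + allowed
                + " should not contain Gravitino connectors");
      }
    }
  }

  public static void main(String[] args) {
    validate(parse("jdbc, filesystem")); // passes without contacting any server
    System.out.println("validation passed");
  }
}
```

With a helper of this shape, the positive test can assert that `validate(...)` returns normally instead of inferring success from an unrelated connection failure.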
##########
flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/utils/FactoryUtils.java:
##########
@@ -36,6 +41,16 @@ private FactoryUtils() {}
private static final Logger LOG = LoggerFactory.getLogger(FactoryUtils.class);
+ /** The list of Gravitino catalog factory identifiers. */
+ public static final ImmutableList<String> gravitinoFactoryList =
+ ImmutableList.<String>builder()
+ .add(GravitinoHiveCatalogFactoryOptions.IDENTIFIER)
+ .add(GravitinoIcebergCatalogFactoryOptions.IDENTIFIER)
+ .add(GravitinoJdbcCatalogFactoryOptions.MYSQL_IDENTIFIER)
+ .add(GravitinoJdbcCatalogFactoryOptions.POSTGRESQL_IDENTIFIER)
+ .add(GravitinoPaimonCatalogFactoryOptions.IDENTIFIER)
+ .build();
Review Comment:
`gravitinoFactoryList` is declared as a `public static final` constant but
uses lowerCamelCase. Throughout this module, constants use UPPER_SNAKE_CASE
(e.g., `GRAVITINO_URI`, `IDENTIFIER`). Rename it to a constant-style name (and
consider making it `private` if only used internally) to match the established
convention.
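As a rough illustration of the suggested shape, the sketch below uses a constant-style name, keeps the list `private`, and exposes a small lookup method instead. `FactoryIdentifiers`, `GRAVITINO_FACTORY_IDENTIFIERS`, and `isGravitinoFactory` are hypothetical names, the string values are assumed placeholders rather than the real `IDENTIFIER` constants, and JDK `List.of` stands in for Guava's `ImmutableList`.

```java
import java.util.List;

/** Hypothetical holder for the Gravitino factory identifiers. */
final class FactoryIdentifiers {
  // UPPER_SNAKE_CASE constant; List.of returns an unmodifiable list.
  // The values are assumed placeholders, not the module's real identifiers.
  private static final List<String> GRAVITINO_FACTORY_IDENTIFIERS =
      List.of("gravitino-hive", "gravitino-iceberg", "gravitino-paimon");

  private FactoryIdentifiers() {}

  /** Returns true if the identifier belongs to a Gravitino catalog factory. */
  static boolean isGravitinoFactory(String identifier) {
    // List.of(...).contains(null) throws, so guard against null first.
    return identifier != null && GRAVITINO_FACTORY_IDENTIFIERS.contains(identifier);
  }

  public static void main(String[] args) {
    System.out.println(isGravitinoFactory("gravitino-iceberg"));
  }
}
```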
##########
flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStore.java:
##########
@@ -45,26 +47,38 @@
/** GravitinoCatalogStore is used to store catalog information to Apache Gravitino server. */
public class GravitinoCatalogStore extends AbstractCatalogStore {
private static final Logger LOG = LoggerFactory.getLogger(GravitinoCatalogStore.class);
+ private final GenericInMemoryCatalogStore memoryCatalogStore;
private final GravitinoCatalogManager gravitinoCatalogManager;
+ private List<String> allowThirdPartyConnectors;
- public GravitinoCatalogStore(GravitinoCatalogManager catalogManager) {
+ public GravitinoCatalogStore(
+ GravitinoCatalogManager catalogManager,
+ GenericInMemoryCatalogStore memoryCatalogStore,
+ List<String> allowThirdPartyConnectors) {
this.gravitinoCatalogManager = catalogManager;
+ this.memoryCatalogStore = memoryCatalogStore;
+ this.allowThirdPartyConnectors = allowThirdPartyConnectors;
}
Review Comment:
The constructor accepts `memoryCatalogStore` and `allowThirdPartyConnectors`
without null checks. If either is accidentally passed as null,
`storeCatalog`/`contains` will throw NPE. Add `Preconditions.checkNotNull(...)`
(and ideally defensively copy the connector list into an immutable collection)
to make this class safer to use.
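A minimal sketch of the guarded constructor, written against the JDK only: `Objects.requireNonNull` and `List.copyOf` (Java 10+) stand in for Guava's `Preconditions.checkNotNull` and `ImmutableList.copyOf`. `GuardedStore` is a hypothetical stand-in for `GravitinoCatalogStore`, and the store parameter is typed as `Object` only to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/** Hypothetical stand-in for GravitinoCatalogStore, reduced to its constructor checks. */
final class GuardedStore {
  private final Object memoryCatalogStore; // placeholder for GenericInMemoryCatalogStore
  private final List<String> allowThirdPartyConnectors;

  GuardedStore(Object memoryCatalogStore, List<String> allowThirdPartyConnectors) {
    // Fail fast with a named message instead of a later NPE in storeCatalog/contains.
    this.memoryCatalogStore =
        Objects.requireNonNull(memoryCatalogStore, "memoryCatalogStore must not be null");
    // Defensive, unmodifiable copy: later changes to the caller's list are not observed.
    this.allowThirdPartyConnectors =
        List.copyOf(
            Objects.requireNonNull(
                allowThirdPartyConnectors, "allowThirdPartyConnectors must not be null"));
  }

  List<String> allowedConnectors() {
    return allowThirdPartyConnectors;
  }

  public static void main(String[] args) {
    List<String> source = new ArrayList<>(List.of("filesystem", "jdbc"));
    GuardedStore store = new GuardedStore(new Object(), source);
    source.add("kafka"); // does not leak into the store's copy
    System.out.println(store.allowedConnectors());
  }
}
```

Making the field `final` also becomes possible once the list is copied at construction time.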
##########
flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStore.java:
##########
@@ -113,16 +139,20 @@ public Optional<CatalogDescriptor> getCatalog(String catalogName) throws Catalog
@Override
public Set<String> listCatalogs() throws CatalogException {
+ Set<String> catalogs = new HashSet<>();
+ catalogs.addAll(memoryCatalogStore.listCatalogs());
try {
- return gravitinoCatalogManager.listCatalogs();
+ catalogs.addAll(gravitinoCatalogManager.listCatalogs());
} catch (Exception e) {
throw new CatalogException("Failed to list catalog.", e);
}
+ return catalogs;
}
@Override
public boolean contains(String catalogName) throws CatalogException {
- return gravitinoCatalogManager.contains(catalogName);
+ return memoryCatalogStore.contains(catalogName)
+ || gravitinoCatalogManager.contains(catalogName);
}
Review Comment:
New behavior routes some catalogs to `memoryCatalogStore`, and the logic in
`removeCatalog(...)`, `getCatalog(...)`, and `listCatalogs()` now has
branching/merging semantics that aren’t covered by tests (e.g., removing a
memory-stored catalog, `getCatalog` returning the memory descriptor, union
behavior in `listCatalogs`). Add unit tests for these code paths to ensure
third-party catalogs behave consistently.
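To make the expected semantics concrete, here is a simplified model of the merged store using plain maps instead of `GenericInMemoryCatalogStore` and `GravitinoCatalogManager`. `RoutingCatalogStoreModel` and its methods are hypothetical, and routing by catalog type against the allow-list is an assumption about how `storeCatalog` decides; the real tests would assert the same properties against the actual class with a mocked manager.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/** Simplified model: catalog name -> catalog type, split across two backing stores. */
final class RoutingCatalogStoreModel {
  private final Map<String, String> memoryCatalogs = new HashMap<>();
  private final Map<String, String> remoteCatalogs = new HashMap<>();
  private final List<String> allowedTypes;

  RoutingCatalogStoreModel(List<String> allowedTypes) {
    this.allowedTypes = List.copyOf(allowedTypes);
  }

  /** Assumed routing rule: allowed third-party types stay in memory, the rest go remote. */
  void storeCatalog(String name, String type) {
    if (allowedTypes.contains(type)) {
      memoryCatalogs.put(name, type);
    } else {
      remoteCatalogs.put(name, type);
    }
  }

  /** Removes from memory first, falling back to the remote store. */
  void removeCatalog(String name) {
    if (memoryCatalogs.remove(name) == null) {
      remoteCatalogs.remove(name);
    }
  }

  /** The memory entry takes precedence over the remote one. */
  Optional<String> getCatalogType(String name) {
    String type = memoryCatalogs.get(name);
    return Optional.ofNullable(type != null ? type : remoteCatalogs.get(name));
  }

  /** Union of both stores, as in the patched listCatalogs(). */
  Set<String> listCatalogs() {
    Set<String> all = new HashSet<>(memoryCatalogs.keySet());
    all.addAll(remoteCatalogs.keySet());
    return all;
  }

  boolean contains(String name) {
    return memoryCatalogs.containsKey(name) || remoteCatalogs.containsKey(name);
  }

  public static void main(String[] args) {
    RoutingCatalogStoreModel store = new RoutingCatalogStoreModel(List.of("filesystem", "jdbc"));
    store.storeCatalog("fs_catalog", "filesystem");
    store.storeCatalog("hive_catalog", "gravitino-hive");
    System.out.println(store.listCatalogs());
  }
}
```

The three properties worth covering are the ones the model exposes: removal of a memory-stored catalog leaves remote catalogs untouched, lookup prefers the memory descriptor, and listing returns the union.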
##########
flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/store/TestGravitinoCatalogStore.java:
##########
@@ -24,19 +24,29 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.util.Arrays;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.catalog.CatalogDescriptor;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.flink.table.catalog.GenericInMemoryCatalogStore;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.gravitino.flink.connector.catalog.GravitinoCatalogManager;
import org.junit.Before;
import org.junit.Test;
public class TestGravitinoCatalogStore {
private GravitinoCatalogManager gravitinoCatalogMockManager;
+ private GenericInMemoryCatalogStore memoryCatalogStore;
private GravitinoCatalogStore gravitinoCatalogStore;
@Before
public void setupCatalogStore() {
gravitinoCatalogMockManager = mock(GravitinoCatalogManager.class);
- gravitinoCatalogStore = new GravitinoCatalogStore(gravitinoCatalogMockManager);
+ memoryCatalogStore = new GenericInMemoryCatalogStore();
+ memoryCatalogStore.open();
+ gravitinoCatalogStore =
+ new GravitinoCatalogStore(
+ gravitinoCatalogMockManager, memoryCatalogStore, Arrays.asList("filesystem", "jdbc"));
}
Review Comment:
The test opens `memoryCatalogStore` in `@Before` but never closes it. Add an
`@After` method to close `memoryCatalogStore` to avoid leaking resources across
tests (and to match the factory’s lifecycle).