twalthr commented on code in PR #25211:
URL: https://github.com/apache/flink/pull/25211#discussion_r1799549652
##########
flink-python/pyflink/table/catalog.py:
##########
@@ -1028,6 +1122,69 @@ def get_function_language(self):
return self._j_catalog_function.getFunctionLanguage()
+class CatalogModel(object):
+ """
+ Interface for a model in a catalog.
+ """
+
+ def __init__(self, j_catalog_model):
+ self._j_catalog_model = j_catalog_model
+
+ @staticmethod
+ def create_model(
+ input_schema: Schema,
+ output_schema: Schema,
+ properties: Dict[str, str] = {},
Review Comment:
`options`
##########
flink-python/pyflink/table/tests/test_catalog.py:
##########
@@ -103,6 +116,22 @@ def get_streaming_table_properties():
def create_partition_keys():
return ["second", "third"]
+ @staticmethod
+ def create_model():
+ return CatalogModel.create_model(
+ CatalogTestBase.create_model_schema(),
+ CatalogTestBase.create_model_schema(),
+ properties={},
+ comment="some comment")
+
+ @staticmethod
+ def create_another_model():
+ return CatalogModel.create_model(
+ CatalogTestBase.create_model_schema(),
+ CatalogTestBase.create_model_schema(),
+ properties={"key": "value"},
Review Comment:
```suggestion
options={"key": "value"},
```
##########
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java:
##########
@@ -52,6 +60,197 @@ static void init() {
catalog.open();
}
+ // TODO (FLINK-35020) : remove after implementing dropModel in catalog
+ @AfterEach
+ void cleanup() throws Exception {
+ if (catalog.modelExists(modelPath1)) {
+ ((GenericInMemoryCatalog) catalog).dropModel(modelPath1, true);
+ }
+ if (catalog.modelExists(modelPath2)) {
+ ((GenericInMemoryCatalog) catalog).dropModel(modelPath2, true);
+ }
+ super.cleanup();
+ }
+
+ // These are put here instead of CatalogTest class since model operations
are not implemented
Review Comment:
Rather, introduce a `supportsModels(): boolean` method in CatalogTest and
move those tests to the test base. Set this flag to false and ignore them for
the Hive catalog.
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ContextResolvedModel.java:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * This class contains information about a model and its relationship with a
{@link Catalog}, if
+ * any.
+ *
+ * <p>There can be 2 kinds of {@link ContextResolvedModel}:
+ *
+ * <ul>
+ * <li>A permanent model: a model which is stored in a {@link Catalog} and
has an associated
+ * unique {@link ObjectIdentifier}.
+ * <li>A temporary model: a model which is stored in the {@link
CatalogManager}, has an associated
+ * unique {@link ObjectIdentifier} and is flagged as temporary.
+ * </ul>
+ *
+ * <p>The different handling of temporary and permanent model is {@link
Catalog} and {@link
+ * CatalogManager} instance specific, hence for these two kind of models, an
instance of this object
+ * represents the relationship between the specific {@link
ResolvedCatalogModel} instance and the
+ * specific {@link Catalog}/{@link CatalogManager} instances. For example, the
same {@link
+ * ResolvedCatalogModel} can be temporary for one catalog, but permanent for
another one.
+ */
+@Internal
+public final class ContextResolvedModel {
+
+ private final ObjectIdentifier objectIdentifier;
+ private final @Nullable Catalog catalog;
+ private final ResolvedCatalogModel resolvedModel;
+
+ public static ContextResolvedModel permanent(
+ ObjectIdentifier identifier, Catalog catalog, ResolvedCatalogModel
resolvedModel) {
+ return new ContextResolvedModel(
+ identifier, Preconditions.checkNotNull(catalog),
resolvedModel);
+ }
+
+ public static ContextResolvedModel temporary(
+ ObjectIdentifier identifier, ResolvedCatalogModel resolvedModel) {
+ return new ContextResolvedModel(identifier, null, resolvedModel);
+ }
+
+ private ContextResolvedModel(
+ ObjectIdentifier objectIdentifier,
+ @Nullable Catalog catalog,
+ ResolvedCatalogModel resolvedModel) {
+ this.objectIdentifier = Preconditions.checkNotNull(objectIdentifier);
+ this.catalog = catalog;
+ this.resolvedModel = Preconditions.checkNotNull(resolvedModel);
+ }
+
+ /** @return true if the table is temporary. An anonymous table is always
temporary. */
Review Comment:
```suggestion
/** @return true if the model is temporary. */
```
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java:
##########
@@ -1341,6 +1352,293 @@ private void dropTableInternal(
}
}
+ /**
+ * Retrieves a fully qualified model. If the path is not yet fully
qualified use {@link
+ * #qualifyIdentifier(UnresolvedIdentifier)} first.
+ *
+ * @param objectIdentifier full path of the model to retrieve
+ * @return model that the path points to.
+ */
+ public Optional<ContextResolvedModel> getModel(ObjectIdentifier
objectIdentifier) {
+ CatalogModel temporaryModel = temporaryModels.get(objectIdentifier);
+ if (temporaryModel != null) {
+ final ResolvedCatalogModel resolvedModel =
resolveCatalogModel(temporaryModel);
+ return
Optional.of(ContextResolvedModel.temporary(objectIdentifier, resolvedModel));
+ }
+ Optional<Catalog> catalogOptional =
getCatalog(objectIdentifier.getCatalogName());
+ ObjectPath objectPath = objectIdentifier.toObjectPath();
+ if (catalogOptional.isPresent()) {
+ Catalog currentCatalog = catalogOptional.get();
+ try {
+ final CatalogModel model = currentCatalog.getModel(objectPath);
+ if (model != null) {
+ final ResolvedCatalogModel resolvedModel =
resolveCatalogModel(model);
+ return Optional.of(
+ ContextResolvedModel.permanent(
+ objectIdentifier, currentCatalog,
resolvedModel));
+ }
+ } catch (ModelNotExistException e) {
+ // Ignore.
+ } catch (UnsupportedOperationException e) {
+ // Ignore for catalogs that don't support models.
+ }
+ }
+ return Optional.empty();
+ }
+
+ /**
+ * Like {@link #getModel(ObjectIdentifier)}, but throws an error when the
model is not available
+ * in any of the catalogs.
+ */
+ public ContextResolvedModel getModelOrError(ObjectIdentifier
objectIdentifier) {
+ return getModel(objectIdentifier)
+ .orElseThrow(
+ () ->
+ new TableException(
+ String.format(
+ "Cannot find model '%s' in any
of the catalogs %s.",
+ objectIdentifier,
listCatalogs())));
+ }
+
+ /**
+ * Return whether the model with a fully qualified table path is temporary
or not.
+ *
+ * @param objectIdentifier full path of the table
+ * @return the model is temporary or not.
+ */
+ public boolean isTemporaryModel(ObjectIdentifier objectIdentifier) {
+ return temporaryModels.containsKey(objectIdentifier);
+ }
+
+ /**
+ * Returns an array of names of all models registered in the namespace of
the current catalog
+ * and database.
+ *
+ * @return names of all registered models
+ */
+ public Set<String> listModels() {
+ return listModels(getCurrentCatalog(), getCurrentDatabase());
+ }
+
+ /**
+ * Returns an array of names of all models registered in the namespace of
the given catalog and
+ * database.
+ *
+ * @return names of all registered models
+ */
+ public Set<String> listModels(String catalogName, String databaseName) {
+ Catalog catalog = getCatalogOrThrowException(catalogName);
+ if (catalog == null) {
+ throw new ValidationException(String.format("Catalog %s does not
exist", catalogName));
+ }
+ try {
+ return new HashSet<>(catalog.listModels(databaseName));
+ } catch (DatabaseNotExistException e) {
+ throw new ValidationException(
+ String.format("Database %s does not exist", databaseName),
e);
+ }
+ }
+
+ /**
+ * Creates a model in a given fully qualified path.
+ *
+ * @param model The resolved model to put in the given path.
+ * @param objectIdentifier The fully qualified path where to put the model.
+ * @param ignoreIfExists If false exception will be thrown if a model
exists in the given path.
+ */
+ public void createModel(
+ CatalogModel model, ObjectIdentifier objectIdentifier, boolean
ignoreIfExists) {
+ execute(
+ (catalog, path) -> {
+ final ResolvedCatalogModel resolvedModel =
resolveCatalogModel(model);
+ catalog.createModel(path, resolvedModel, ignoreIfExists);
+ catalogModificationListeners.forEach(
+ listener ->
+ listener.onEvent(
+ CreateModelEvent.createEvent(
+
CatalogContext.createContext(
+
objectIdentifier.getCatalogName(),
+ catalog),
+ objectIdentifier,
+ resolvedModel,
+ ignoreIfExists,
+ false)));
+ },
+ objectIdentifier,
+ false,
+ "CreateModel");
+ }
+
+ /**
+ * Creates a temporary model in a given fully qualified path.
+ *
+ * @param model The resolved model to put in the given path.
+ * @param objectIdentifier The fully qualified path where to put the model.
+ * @param ignoreIfExists if false exception will be thrown if a model
exists in the given path.
+ */
+ public void createTemporaryModel(
+ CatalogModel model, ObjectIdentifier objectIdentifier, boolean
ignoreIfExists) {
+ Optional<TemporaryOperationListener> listener =
+ getTemporaryOperationListener(objectIdentifier);
+ temporaryModels.compute(
+ objectIdentifier,
+ (k, v) -> {
+ if (v != null) {
+ if (!ignoreIfExists) {
+ throw new ValidationException(
+ String.format(
+ "Temporary model '%s' already
exists",
+ objectIdentifier));
+ }
+ return v;
+ } else {
+ ResolvedCatalogModel resolvedModel =
resolveCatalogModel(model);
+ Catalog catalog =
+
getCatalog(objectIdentifier.getCatalogName()).orElse(null);
+ if (listener.isPresent()) {
+ return listener.get()
+ .onCreateTemporaryModel(
+ objectIdentifier.toObjectPath(),
resolvedModel);
+ }
+ catalogModificationListeners.forEach(
+ l ->
+ l.onEvent(
+ CreateModelEvent.createEvent(
+
CatalogContext.createContext(
+
objectIdentifier.getCatalogName(),
+ catalog),
+ objectIdentifier,
+ resolvedModel,
+ ignoreIfExists,
+ true)));
+ return resolvedModel;
+ }
+ });
+ }
+
+ /**
+ * Alters a model in a given fully qualified path.
+ *
+ * @param modelChange The model containing only changes
+ * @param objectIdentifier The fully qualified path where to alter the
model.
+ * @param ignoreIfNotExists If false exception will be thrown if the model
to be altered does
+ * not exist.
+ */
+ public void alterModel(
+ CatalogModel modelChange,
+ ObjectIdentifier objectIdentifier,
+ boolean ignoreIfNotExists) {
+ execute(
+ (catalog, path) -> {
+ ResolvedCatalogModel resolvedModel =
resolveCatalogModel(modelChange);
+ catalog.alterModel(path, resolvedModel, ignoreIfNotExists);
+ catalogModificationListeners.forEach(
+ listener ->
+ listener.onEvent(
+ AlterModelEvent.createEvent(
+
CatalogContext.createContext(
+
objectIdentifier.getCatalogName(),
+ catalog),
+ objectIdentifier,
+ resolvedModel,
+ ignoreIfNotExists)));
+ },
+ objectIdentifier,
+ ignoreIfNotExists,
+ "AlterModel");
+ }
+
+ /**
+ * Drops a model in a given fully qualified path.
+ *
+ * @param objectIdentifier The fully qualified path of the model to drop.
+ * @param ignoreIfNotExists If false exception will be thrown if the model
to drop does not
+ * exist.
+ */
+ public void dropModel(ObjectIdentifier objectIdentifier, boolean
ignoreIfNotExists) {
+ execute(
+ (catalog, path) -> {
+ Optional<ContextResolvedModel> resultOpt =
getModel(objectIdentifier);
+ if (resultOpt.isPresent()) {
+ ResolvedCatalogModel resolvedModel =
resultOpt.get().getResolvedModel();
+ catalog.dropModel(path, ignoreIfNotExists);
+ catalogModificationListeners.forEach(
+ listener ->
+ listener.onEvent(
+ DropModelEvent.createEvent(
+
CatalogContext.createContext(
+
objectIdentifier.getCatalogName(),
+ catalog),
+ objectIdentifier,
+ resolvedModel,
+ ignoreIfNotExists,
+ false)));
+ } else if (!ignoreIfNotExists) {
+ throw new ModelNotExistException(
+ objectIdentifier.getCatalogName(),
objectIdentifier.toObjectPath());
+ }
+ },
+ objectIdentifier,
+ ignoreIfNotExists,
+ "DropModel");
+ }
+
+ /**
+ * Drop a temporary model in a given fully qualified path.
+ *
+ * @param objectIdentifier The fully qualified path of the model to drop.
+ * @param ignoreIfNotExists If false exception will be thrown if the model
to be dropped does
+ * not exist.
+ */
+ public void dropTemporaryModel(ObjectIdentifier objectIdentifier, boolean
ignoreIfNotExists) {
+ CatalogModel model = temporaryModels.get(objectIdentifier);
+ if (model != null) {
+ getTemporaryOperationListener(objectIdentifier)
+ .ifPresent(l ->
l.onDropTemporaryModel(objectIdentifier.toObjectPath()));
+
+ Catalog catalog =
getCatalog(objectIdentifier.getCatalogName()).orElse(null);
+ ResolvedCatalogModel resolvedModel = resolveCatalogModel(model);
+ temporaryModels.remove(objectIdentifier);
+ catalogModificationListeners.forEach(
+ listener ->
+ listener.onEvent(
+ DropModelEvent.createEvent(
+ CatalogContext.createContext(
+
objectIdentifier.getCatalogName(), catalog),
+ objectIdentifier,
+ resolvedModel,
+ ignoreIfNotExists,
+ true)));
+ } else if (!ignoreIfNotExists) {
+ throw new ValidationException(
+ String.format(
+ "Temporary model with identifier '%s' does not
exist.",
+ objectIdentifier.asSummaryString()));
+ }
+ }
+
+ public ResolvedCatalogModel resolveCatalogModel(CatalogModel model) {
+ Preconditions.checkNotNull(schemaResolver, "Schema resolver is not
initialized.");
+ if (model instanceof ResolvedCatalogModel) {
+ return (ResolvedCatalogModel) model;
+ }
+ // If input schema / output schema does not exist, get the schema from
select as statement.
+ final ResolvedSchema resolvedInputSchema;
+ if (model.getInputSchema() == null) {
Review Comment:
Could you provide more information about this case? It looks like this is at
the wrong layer. For tables, a catalog object always has a correct output type.
Take a look at CREATE TABLE AS, which has similar behavior.
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java:
##########
@@ -1341,6 +1352,293 @@ private void dropTableInternal(
}
}
+ /**
+ * Retrieves a fully qualified model. If the path is not yet fully
qualified use {@link
+ * #qualifyIdentifier(UnresolvedIdentifier)} first.
+ *
+ * @param objectIdentifier full path of the model to retrieve
+ * @return model that the path points to.
+ */
+ public Optional<ContextResolvedModel> getModel(ObjectIdentifier
objectIdentifier) {
+ CatalogModel temporaryModel = temporaryModels.get(objectIdentifier);
+ if (temporaryModel != null) {
+ final ResolvedCatalogModel resolvedModel =
resolveCatalogModel(temporaryModel);
+ return
Optional.of(ContextResolvedModel.temporary(objectIdentifier, resolvedModel));
+ }
+ Optional<Catalog> catalogOptional =
getCatalog(objectIdentifier.getCatalogName());
+ ObjectPath objectPath = objectIdentifier.toObjectPath();
+ if (catalogOptional.isPresent()) {
+ Catalog currentCatalog = catalogOptional.get();
+ try {
+ final CatalogModel model = currentCatalog.getModel(objectPath);
+ if (model != null) {
+ final ResolvedCatalogModel resolvedModel =
resolveCatalogModel(model);
+ return Optional.of(
+ ContextResolvedModel.permanent(
+ objectIdentifier, currentCatalog,
resolvedModel));
+ }
+ } catch (ModelNotExistException e) {
+ // Ignore.
+ } catch (UnsupportedOperationException e) {
+ // Ignore for catalogs that don't support models.
+ }
+ }
+ return Optional.empty();
+ }
+
+ /**
+ * Like {@link #getModel(ObjectIdentifier)}, but throws an error when the
model is not available
+ * in any of the catalogs.
+ */
+ public ContextResolvedModel getModelOrError(ObjectIdentifier
objectIdentifier) {
+ return getModel(objectIdentifier)
+ .orElseThrow(
+ () ->
+ new TableException(
+ String.format(
+ "Cannot find model '%s' in any
of the catalogs %s.",
+ objectIdentifier,
listCatalogs())));
+ }
+
+ /**
+ * Return whether the model with a fully qualified table path is temporary
or not.
+ *
+ * @param objectIdentifier full path of the table
+ * @return the model is temporary or not.
+ */
+ public boolean isTemporaryModel(ObjectIdentifier objectIdentifier) {
+ return temporaryModels.containsKey(objectIdentifier);
+ }
+
+ /**
+ * Returns an array of names of all models registered in the namespace of
the current catalog
+ * and database.
+ *
+ * @return names of all registered models
+ */
+ public Set<String> listModels() {
+ return listModels(getCurrentCatalog(), getCurrentDatabase());
+ }
+
+ /**
+ * Returns an array of names of all models registered in the namespace of
the given catalog and
+ * database.
+ *
+ * @return names of all registered models
+ */
+ public Set<String> listModels(String catalogName, String databaseName) {
+ Catalog catalog = getCatalogOrThrowException(catalogName);
+ if (catalog == null) {
+ throw new ValidationException(String.format("Catalog %s does not
exist", catalogName));
+ }
+ try {
+ return new HashSet<>(catalog.listModels(databaseName));
+ } catch (DatabaseNotExistException e) {
+ throw new ValidationException(
+ String.format("Database %s does not exist", databaseName),
e);
+ }
+ }
+
+ /**
+ * Creates a model in a given fully qualified path.
+ *
+ * @param model The resolved model to put in the given path.
+ * @param objectIdentifier The fully qualified path where to put the model.
+ * @param ignoreIfExists If false exception will be thrown if a model
exists in the given path.
+ */
+ public void createModel(
+ CatalogModel model, ObjectIdentifier objectIdentifier, boolean
ignoreIfExists) {
+ execute(
+ (catalog, path) -> {
+ final ResolvedCatalogModel resolvedModel =
resolveCatalogModel(model);
+ catalog.createModel(path, resolvedModel, ignoreIfExists);
+ catalogModificationListeners.forEach(
+ listener ->
+ listener.onEvent(
+ CreateModelEvent.createEvent(
+
CatalogContext.createContext(
+
objectIdentifier.getCatalogName(),
+ catalog),
+ objectIdentifier,
+ resolvedModel,
+ ignoreIfExists,
+ false)));
+ },
+ objectIdentifier,
+ false,
+ "CreateModel");
+ }
+
+ /**
+ * Creates a temporary model in a given fully qualified path.
+ *
+ * @param model The resolved model to put in the given path.
+ * @param objectIdentifier The fully qualified path where to put the model.
+ * @param ignoreIfExists if false exception will be thrown if a model
exists in the given path.
+ */
+ public void createTemporaryModel(
+ CatalogModel model, ObjectIdentifier objectIdentifier, boolean
ignoreIfExists) {
+ Optional<TemporaryOperationListener> listener =
+ getTemporaryOperationListener(objectIdentifier);
+ temporaryModels.compute(
+ objectIdentifier,
+ (k, v) -> {
+ if (v != null) {
+ if (!ignoreIfExists) {
+ throw new ValidationException(
+ String.format(
+ "Temporary model '%s' already
exists",
+ objectIdentifier));
+ }
+ return v;
+ } else {
+ ResolvedCatalogModel resolvedModel =
resolveCatalogModel(model);
+ Catalog catalog =
+
getCatalog(objectIdentifier.getCatalogName()).orElse(null);
+ if (listener.isPresent()) {
+ return listener.get()
+ .onCreateTemporaryModel(
+ objectIdentifier.toObjectPath(),
resolvedModel);
+ }
+ catalogModificationListeners.forEach(
+ l ->
+ l.onEvent(
+ CreateModelEvent.createEvent(
+
CatalogContext.createContext(
+
objectIdentifier.getCatalogName(),
+ catalog),
+ objectIdentifier,
+ resolvedModel,
+ ignoreIfExists,
+ true)));
+ return resolvedModel;
+ }
+ });
+ }
+
+ /**
+ * Alters a model in a given fully qualified path.
+ *
+ * @param modelChange The model containing only changes
+ * @param objectIdentifier The fully qualified path where to alter the
model.
+ * @param ignoreIfNotExists If false exception will be thrown if the model
to be altered does
+ * not exist.
+ */
+ public void alterModel(
Review Comment:
We should simply copy what tables do.
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogModel.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.Schema;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.Objects;
+
+/** A catalog model implementation. */
+@Internal
+public class DefaultCatalogModel implements CatalogModel {
+ private final Schema inputSchema;
+ private final Schema outputSchema;
+ private final Map<String, String> modelOptions;
+ private final @Nullable String comment;
+
+ protected DefaultCatalogModel(
+ Schema inputSchema,
+ Schema outputSchema,
+ Map<String, String> modelOptions,
+ @Nullable String comment) {
+ this.inputSchema = inputSchema;
Review Comment:
Add null checks for the args to avoid nulls travelling through the stack.
##########
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java:
##########
@@ -256,6 +262,114 @@ public void testDropCurrentDatabase() throws Exception {
.hasMessage("Cannot drop a database which is currently in
use.");
}
+ @Test
+ public void testModelModificationListener() throws Exception {
+ CompletableFuture<CreateModelEvent> createFuture = new
CompletableFuture<>();
+ CompletableFuture<CreateModelEvent> createTemporaryFuture = new
CompletableFuture<>();
+ CompletableFuture<AlterModelEvent> alterFuture = new
CompletableFuture<>();
+ CompletableFuture<DropModelEvent> dropFuture = new
CompletableFuture<>();
+ CompletableFuture<DropModelEvent> dropTemporaryFuture = new
CompletableFuture<>();
+ CatalogManager catalogManager =
+ CatalogManager.newBuilder()
Review Comment:
use `org.apache.flink.table.utils.CatalogManagerMocks#preparedCatalogManager`
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogModel.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.Schema;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+
+/** Interface for a model in a catalog. */
+@PublicEvolving
+public interface CatalogModel {
+ /** Returns a map of string-based model options. */
+ Map<String, String> getOptions();
+
+ /**
+ * Get the unresolved input schema of the model.
+ *
+ * @return unresolved input schema of the model.
+ */
+ Schema getInputSchema();
Review Comment:
So can this be null or not?
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java:
##########
@@ -1341,6 +1352,293 @@ private void dropTableInternal(
}
}
+ /**
+ * Retrieves a fully qualified model. If the path is not yet fully
qualified use {@link
+ * #qualifyIdentifier(UnresolvedIdentifier)} first.
+ *
+ * @param objectIdentifier full path of the model to retrieve
+ * @return model that the path points to.
+ */
+ public Optional<ContextResolvedModel> getModel(ObjectIdentifier
objectIdentifier) {
+ CatalogModel temporaryModel = temporaryModels.get(objectIdentifier);
+ if (temporaryModel != null) {
+ final ResolvedCatalogModel resolvedModel =
resolveCatalogModel(temporaryModel);
+ return
Optional.of(ContextResolvedModel.temporary(objectIdentifier, resolvedModel));
+ }
+ Optional<Catalog> catalogOptional =
getCatalog(objectIdentifier.getCatalogName());
+ ObjectPath objectPath = objectIdentifier.toObjectPath();
+ if (catalogOptional.isPresent()) {
+ Catalog currentCatalog = catalogOptional.get();
+ try {
+ final CatalogModel model = currentCatalog.getModel(objectPath);
+ if (model != null) {
+ final ResolvedCatalogModel resolvedModel =
resolveCatalogModel(model);
+ return Optional.of(
+ ContextResolvedModel.permanent(
+ objectIdentifier, currentCatalog,
resolvedModel));
+ }
+ } catch (ModelNotExistException e) {
+ // Ignore.
+ } catch (UnsupportedOperationException e) {
+ // Ignore for catalogs that don't support models.
+ }
+ }
+ return Optional.empty();
+ }
+
+ /**
+ * Like {@link #getModel(ObjectIdentifier)}, but throws an error when the
model is not available
+ * in any of the catalogs.
+ */
+ public ContextResolvedModel getModelOrError(ObjectIdentifier
objectIdentifier) {
+ return getModel(objectIdentifier)
+ .orElseThrow(
+ () ->
+ new TableException(
+ String.format(
+ "Cannot find model '%s' in any
of the catalogs %s.",
+ objectIdentifier,
listCatalogs())));
+ }
+
+ /**
+ * Return whether the model with a fully qualified table path is temporary
or not.
+ *
+ * @param objectIdentifier full path of the table
+ * @return the model is temporary or not.
+ */
+ public boolean isTemporaryModel(ObjectIdentifier objectIdentifier) {
+ return temporaryModels.containsKey(objectIdentifier);
+ }
+
+ /**
+ * Returns an array of names of all models registered in the namespace of
the current catalog
+ * and database.
+ *
+ * @return names of all registered models
+ */
+ public Set<String> listModels() {
+ return listModels(getCurrentCatalog(), getCurrentDatabase());
+ }
+
+ /**
+ * Returns an array of names of all models registered in the namespace of
the given catalog and
+ * database.
+ *
+ * @return names of all registered models
+ */
+ public Set<String> listModels(String catalogName, String databaseName) {
+ Catalog catalog = getCatalogOrThrowException(catalogName);
+ if (catalog == null) {
+ throw new ValidationException(String.format("Catalog %s does not
exist", catalogName));
+ }
+ try {
+ return new HashSet<>(catalog.listModels(databaseName));
+ } catch (DatabaseNotExistException e) {
+ throw new ValidationException(
+ String.format("Database %s does not exist", databaseName),
e);
+ }
+ }
+
+ /**
+ * Creates a model in a given fully qualified path.
+ *
+ * @param model The resolved model to put in the given path.
+ * @param objectIdentifier The fully qualified path where to put the model.
+ * @param ignoreIfExists If false exception will be thrown if a model
exists in the given path.
+ */
+ public void createModel(
+ CatalogModel model, ObjectIdentifier objectIdentifier, boolean
ignoreIfExists) {
+ execute(
+ (catalog, path) -> {
+ final ResolvedCatalogModel resolvedModel =
resolveCatalogModel(model);
+ catalog.createModel(path, resolvedModel, ignoreIfExists);
+ catalogModificationListeners.forEach(
+ listener ->
+ listener.onEvent(
+ CreateModelEvent.createEvent(
+
CatalogContext.createContext(
+
objectIdentifier.getCatalogName(),
+ catalog),
+ objectIdentifier,
+ resolvedModel,
+ ignoreIfExists,
+ false)));
+ },
+ objectIdentifier,
+ false,
+ "CreateModel");
+ }
+
+ /**
+ * Creates a temporary model in a given fully qualified path.
+ *
+ * @param model The resolved model to put in the given path.
+ * @param objectIdentifier The fully qualified path where to put the model.
+ * @param ignoreIfExists if false exception will be thrown if a model
exists in the given path.
+ */
+ public void createTemporaryModel(
+ CatalogModel model, ObjectIdentifier objectIdentifier, boolean
ignoreIfExists) {
+ Optional<TemporaryOperationListener> listener =
+ getTemporaryOperationListener(objectIdentifier);
+ temporaryModels.compute(
+ objectIdentifier,
+ (k, v) -> {
+ if (v != null) {
+ if (!ignoreIfExists) {
+ throw new ValidationException(
+ String.format(
+ "Temporary model '%s' already
exists",
+ objectIdentifier));
+ }
+ return v;
+ } else {
+ ResolvedCatalogModel resolvedModel =
resolveCatalogModel(model);
+ Catalog catalog =
+
getCatalog(objectIdentifier.getCatalogName()).orElse(null);
+ if (listener.isPresent()) {
+ return listener.get()
+ .onCreateTemporaryModel(
+ objectIdentifier.toObjectPath(),
resolvedModel);
+ }
+ catalogModificationListeners.forEach(
+ l ->
+ l.onEvent(
+ CreateModelEvent.createEvent(
+
CatalogContext.createContext(
+
objectIdentifier.getCatalogName(),
+ catalog),
+ objectIdentifier,
+ resolvedModel,
+ ignoreIfExists,
+ true)));
+ return resolvedModel;
+ }
+ });
+ }
+
+ /**
+ * Alters a model in a given fully qualified path.
+ *
+ * @param modelChange The model containing only changes
+ * @param objectIdentifier The fully qualified path where to alter the
model.
+ * @param ignoreIfNotExists If false exception will be thrown if the model
to be altered does
+ * not exist.
+ */
+ public void alterModel(
Review Comment:
And as far as I can see altering temporary objects is not supported.
##########
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java:
##########
@@ -23,16 +23,21 @@
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.listener.AlterDatabaseEvent;
+import org.apache.flink.table.catalog.listener.AlterModelEvent;
import org.apache.flink.table.catalog.listener.AlterTableEvent;
import org.apache.flink.table.catalog.listener.CatalogModificationEvent;
import org.apache.flink.table.catalog.listener.CatalogModificationListener;
import org.apache.flink.table.catalog.listener.CreateDatabaseEvent;
+import org.apache.flink.table.catalog.listener.CreateModelEvent;
import org.apache.flink.table.catalog.listener.CreateTableEvent;
import org.apache.flink.table.catalog.listener.DropDatabaseEvent;
+import org.apache.flink.table.catalog.listener.DropModelEvent;
import org.apache.flink.table.catalog.listener.DropTableEvent;
import org.apache.flink.table.utils.CatalogManagerMocks;
import org.apache.flink.table.utils.ExpressionResolverMocks;
+import org.apache.flink.shaded.guava32.com.google.common.collect.ImmutableMap;
Review Comment:
The use of Guava should be avoided; use a regular HashMap instead.
##########
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java:
##########
@@ -460,10 +617,16 @@ void testCatalogStore() throws Exception {
Collections.emptyMap()),
ObjectIdentifier.of("exist_cat", "cat_db", "test_table"),
false);
+ catalogManager.createModel(
+ CatalogModel.of(Schema.derived(), Schema.derived(),
Collections.emptyMap(), null),
Review Comment:
I'm fine with using Schema.derived here, but then you should not perform null
checks in CatalogManager, because Schema.derived is not null.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]