twalthr commented on code in PR #25036:
URL: https://github.com/apache/flink/pull/25036#discussion_r1690005374
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java:
##########
@@ -790,4 +792,126 @@ void alterPartitionColumnStatistics(
CatalogColumnStatistics columnStatistics,
boolean ignoreIfNotExists)
throws PartitionNotExistException, CatalogException;
+
+ // ------ models ------
+
+ /**
+ * Get names of all tables models under this database. An empty list is
returned if none exists.
+ *
+ * @return a list of the names of all models in this database
+ * @throws DatabaseNotExistException if the database does not exist
+ * @throws CatalogException in case of any runtime exception
+ */
+ default List<String> listModels(String databaseName)
+ throws DatabaseNotExistException, CatalogException {
+ throw new UnsupportedOperationException(
+ String.format("listModel(String) is not implemented for %s.",
this.getClass()));
Review Comment:
Let's return an empty list instead. Where possible, we should avoid throwing
errors to make the system more robust, at least on the read path. This is not
necessarily required on the write path.
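A minimal sketch of the idea, assuming a hypothetical `ModelCatalogSketch` interface that stands in for `Catalog`. The interface name, the demo class, and the simplified `dropModel(String)` signature are illustrative only and not part of the PR:

```java
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the Catalog interface; only two methods are sketched.
interface ModelCatalogSketch {
    // Read path: a catalog without model support simply reports "no models".
    default List<String> listModels(String databaseName) {
        return Collections.emptyList();
    }

    // Write path: failing loudly is still acceptable here.
    default void dropModel(String modelName) {
        throw new UnsupportedOperationException(
                String.format("dropModel(String) is not implemented for %s.", getClass()));
    }
}

class ListModelsDemo {
    public static void main(String[] args) {
        // An implementation that overrides nothing, like an existing catalog would.
        ModelCatalogSketch catalog = new ModelCatalogSketch() {};
        System.out.println(catalog.listModels("default_db")); // prints []
    }
}
```

With this default, existing catalogs that know nothing about models keep working for listing calls, while unsupported write operations still fail visibly.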
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java:
##########
@@ -790,4 +792,126 @@ void alterPartitionColumnStatistics(
CatalogColumnStatistics columnStatistics,
boolean ignoreIfNotExists)
throws PartitionNotExistException, CatalogException;
+
+ // ------ models ------
+
+ /**
+ * Get names of all tables models under this database. An empty list is
returned if none exists.
Review Comment:
```suggestion
* Get names of all models under this database. An empty list is returned if none exists.
```
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogModel.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.Schema;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Interface for a model in a catalog. */
+@PublicEvolving
+public interface CatalogModel {
+ /** Returns a map of string-based model options. */
+ Map<String, String> getOptions();
+
+ /** Returns a list of model changes. */
+ List<ModelChange> getModelChanges();
+
+ /**
+ * Get the unresolved input schema of the model.
+ *
+ * @return unresolved input schema of the model.
+ */
+ Schema getInputSchema();
+
+ /**
+ * Get the unresolved output schema of the model.
+ *
+ * @return unresolved output schema of the model.
+ */
+ Schema getOutputSchema();
+
+ /**
+ * Get comment of the model.
+ *
+ * @return comment of the model.
+ */
+ String getComment();
+
+ /**
+ * Get a deep copy of the CatalogModel instance.
+ *
+ * @return a copy of the CatalogModel instance
+ */
+ CatalogModel copy();
+
+ /**
+ * Copy the input model options into the CatalogModel instance.
+ *
+ * @return a copy of the CatalogModel instance with new model options.
+ */
+ CatalogModel copy(Map<String, String> options);
+
+ /**
+ * Creates a basic implementation of this interface.
+ *
+ * @param inputSchema unresolved input schema
+ * @param outputSchema unresolved output schema
+ * @param modelOptions model options
+ * @param comment optional comment
+ */
+ static CatalogModel of(
+ Schema inputSchema,
+ Schema outputSchema,
+ Map<String, String> modelOptions,
+ @Nullable String comment) {
+ return new DefaultCatalogModel(
+ inputSchema, outputSchema, modelOptions, new ArrayList<>(),
comment);
+ }
+
+ /**
+ * Creates a basic implementation of this interface.
+ *
+ * @param inputSchema unresolved input schema
+ * @param outputSchema unresolved output schema
+ * @param modelChanges model changes
+ * @param comment optional comment
+ */
+ static CatalogModel of(
Review Comment:
This is not mentioned in the FLIP. Please remove it for now.
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java:
##########
@@ -790,4 +792,126 @@ void alterPartitionColumnStatistics(
CatalogColumnStatistics columnStatistics,
boolean ignoreIfNotExists)
throws PartitionNotExistException, CatalogException;
+
+ // ------ models ------
+
+ /**
+ * Get names of all tables models under this database. An empty list is
returned if none exists.
+ *
+ * @return a list of the names of all models in this database
+ * @throws DatabaseNotExistException if the database does not exist
+ * @throws CatalogException in case of any runtime exception
+ */
+ default List<String> listModels(String databaseName)
+ throws DatabaseNotExistException, CatalogException {
+ throw new UnsupportedOperationException(
+ String.format("listModel(String) is not implemented for %s.",
this.getClass()));
+ }
+
+ /**
+ * Returns a {@link CatalogModel} identified by the given {@link
ObjectPath}.
+ *
+ * @param modelPath Path of the model
+ * @return The requested model
+ * @throws ModelNotExistException if the target does not exist
+ * @throws CatalogException in case of any runtime exception
+ */
+ default CatalogModel getModel(ObjectPath modelPath)
+ throws ModelNotExistException, CatalogException {
+ throw new UnsupportedOperationException(
+ String.format("getModel(ObjectPath) is not implemented for
%s.", this.getClass()));
+ }
+
+ /**
+ * Check if a model exists in this catalog.
+ *
+ * @param modelPath Path of the model
+ * @return true if the given model exists in the catalog false otherwise
+ * @throws CatalogException in case of any runtime exception
+ */
+ default boolean modelExists(ObjectPath modelPath) throws CatalogException {
+ throw new UnsupportedOperationException(
+ String.format(
+ "modelExists(ObjectPath) is not implemented for %s.",
this.getClass()));
+ }
+
+ /**
+ * Drop a model.
+ *
+ * @param modelPath Path of the model to be dropped
+ * @param ignoreIfNotExists Flag to specify behavior when the model does
not exist: if set to
+ * false, throw an exception, if set to true, do nothing.
+ * @throws ModelNotExistException if the model does not exist
+ * @throws CatalogException in case of any runtime exception
+ */
+ default void dropModel(ObjectPath modelPath, boolean ignoreIfNotExists)
+ throws ModelNotExistException, CatalogException {
+ throw new UnsupportedOperationException(
+ String.format(
+ "dropModel(ObjectPath, boolean) is not implemented for
%s.",
+ this.getClass()));
+ }
+
+ /**
+ * Rename an existing model.
+ *
+ * @param modelPath Path of the model to be renamed
+ * @param newModelName the new name of the model
+ * @param ignoreIfNotExists Flag to specify behavior when the model does
not exist: if set to
+ * false, throw an exception, if set to true, do nothing.
+ * @throws ModelNotExistException if the model does not exist
+ * @throws CatalogException in case of any runtime exception
+ */
+ default void renameModel(ObjectPath modelPath, String newModelName,
boolean ignoreIfNotExists)
+ throws ModelNotExistException, ModelAlreadyExistException,
CatalogException {
+ throw new UnsupportedOperationException(
+ String.format(
+ "renameModel(ObjectPath, String, boolean) is not
implemented for %s.",
+ this.getClass()));
+ }
+
+ /**
+ * Creates a new model.
+ *
+ * @param modelPath path of the model to be created
+ * @param model the CatalogModel definition
+ * @param ignoreIfExists flag to specify behavior when a model already
exists at the given path:
+ * if set to false, it throws a ModelAlreadyExistException, if set to
true, do nothing.
+ * @throws ModelAlreadyExistException if model already exists and
ignoreIfExists is false
+ * @throws DatabaseNotExistException if the database in tablePath doesn't
exist
+ * @throws CatalogException in case of any runtime exception
+ */
+ default void createModel(ObjectPath modelPath, CatalogModel model, boolean
ignoreIfExists)
+ throws ModelAlreadyExistException, DatabaseNotExistException,
CatalogException {
+ throw new UnsupportedOperationException(
+ String.format(
+ "createModel(ObjectPath, CatalogModel, boolean) is not
implemented for %s.",
+ this.getClass()));
+ }
+
+ /**
+ * Modifies an existing model. Note that the new and old {@link
CatalogModel} must be of the
+ * same kind. For example, this doesn't allow altering a remote model to
import model or native
Review Comment:
```suggestion
* same kind. For example, this doesn't allow altering a remote model to
import model or native
```
I think we should remove this example. Concepts like remote, import, or
native models belong to a different layer and are catalog-specific, aren't
they? Also, a later sentence already says "It's up to catalog implementation
to apply the changes".
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogModel.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Map;
+
+/** A common parent that describes the <i>resolved</i> metadata of a model in
a catalog. */
+@PublicEvolving
+public interface ResolvedCatalogModel extends CatalogModel {
+ /**
+ * Returns the original, unresolved metadata object from the {@link
Catalog}.
+ *
+ * <p>This method might be useful if catalog-specific object instances
should be directly
+ * forwarded from the catalog to a factory.
+ */
+ CatalogModel getOrigin();
+
+ /** Returns a fully resolved and validated {@link ResolvedSchema}
inputSchema. */
+ ResolvedSchema getResolvedInputSchema();
+
+ /** Returns a fully resolved and validated {@link ResolvedSchema}
outputSchema. */
+ ResolvedSchema getResolvedOutputSchema();
+
+ /**
+ * Serializes this instance into a map of string-based properties.
+ *
+ * <p>Compared to the pure table options in {@link #getOptions()}, the map
includes input
+ * schema, output schema, kind, task, comment and options.
+ */
+ Map<String, String> toProperties();
+
+ /**
+ * Creates an instance of {@link CatalogModel} from a map of string
properties that were
+ * previously created with {@link ResolvedCatalogModel#toProperties()}.
+ *
+ * @param properties serialized version of a {@link ResolvedCatalogModel}
that includes input
+ * schema, output schema, kind, task, comment and options.
Review Comment:
```suggestion
* schema, output schema, comment and options.
```
##########
flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogPropertiesUtilTest.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Schema.UnresolvedPhysicalColumn;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiConsumer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for CatalogPropertiesUtil. */
+public class CatalogPropertiesUtilTest {
+ @Test
+ public void testCatalogModelSerde() {
+ final Map<String, String> options = new HashMap<>();
+ options.put("endpoint", "someendpoint");
+ options.put("api_key", "fake_key");
+
+ final CatalogModel catalogModel =
+ CatalogModel.of(
+ Schema.newBuilder()
+ .column(
+ "f1",
+
DataTypes.INT().getLogicalType().asSerializableString())
+ .column(
+ "f2",
+
DataTypes.STRING().getLogicalType().asSerializableString())
+ .build(),
+ Schema.newBuilder()
+ .column(
+ "label",
+
DataTypes.STRING().getLogicalType().asSerializableString())
+ .build(),
+ options,
+ "some comment");
+
+ final Column f1 = Column.physical("f1", DataTypes.INT());
+ final Column f2 = Column.physical("f2", DataTypes.STRING());
+ final Column label = Column.physical("label", DataTypes.STRING());
+ final ResolvedSchema inputSchema = ResolvedSchema.of(f1, f2);
+ final ResolvedSchema outputSchema = ResolvedSchema.of(label);
+
+ final ResolvedCatalogModel testModel =
+ ResolvedCatalogModel.of(catalogModel, inputSchema,
outputSchema);
+
+ final Map<String, String> serializedMap =
+ CatalogPropertiesUtil.serializeResolvedCatalogModel(testModel);
+ final CatalogModel deserializedModel =
+ CatalogPropertiesUtil.deserializeCatalogModel(serializedMap);
+
+ // We need to compare the fields one by one since equalsTo in
UnresolvedDataType in
+ // UnresolvedColumn doesn't work
+ BiConsumer<Schema, Schema> schemaComparision =
+ (actual, expected) -> {
+
assertThat(actual.getColumns().size()).isEqualTo(expected.getColumns().size());
+ for (int i = 0; i < actual.getColumns().size(); i++) {
+ final UnresolvedPhysicalColumn actualColumn =
+ (UnresolvedPhysicalColumn)
actual.getColumns().get(i);
+ final UnresolvedPhysicalColumn expectedColumn =
+ (UnresolvedPhysicalColumn)
expected.getColumns().get(i);
+
assertThat(actualColumn.getName()).isEqualTo(expectedColumn.getName());
+ assertThat(actualColumn.getDataType().toString())
+
.isEqualTo(expectedColumn.getDataType().toString());
+ }
+ };
+ schemaComparision.accept(deserializedModel.getInputSchema(),
catalogModel.getInputSchema());
+ schemaComparision.accept(
+ deserializedModel.getOutputSchema(),
catalogModel.getOutputSchema());
+
+
assertThat(deserializedModel.getOptions()).isEqualTo(catalogModel.getOptions());
+
assertThat(deserializedModel.getComment()).isEqualTo(catalogModel.getComment());
+ }
+
+ @Test
+ public void testCatalogTableSerde() {
+ final Map<String, String> options = new HashMap<>();
+
+ final CatalogTable catalogTable =
+ CatalogTable.newBuilder()
+ .schema(
+ Schema.newBuilder()
+ .column(
+ "f1",
+ DataTypes.INT()
+ .getLogicalType()
+
.asSerializableString())
+ .column(
+ "f2",
+ DataTypes.STRING()
+ .getLogicalType()
+
.asSerializableString())
+ .primaryKey("f1")
+ .build())
+ .comment("some comment")
+ .options(options)
+ .build();
+
+ final Column f1 = Column.physical("f1", DataTypes.INT());
+ final Column f2 = Column.physical("f2", DataTypes.STRING());
+ List<Column> columns = Arrays.asList(f1, f2);
+ final UniqueConstraint primaryKey =
+ UniqueConstraint.primaryKey("PK_f1",
Collections.singletonList("f1"));
+ final ResolvedSchema schema =
+ new ResolvedSchema(columns, Collections.emptyList(),
primaryKey);
+
+ final ResolvedCatalogTable testTable = new
ResolvedCatalogTable(catalogTable, schema);
+
+ final Map<String, String> serializedMap =
+ CatalogPropertiesUtil.serializeCatalogTable(testTable);
+ final CatalogTable deserializedTable =
+ CatalogPropertiesUtil.deserializeCatalogTable(serializedMap);
+
+ // We need to compare the fields one by one since equalsTo in
UnresolvedDataType in
Review Comment:
nit: You can also simplify the code by comparing the `toString()` output of
the schemas. The `toString()` output is stable and is already used in a couple
of places. This should make this method very short.
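A self-contained sketch of the comparison, assuming a hypothetical `SchemaStandIn` class in place of Flink's `Schema` (it has no `equals()` but a stable `toString()`; both class names below are made up for illustration). In the actual test this would roughly amount to asserting that `deserializedTable.getUnresolvedSchema().toString()` equals the `toString()` of the original schema:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for an unresolved schema: no equals(), but a stable toString().
final class SchemaStandIn {
    private final List<String> columns;

    SchemaStandIn(String... columns) {
        this.columns = Arrays.asList(columns);
    }

    @Override
    public String toString() {
        return "(" + String.join(", ", columns) + ")";
    }
}

class SchemaToStringDemo {
    // Compares two schemas by their string form instead of walking the columns.
    static boolean sameSchema(SchemaStandIn actual, SchemaStandIn expected) {
        return actual.toString().equals(expected.toString());
    }

    public static void main(String[] args) {
        SchemaStandIn expected = new SchemaStandIn("`f1` INT", "`f2` STRING");
        SchemaStandIn actual = new SchemaStandIn("`f1` INT", "`f2` STRING");
        System.out.println(actual.equals(expected)); // false: identity comparison only
        System.out.println(sameSchema(actual, expected)); // true
    }
}
```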
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java:
##########
@@ -790,4 +792,126 @@ void alterPartitionColumnStatistics(
CatalogColumnStatistics columnStatistics,
boolean ignoreIfNotExists)
throws PartitionNotExistException, CatalogException;
+
+ // ------ models ------
+
+ /**
+ * Get names of all tables models under this database. An empty list is
returned if none exists.
+ *
+ * @return a list of the names of all models in this database
+ * @throws DatabaseNotExistException if the database does not exist
+ * @throws CatalogException in case of any runtime exception
+ */
+ default List<String> listModels(String databaseName)
+ throws DatabaseNotExistException, CatalogException {
+ throw new UnsupportedOperationException(
+ String.format("listModel(String) is not implemented for %s.",
this.getClass()));
+ }
+
+ /**
+ * Returns a {@link CatalogModel} identified by the given {@link
ObjectPath}.
+ *
+ * @param modelPath Path of the model
+ * @return The requested model
+ * @throws ModelNotExistException if the target does not exist
+ * @throws CatalogException in case of any runtime exception
+ */
+ default CatalogModel getModel(ObjectPath modelPath)
+ throws ModelNotExistException, CatalogException {
+ throw new UnsupportedOperationException(
+ String.format("getModel(ObjectPath) is not implemented for
%s.", this.getClass()));
+ }
+
+ /**
+ * Check if a model exists in this catalog.
+ *
+ * @param modelPath Path of the model
+ * @return true if the given model exists in the catalog false otherwise
+ * @throws CatalogException in case of any runtime exception
+ */
+ default boolean modelExists(ObjectPath modelPath) throws CatalogException {
+ throw new UnsupportedOperationException(
+ String.format(
+ "modelExists(ObjectPath) is not implemented for %s.",
this.getClass()));
+ }
+
+ /**
+ * Drop a model.
+ *
+ * @param modelPath Path of the model to be dropped
+ * @param ignoreIfNotExists Flag to specify behavior when the model does
not exist: if set to
+ * false, throw an exception, if set to true, do nothing.
+ * @throws ModelNotExistException if the model does not exist
+ * @throws CatalogException in case of any runtime exception
+ */
+ default void dropModel(ObjectPath modelPath, boolean ignoreIfNotExists)
+ throws ModelNotExistException, CatalogException {
+ throw new UnsupportedOperationException(
+ String.format(
+ "dropModel(ObjectPath, boolean) is not implemented for
%s.",
+ this.getClass()));
+ }
+
+ /**
+ * Rename an existing model.
+ *
+ * @param modelPath Path of the model to be renamed
+ * @param newModelName the new name of the model
+ * @param ignoreIfNotExists Flag to specify behavior when the model does
not exist: if set to
+ * false, throw an exception, if set to true, do nothing.
+ * @throws ModelNotExistException if the model does not exist
+ * @throws CatalogException in case of any runtime exception
+ */
+ default void renameModel(ObjectPath modelPath, String newModelName,
boolean ignoreIfNotExists)
+ throws ModelNotExistException, ModelAlreadyExistException,
CatalogException {
+ throw new UnsupportedOperationException(
+ String.format(
+ "renameModel(ObjectPath, String, boolean) is not
implemented for %s.",
+ this.getClass()));
+ }
+
+ /**
+ * Creates a new model.
+ *
+ * @param modelPath path of the model to be created
+ * @param model the CatalogModel definition
+ * @param ignoreIfExists flag to specify behavior when a model already
exists at the given path:
+ * if set to false, it throws a ModelAlreadyExistException, if set to
true, do nothing.
+ * @throws ModelAlreadyExistException if model already exists and
ignoreIfExists is false
+ * @throws DatabaseNotExistException if the database in tablePath doesn't
exist
+ * @throws CatalogException in case of any runtime exception
+ */
+ default void createModel(ObjectPath modelPath, CatalogModel model, boolean
ignoreIfExists)
+ throws ModelAlreadyExistException, DatabaseNotExistException,
CatalogException {
+ throw new UnsupportedOperationException(
+ String.format(
+ "createModel(ObjectPath, CatalogModel, boolean) is not
implemented for %s.",
+ this.getClass()));
+ }
+
+ /**
+ * Modifies an existing model. Note that the new and old {@link
CatalogModel} must be of the
+ * same kind. For example, this doesn't allow altering a remote model to
import model or native
+ * model, and vice versa. Note that the newModel contains only changes.
Current alter model
+ * syntax supports alter properties and rename. This call is for altering
model properties.
+ * Therefore, the newModel's properties are only changed properties. It's
up to catalog
+ * implementation to apply the changes. The reason for this behavior is
that this doesn't
+ * dictate how catalog implementation should handle model alteration. For
example, it can do
+ * Read-modify-write or merge in place etc.
+ *
+ * @param modelPath path of the model to be modified
+ * @param modelChange the CatalogModel containing only changes
Review Comment:
Why only changes? Usually, this parameter should contain the full model and be
called `newModel`, similar to `alterTable`. This is also written in the FLIP.
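A rough sketch of the "full model" contract, assuming a hypothetical in-memory catalog in which a model is reduced to its options map. The class name and the simplified signatures are illustrative and do not mirror the real `Catalog` API:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory catalog; a model is reduced to its options map here.
class InMemoryModelCatalogSketch {
    private final Map<String, Map<String, String>> models = new HashMap<>();

    void createModel(String modelPath, Map<String, String> model) {
        models.put(modelPath, new HashMap<>(model));
    }

    // newModel is the complete definition, so the catalog replaces the stored entry.
    void alterModel(String modelPath, Map<String, String> newModel) {
        if (!models.containsKey(modelPath)) {
            throw new IllegalArgumentException("Model does not exist: " + modelPath);
        }
        models.put(modelPath, new HashMap<>(newModel));
    }

    Map<String, String> getModel(String modelPath) {
        return models.get(modelPath);
    }
}

class AlterModelDemo {
    public static void main(String[] args) {
        InMemoryModelCatalogSketch catalog = new InMemoryModelCatalogSketch();

        Map<String, String> original = new HashMap<>();
        original.put("endpoint", "someendpoint");
        original.put("api_key", "fake_key");
        catalog.createModel("db.m", original);

        // The caller passes the full new definition, not just the changed keys.
        Map<String, String> newModel = new HashMap<>(original);
        newModel.put("endpoint", "otherendpoint");
        catalog.alterModel("db.m", newModel);

        System.out.println(catalog.getModel("db.m").equals(newModel)); // true
    }
}
```

Because the catalog receives the complete definition, it does not need to know which keys changed or how to merge them.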
##########
flink-python/pyflink/table/catalog.py:
##########
@@ -449,6 +449,101 @@ def drop_function(self, function_path: 'ObjectPath',
ignore_if_not_exists: bool)
"""
self._j_catalog.dropFunction(function_path._j_object_path,
ignore_if_not_exists)
+ def list_models(self, database_name: str) -> List[str]:
+ """
+ List the names of all models in the given database. An empty list is
returned if none is
+ registered.
+
+ :param database_name: Name of the database.
+ :return: A list of the names of the models in this database.
+ :raise: CatalogException in case of any runtime exception.
+ DatabaseNotExistException if the database does not exist.
+ """
+ return list(self._j_catalog.listModels(database_name))
+
+ def get_model(self, model_path: 'ObjectPath') -> 'CatalogModel':
+ """
+ Get the model.
+
+ :param model_path: Path :class:`ObjectPath` of the model.
+ :return: The requested function :class:`CatalogModel`.
+ :raise: CatalogException in case of any runtime exception.
+ ModelNotExistException if the model does not exist in the
catalog.
+ """
+ return
CatalogModel._get(self._j_catalog.getModel(model_path._j_object_path))
+
+ def model_exists(self, model_path: 'ObjectPath') -> bool:
+ """
+ Check whether a model exists or not.
+
+ :param model_path: Path :class:`ObjectPath` of the model.
+ :return: true if the model exists in the catalog false otherwise.
+ :raise: CatalogException in case of any runtime exception.
+ """
+ return self._j_catalog.modelExists(model_path._j_object_path)
+
+ def drop_model(self, model_path: 'ObjectPath', ignore_if_not_exists: bool):
+ """
+ Drop a model.
+
+ :param model_path: Path :class:`ObjectPath` of the function to be
dropped.
+ :param ignore_if_not_exists: Flag to specify behavior if the model
does not exist:
+ if set to false, throw an exception
+ if set to true, nothing happens.
+ :raise: CatalogException in case of any runtime exception.
+ ModelNotExistException if the model does not exist.
+ """
+ self._j_catalog.dropModel(model_path._j_object_path,
ignore_if_not_exists)
+
+ def rename_model(self, model_path: 'ObjectPath', new_model_name: str,
+ ignore_if_not_exists: bool):
+ """
+ Rename an existing model
Review Comment:
```suggestion
Rename an existing model.
```
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogModel.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.Schema;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Interface for a model in a catalog. */
+@PublicEvolving
+public interface CatalogModel {
+ /** Returns a map of string-based model options. */
+ Map<String, String> getOptions();
+
+ /** Returns a list of model changes. */
+ List<ModelChange> getModelChanges();
Review Comment:
This is not mentioned in the FLIP. Please remove it for now.
##########
flink-python/pyflink/table/catalog.py:
##########
@@ -449,6 +449,101 @@ def drop_function(self, function_path: 'ObjectPath',
ignore_if_not_exists: bool)
"""
self._j_catalog.dropFunction(function_path._j_object_path,
ignore_if_not_exists)
+ def list_models(self, database_name: str) -> List[str]:
+ """
+ List the names of all models in the given database. An empty list is
returned if none is
+ registered.
+
+ :param database_name: Name of the database.
+ :return: A list of the names of the models in this database.
+ :raise: CatalogException in case of any runtime exception.
+ DatabaseNotExistException if the database does not exist.
+ """
+ return list(self._j_catalog.listModels(database_name))
+
+ def get_model(self, model_path: 'ObjectPath') -> 'CatalogModel':
+ """
+ Get the model.
+
+ :param model_path: Path :class:`ObjectPath` of the model.
+ :return: The requested function :class:`CatalogModel`.
+ :raise: CatalogException in case of any runtime exception.
+ ModelNotExistException if the model does not exist in the
catalog.
+ """
+ return
CatalogModel._get(self._j_catalog.getModel(model_path._j_object_path))
+
+ def model_exists(self, model_path: 'ObjectPath') -> bool:
+ """
+ Check whether a model exists or not.
+
+ :param model_path: Path :class:`ObjectPath` of the model.
+ :return: true if the model exists in the catalog false otherwise.
+ :raise: CatalogException in case of any runtime exception.
+ """
+ return self._j_catalog.modelExists(model_path._j_object_path)
+
+ def drop_model(self, model_path: 'ObjectPath', ignore_if_not_exists: bool):
+ """
+ Drop a model.
+
+ :param model_path: Path :class:`ObjectPath` of the function to be
dropped.
+ :param ignore_if_not_exists: Flag to specify behavior if the model
does not exist:
+ if set to false, throw an exception
+ if set to true, nothing happens.
+ :raise: CatalogException in case of any runtime exception.
+ ModelNotExistException if the model does not exist.
+ """
+ self._j_catalog.dropModel(model_path._j_object_path,
ignore_if_not_exists)
+
+ def rename_model(self, model_path: 'ObjectPath', new_model_name: str,
+ ignore_if_not_exists: bool):
+ """
+ Rename an existing model
+
+ :param model_path: Path :class:`ObjectPath` of the model to be renamed.
+ :param new_model_name: The new name of the model.
+ :param ignore_if_not_exists: Flag to specify behavior when the model
does not exist:
+ if set to false, throw an exception,
+ if set to true, do nothing.
+ :raise: CatalogException in case of any runtime exception.
+ ModelNotExistException if the model does not exist.
+ """
+ self._j_catalog.renameModel(model_path._j_object_path, new_model_name,
ignore_if_not_exists)
+
+ def create_model(self, model_path: 'ObjectPath', model: 'CatalogModel',
+ ignore_if_exists: bool):
+ """
+ Create a new model.
+
+ :param model_path: Path :class:`ObjectPath` of the model to be created.
+ :param model: The table definition :class:`CatalogModel`.
Review Comment:
```suggestion
:param model: The model definition :class:`CatalogModel`.
```
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java:
##########
@@ -790,4 +792,126 @@ void alterPartitionColumnStatistics(
CatalogColumnStatistics columnStatistics,
boolean ignoreIfNotExists)
throws PartitionNotExistException, CatalogException;
+
+ // ------ models ------
+
+ /**
+ * Get names of all tables models under this database. An empty list is
returned if none exists.
+ *
+ * @return a list of the names of all models in this database
+ * @throws DatabaseNotExistException if the database does not exist
+ * @throws CatalogException in case of any runtime exception
+ */
+ default List<String> listModels(String databaseName)
+ throws DatabaseNotExistException, CatalogException {
+ throw new UnsupportedOperationException(
+ String.format("listModel(String) is not implemented for %s.",
this.getClass()));
+ }
+
+ /**
+ * Returns a {@link CatalogModel} identified by the given {@link
ObjectPath}.
+ *
+ * @param modelPath Path of the model
+ * @return The requested model
+ * @throws ModelNotExistException if the target does not exist
+ * @throws CatalogException in case of any runtime exception
+ */
+ default CatalogModel getModel(ObjectPath modelPath)
+ throws ModelNotExistException, CatalogException {
+ throw new UnsupportedOperationException(
+ String.format("getModel(ObjectPath) is not implemented for
%s.", this.getClass()));
+ }
+
+ /**
+ * Check if a model exists in this catalog.
+ *
+ * @param modelPath Path of the model
+ * @return true if the given model exists in the catalog false otherwise
+ * @throws CatalogException in case of any runtime exception
+ */
+ default boolean modelExists(ObjectPath modelPath) throws CatalogException {
+ throw new UnsupportedOperationException(
+ String.format(
+ "modelExists(ObjectPath) is not implemented for %s.",
this.getClass()));
+ }
+
+ /**
+ * Drop a model.
+ *
+ * @param modelPath Path of the model to be dropped
+ * @param ignoreIfNotExists Flag to specify behavior when the model does
not exist: if set to
+ * false, throw an exception, if set to true, do nothing.
+ * @throws ModelNotExistException if the model does not exist
+ * @throws CatalogException in case of any runtime exception
+ */
+ default void dropModel(ObjectPath modelPath, boolean ignoreIfNotExists)
+ throws ModelNotExistException, CatalogException {
+ throw new UnsupportedOperationException(
+ String.format(
+ "dropModel(ObjectPath, boolean) is not implemented for
%s.",
+ this.getClass()));
+ }
+
+ /**
+ * Rename an existing model.
+ *
+ * @param modelPath Path of the model to be renamed
+ * @param newModelName the new name of the model
+ * @param ignoreIfNotExists Flag to specify behavior when the model does
not exist: if set to
+ * false, throw an exception, if set to true, do nothing.
+ * @throws ModelNotExistException if the model does not exist
+ * @throws CatalogException in case of any runtime exception
+ */
+ default void renameModel(ObjectPath modelPath, String newModelName,
boolean ignoreIfNotExists)
+ throws ModelNotExistException, ModelAlreadyExistException,
CatalogException {
+ throw new UnsupportedOperationException(
+ String.format(
+ "renameModel(ObjectPath, String, boolean) is not
implemented for %s.",
+ this.getClass()));
+ }
+
+ /**
+ * Creates a new model.
Review Comment:
Similar to `createTable`, we should add this information:
```
* <p>The framework will make sure to call this method with fully
validated {@link
* ResolvedCatalogModel}. Those instances are easy to serialize
* for a durable catalog implementation.
```
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java:
##########
@@ -790,4 +792,126 @@ void alterPartitionColumnStatistics(
CatalogColumnStatistics columnStatistics,
boolean ignoreIfNotExists)
throws PartitionNotExistException, CatalogException;
+
+ // ------ models ------
+
+ /**
+ * Get names of all tables models under this database. An empty list is
returned if none exists.
+ *
+ * @return a list of the names of all models in this database
+ * @throws DatabaseNotExistException if the database does not exist
+ * @throws CatalogException in case of any runtime exception
+ */
+ default List<String> listModels(String databaseName)
+ throws DatabaseNotExistException, CatalogException {
+ throw new UnsupportedOperationException(
+ String.format("listModel(String) is not implemented for %s.",
this.getClass()));
+ }
+
+ /**
+ * Returns a {@link CatalogModel} identified by the given {@link
ObjectPath}.
+ *
+ * @param modelPath Path of the model
+ * @return The requested model
+ * @throws ModelNotExistException if the target does not exist
+ * @throws CatalogException in case of any runtime exception
+ */
+ default CatalogModel getModel(ObjectPath modelPath)
+ throws ModelNotExistException, CatalogException {
+ throw new UnsupportedOperationException(
+ String.format("getModel(ObjectPath) is not implemented for
%s.", this.getClass()));
+ }
+
+ /**
+ * Check if a model exists in this catalog.
+ *
+ * @param modelPath Path of the model
+ * @return true if the given model exists in the catalog false otherwise
+ * @throws CatalogException in case of any runtime exception
+ */
+ default boolean modelExists(ObjectPath modelPath) throws CatalogException {
+ throw new UnsupportedOperationException(
+ String.format(
+ "modelExists(ObjectPath) is not implemented for %s.",
this.getClass()));
+ }
+
+ /**
+ * Drop a model.
+ *
+ * @param modelPath Path of the model to be dropped
+ * @param ignoreIfNotExists Flag to specify behavior when the model does
not exist: if set to
+ * false, throw an exception, if set to true, do nothing.
+ * @throws ModelNotExistException if the model does not exist
+ * @throws CatalogException in case of any runtime exception
+ */
+ default void dropModel(ObjectPath modelPath, boolean ignoreIfNotExists)
+ throws ModelNotExistException, CatalogException {
+ throw new UnsupportedOperationException(
+ String.format(
+ "dropModel(ObjectPath, boolean) is not implemented for
%s.",
+ this.getClass()));
+ }
+
+ /**
+ * Rename an existing model.
+ *
+ * @param modelPath Path of the model to be renamed
+ * @param newModelName the new name of the model
+ * @param ignoreIfNotExists Flag to specify behavior when the model does
not exist: if set to
+ * false, throw an exception, if set to true, do nothing.
+ * @throws ModelNotExistException if the model does not exist
+ * @throws CatalogException in case of any runtime exception
+ */
+ default void renameModel(ObjectPath modelPath, String newModelName,
boolean ignoreIfNotExists)
+ throws ModelNotExistException, ModelAlreadyExistException,
CatalogException {
+ throw new UnsupportedOperationException(
+ String.format(
+ "renameModel(ObjectPath, String, boolean) is not
implemented for %s.",
+ this.getClass()));
+ }
+
+ /**
+ * Creates a new model.
+ *
+ * @param modelPath path of the model to be created
+ * @param model the CatalogModel definition
+ * @param ignoreIfExists flag to specify behavior when a model already
exists at the given path:
+ * if set to false, it throws a ModelAlreadyExistException, if set to
true, do nothing.
+ * @throws ModelAlreadyExistException if model already exists and
ignoreIfExists is false
+ * @throws DatabaseNotExistException if the database in tablePath doesn't
exist
+ * @throws CatalogException in case of any runtime exception
+ */
+ default void createModel(ObjectPath modelPath, CatalogModel model, boolean
ignoreIfExists)
+ throws ModelAlreadyExistException, DatabaseNotExistException,
CatalogException {
+ throw new UnsupportedOperationException(
+ String.format(
+ "createModel(ObjectPath, CatalogModel, boolean) is not
implemented for %s.",
+ this.getClass()));
+ }
+
+ /**
+ * Modifies an existing model. Note that the new and old {@link
CatalogModel} must be of the
+ * same kind. For example, this doesn't allow altering a remote model to
import model or native
+ * model, and vice versa. Note that the newModel contains only changes.
Current alter model
+ * syntax supports alter properties and rename. This call is for altering
model properties.
+ * Therefore, the newModel's properties are only changed properties. It's
up to catalog
+ * implementation to apply the changes. The reason for this behavior is
that this doesn't
+ * dictate how catalog implementation should handle model alteration. For
example, it can do
+ * Read-modify-write or merge in place etc.
Review Comment:
Add a section similar to `alterTable`:
```
* <p>The framework will make sure to call this method with fully
validated {@link
* ResolvedCatalogModel}. Those instances are easy to serialize
* for a durable catalog implementation.
```
##########
flink-python/pyflink/table/catalog.py:
##########
@@ -449,6 +449,101 @@ def drop_function(self, function_path: 'ObjectPath',
ignore_if_not_exists: bool)
"""
self._j_catalog.dropFunction(function_path._j_object_path,
ignore_if_not_exists)
+ def list_models(self, database_name: str) -> List[str]:
+ """
+ List the names of all models in the given database. An empty list is
returned if none is
+ registered.
+
+ :param database_name: Name of the database.
+ :return: A list of the names of the models in this database.
+ :raise: CatalogException in case of any runtime exception.
+ DatabaseNotExistException if the database does not exist.
+ """
+ return list(self._j_catalog.listModels(database_name))
+
+ def get_model(self, model_path: 'ObjectPath') -> 'CatalogModel':
+ """
+ Get the model.
+
+ :param model_path: Path :class:`ObjectPath` of the model.
+ :return: The requested function :class:`CatalogModel`.
Review Comment:
```suggestion
:return: The requested :class:`CatalogModel`.
```
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java:
##########
@@ -790,4 +792,126 @@ void alterPartitionColumnStatistics(
CatalogColumnStatistics columnStatistics,
boolean ignoreIfNotExists)
throws PartitionNotExistException, CatalogException;
+
+ // ------ models ------
+
+ /**
+ * Get names of all tables models under this database. An empty list is
returned if none exists.
+ *
+ * @return a list of the names of all models in this database
+ * @throws DatabaseNotExistException if the database does not exist
+ * @throws CatalogException in case of any runtime exception
+ */
+ default List<String> listModels(String databaseName)
+ throws DatabaseNotExistException, CatalogException {
+ throw new UnsupportedOperationException(
+ String.format("listModel(String) is not implemented for %s.",
this.getClass()));
+ }
+
+ /**
+ * Returns a {@link CatalogModel} identified by the given {@link
ObjectPath}.
+ *
+ * @param modelPath Path of the model
+ * @return The requested model
+ * @throws ModelNotExistException if the target does not exist
+ * @throws CatalogException in case of any runtime exception
+ */
+ default CatalogModel getModel(ObjectPath modelPath)
+ throws ModelNotExistException, CatalogException {
+ throw new UnsupportedOperationException(
Review Comment:
Throw `ModelNotExistException`.
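A minimal sketch of what such a default could look like. The types are simplified stand-ins, not the real Flink classes: `SketchModelNotExistException`, `SketchCatalog` and its `name()` method are hypothetical, and a plain `String` replaces `ObjectPath` and `CatalogModel` so the snippet compiles on its own. The only point is that a catalog without model support reports "model not found" rather than "operation unsupported".

```java
// Hypothetical, simplified stand-in for Flink's ModelNotExistException.
class SketchModelNotExistException extends Exception {
    SketchModelNotExistException(String catalogName, String modelPath) {
        super(String.format(
                "Model '%s' does not exist in catalog '%s'.", modelPath, catalogName));
    }
}

// Hypothetical stand-in for the Catalog interface, reduced to the method under discussion.
interface SketchCatalog {
    // Assumed accessor, only here so the exception message has a catalog name.
    String name();

    // Default for catalogs without model support: every lookup is a miss.
    default String getModel(String modelPath) throws SketchModelNotExistException {
        throw new SketchModelNotExistException(name(), modelPath);
    }
}

class GetModelDefaultSketch {
    // Returns true if the default implementation signals a missing model.
    static boolean defaultSignalsMissingModel() {
        SketchCatalog catalog = () -> "my_catalog";
        try {
            catalog.getModel("db.m");
            return false;
        } catch (SketchModelNotExistException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(defaultSignalsMissingModel());
    }
}
```

Callers such as `CatalogManager#getModel` would then only need the `ModelNotExistException` branch.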
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogModel.java:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.Schema;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/** A catalog model implementation. */
+@Internal
+public class DefaultCatalogModel implements CatalogModel {
+ private final @Nullable Schema inputSchema;
+ private final @Nullable Schema outputSchema;
+ private final Map<String, String> modelOptions;
+ private final List<ModelChange> modelChanges;
+ private final @Nullable String comment;
+
+ public DefaultCatalogModel(
Review Comment:
```suggestion
protected DefaultCatalogModel(
```
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogModel.java:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.Schema;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/** A catalog model implementation. */
+@Internal
+public class DefaultCatalogModel implements CatalogModel {
+ private final @Nullable Schema inputSchema;
Review Comment:
Schemas should not be nullable.
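A rough sketch of how the non-null contract could be enforced in the constructor. Everything here is illustrative: `NonNullModelSketch` and its `of` factory are hypothetical names, `String` stands in for `Schema`, and the JDK's `Objects.requireNonNull` is used where Flink code would typically use its own precondition helper.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for DefaultCatalogModel; String replaces Schema.
class NonNullModelSketch {
    private final String inputSchema;
    private final String outputSchema;
    private final Map<String, String> options;
    private final String comment; // the comment may stay optional

    // Protected so that instances are created through the factory below.
    protected NonNullModelSketch(
            String inputSchema, String outputSchema, Map<String, String> options, String comment) {
        this.inputSchema = Objects.requireNonNull(inputSchema, "Input schema must not be null.");
        this.outputSchema = Objects.requireNonNull(outputSchema, "Output schema must not be null.");
        this.options =
                Collections.unmodifiableMap(
                        new HashMap<>(Objects.requireNonNull(options, "Options must not be null.")));
        this.comment = comment;
    }

    // Assumed factory method, mirroring how other catalog objects are created.
    static NonNullModelSketch of(
            String inputSchema, String outputSchema, Map<String, String> options, String comment) {
        return new NonNullModelSketch(inputSchema, outputSchema, options, comment);
    }

    String getInputSchema() {
        return inputSchema;
    }

    String getOutputSchema() {
        return outputSchema;
    }

    Map<String, String> getOptions() {
        return options;
    }

    String getComment() {
        return comment;
    }
}
```

With this shape the fields no longer need a `@Nullable` annotation, and a missing schema fails at construction time instead of later during resolution.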
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java:
##########
@@ -339,13 +391,25 @@ public static CatalogMaterializedTable
deserializeCatalogMaterializedTable(
private static final String REFRESH_HANDLER_BYTES =
"refresh-handler-bytes";
+ private static final String MODEL_INPUT_SCHEMA = "model_input_schema";
Review Comment:
Options in Flink use `-` to separate words and `.` to express concept hierarchy.
Since the properties are per-model anyway, we can drop the prefix, so:
`input-schema`
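To illustrate the naming convention, a small sketch. The class `ModelPropertyKeysSketch` and the helper `columnNameKey` are hypothetical, and the `<schema>.<index>.name` layout is an assumption chosen only to show `.` used for hierarchy; it is not taken from the PR.

```java
// Hypothetical holder for the property keys discussed in this thread.
class ModelPropertyKeysSketch {
    // Words are separated with '-', and there is no "model" prefix.
    static final String INPUT_SCHEMA = "input-schema";
    static final String OUTPUT_SCHEMA = "output-schema";

    // Assumed layout: '.' separates hierarchy levels below the schema key.
    static String columnNameKey(String schemaKey, int columnIndex) {
        return schemaKey + "." + columnIndex + ".name";
    }

    public static void main(String[] args) {
        System.out.println(columnNameKey(INPUT_SCHEMA, 0));
        System.out.println(columnNameKey(OUTPUT_SCHEMA, 0));
    }
}
```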
##########
flink-python/pyflink/table/catalog.py:
##########
@@ -449,6 +449,101 @@ def drop_function(self, function_path: 'ObjectPath',
ignore_if_not_exists: bool)
"""
self._j_catalog.dropFunction(function_path._j_object_path,
ignore_if_not_exists)
+ def list_models(self, database_name: str) -> List[str]:
+ """
+ List the names of all models in the given database. An empty list is
returned if none is
+ registered.
+
+ :param database_name: Name of the database.
+ :return: A list of the names of the models in this database.
+ :raise: CatalogException in case of any runtime exception.
+ DatabaseNotExistException if the database does not exist.
+ """
+ return list(self._j_catalog.listModels(database_name))
+
+ def get_model(self, model_path: 'ObjectPath') -> 'CatalogModel':
+ """
+ Get the model.
+
+ :param model_path: Path :class:`ObjectPath` of the model.
+ :return: The requested function :class:`CatalogModel`.
+ :raise: CatalogException in case of any runtime exception.
+ ModelNotExistException if the model does not exist in the
catalog.
+ """
+ return
CatalogModel._get(self._j_catalog.getModel(model_path._j_object_path))
+
+ def model_exists(self, model_path: 'ObjectPath') -> bool:
+ """
+ Check whether a model exists or not.
+
+ :param model_path: Path :class:`ObjectPath` of the model.
+ :return: true if the model exists in the catalog false otherwise.
+ :raise: CatalogException in case of any runtime exception.
+ """
+ return self._j_catalog.modelExists(model_path._j_object_path)
+
+ def drop_model(self, model_path: 'ObjectPath', ignore_if_not_exists: bool):
+ """
+ Drop a model.
+
+ :param model_path: Path :class:`ObjectPath` of the function to be
dropped.
+ :param ignore_if_not_exists: Flag to specify behavior if the model
does not exist:
+ if set to false, throw an exception
+ if set to true, nothing happens.
+ :raise: CatalogException in case of any runtime exception.
+ ModelNotExistException if the model does not exist.
+ """
+ self._j_catalog.dropModel(model_path._j_object_path,
ignore_if_not_exists)
+
+ def rename_model(self, model_path: 'ObjectPath', new_model_name: str,
+ ignore_if_not_exists: bool):
+ """
+ Rename an existing model
+
+ :param model_path: Path :class:`ObjectPath` of the model to be renamed.
+ :param new_model_name: The new name of the model.
+ :param ignore_if_not_exists: Flag to specify behavior when the model
does not exist:
+ if set to false, throw an exception,
+ if set to true, do nothing.
+ :raise: CatalogException in case of any runtime exception.
+ ModelNotExistException if the model does not exist.
+ """
+ self._j_catalog.renameModel(model_path._j_object_path, new_model_name,
ignore_if_not_exists)
+
+ def create_model(self, model_path: 'ObjectPath', model: 'CatalogModel',
+ ignore_if_exists: bool):
+ """
+ Create a new model.
+
+ :param model_path: Path :class:`ObjectPath` of the model to be created.
+ :param model: The table definition :class:`CatalogModel`.
+ :param ignore_if_exists: Flag to specify behavior when a model already
exists at
+ the given path:
+ if set to false, it throws a
ModelAlreadyExistException,
+ if set to true, do nothing.
+ :raise: CatalogException in case of any runtime exception.
+ DatabaseNotExistException if the database in tablePath doesn't
exist.
+ ModelAlreadyExistException if model already exists and
ignoreIfExists is false.
+ """
+ self._j_catalog.createModel(model_path._j_object_path,
model._j_catalog_model,
+ ignore_if_exists)
+
+ def alter_model(self, model_path: 'ObjectPath', new_model: 'CatalogModel',
+ ignore_if_not_exists):
+ """
+ Modify an existing model.
+
+ :param model_path: Path :class:`ObjectPath` of the model to be
modified.
+ :param new_model: The new table definition :class:`CatalogModel`.
Review Comment:
```suggestion
:param new_model: The new model definition :class:`CatalogModel`.
```
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java:
##########
@@ -339,13 +391,25 @@ public static CatalogMaterializedTable
deserializeCatalogMaterializedTable(
private static final String REFRESH_HANDLER_BYTES =
"refresh-handler-bytes";
+ private static final String MODEL_INPUT_SCHEMA = "model_input_schema";
+
+ private static final String MODEL_OUTPUT_SCHEMA = "model_output_schema";
Review Comment:
`output-schema`
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java:
##########
@@ -790,4 +792,126 @@ void alterPartitionColumnStatistics(
CatalogColumnStatistics columnStatistics,
boolean ignoreIfNotExists)
throws PartitionNotExistException, CatalogException;
+
+ // ------ models ------
+
+ /**
+ * Get names of all tables models under this database. An empty list is
returned if none exists.
+ *
+ * @return a list of the names of all models in this database
+ * @throws DatabaseNotExistException if the database does not exist
+ * @throws CatalogException in case of any runtime exception
+ */
+ default List<String> listModels(String databaseName)
+ throws DatabaseNotExistException, CatalogException {
+ throw new UnsupportedOperationException(
+ String.format("listModel(String) is not implemented for %s.",
this.getClass()));
+ }
+
+ /**
+ * Returns a {@link CatalogModel} identified by the given {@link
ObjectPath}.
+ *
+ * @param modelPath Path of the model
+ * @return The requested model
+ * @throws ModelNotExistException if the target does not exist
+ * @throws CatalogException in case of any runtime exception
+ */
+ default CatalogModel getModel(ObjectPath modelPath)
+ throws ModelNotExistException, CatalogException {
+ throw new UnsupportedOperationException(
+ String.format("getModel(ObjectPath) is not implemented for
%s.", this.getClass()));
+ }
+
+ /**
+ * Check if a model exists in this catalog.
+ *
+ * @param modelPath Path of the model
+ * @return true if the given model exists in the catalog false otherwise
+ * @throws CatalogException in case of any runtime exception
+ */
+ default boolean modelExists(ObjectPath modelPath) throws CatalogException {
+ throw new UnsupportedOperationException(
Review Comment:
Return false by default.
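A minimal sketch of the read-path default, under simplifying assumptions: `ExistsSketchCatalog`, `LegacyCatalogSketch` and `ModelAwareCatalogSketch` are hypothetical names, and `String` replaces `ObjectPath`. A catalog that knows nothing about models inherits `false`, so callers can probe it without catching `UnsupportedOperationException`.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the Catalog interface; String replaces ObjectPath.
interface ExistsSketchCatalog {
    // Read-path default: a catalog without model support simply has no models.
    default boolean modelExists(String modelPath) {
        return false;
    }
}

// A catalog written before models existed inherits the default.
class LegacyCatalogSketch implements ExistsSketchCatalog {}

// A catalog with model support overrides the default.
class ModelAwareCatalogSketch implements ExistsSketchCatalog {
    private final Set<String> models = new HashSet<>();

    void register(String modelPath) {
        models.add(modelPath);
    }

    @Override
    public boolean modelExists(String modelPath) {
        return models.contains(modelPath);
    }
}
```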
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogModel.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Map;
+
+/** A common parent that describes the <i>resolved</i> metadata of a model in
a catalog. */
+@PublicEvolving
+public interface ResolvedCatalogModel extends CatalogModel {
+ /**
+ * Returns the original, unresolved metadata object from the {@link
Catalog}.
+ *
+ * <p>This method might be useful if catalog-specific object instances
should be directly
+ * forwarded from the catalog to a factory.
+ */
+ CatalogModel getOrigin();
+
+ /** Returns a fully resolved and validated {@link ResolvedSchema}
inputSchema. */
+ ResolvedSchema getResolvedInputSchema();
+
+ /** Returns a fully resolved and validated {@link ResolvedSchema}
outputSchema. */
+ ResolvedSchema getResolvedOutputSchema();
+
+ /**
+ * Serializes this instance into a map of string-based properties.
+ *
+ * <p>Compared to the pure table options in {@link #getOptions()}, the map
includes input
+ * schema, output schema, kind, task, comment and options.
Review Comment:
```suggestion
* schema, output schema, comment and options.
```
Kind and task are not important for the catalog layer.
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java:
##########
@@ -276,6 +300,34 @@ public static CatalogMaterializedTable
deserializeCatalogMaterializedTable(
}
}
+ /*
+ * Deserializes the given map of string properties into an unresolved
{@link CatalogModel}.
+ *
+ * @param properties The properties to deserialize from
+ * @return {@link CatalogModel}
+ */
+ public static CatalogModel deserializeCatalogModel(Map<String, String>
properties) {
+ try {
+ final Builder inputSchemaBuilder = Schema.newBuilder();
+ deserializeColumns(properties, MODEL_INPUT_SCHEMA,
inputSchemaBuilder);
+ final @Nullable Schema inputSchema = inputSchemaBuilder.build();
Review Comment:
`@Nullable` makes no sense here. The Builder will never return null.
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java:
##########
@@ -1341,6 +1347,193 @@ private void dropTableInternal(
}
}
+ /**
+ * Retrieves a fully qualified model. If the path is not yet fully
qualified use {@link
+ * #qualifyIdentifier(UnresolvedIdentifier)} first.
+ *
+ * @param objectIdentifier full path of the model to retrieve
+ * @return model that the path points to.
+ */
+ public Optional<ResolvedCatalogModel> getModel(ObjectIdentifier
objectIdentifier) {
Review Comment:
All catalog objects should also exist in a temporary version.
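One possible shape for this, sketched under assumptions: `TemporaryModelSketch` is a hypothetical class, plain strings stand in for `ObjectIdentifier` and `ResolvedCatalogModel`, and the lookup order (temporary objects shadow permanent ones) is assumed to follow what is done for temporary tables.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of a manager that keeps temporary models next to permanent ones.
class TemporaryModelSketch {
    private final Map<String, String> temporaryModels = new HashMap<>();
    private final Map<String, String> permanentModels = new HashMap<>();

    void createTemporaryModel(String identifier, String model) {
        temporaryModels.put(identifier, model);
    }

    void createModel(String identifier, String model) {
        permanentModels.put(identifier, model);
    }

    void dropTemporaryModel(String identifier) {
        temporaryModels.remove(identifier);
    }

    // Temporary models are consulted first and shadow permanent models of the same name.
    Optional<String> getModel(String identifier) {
        String temporary = temporaryModels.get(identifier);
        if (temporary != null) {
            return Optional.of(temporary);
        }
        return Optional.ofNullable(permanentModels.get(identifier));
    }
}
```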
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogModel.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Map;
+
+/** A common parent that describes the <i>resolved</i> metadata of a model in
a catalog. */
Review Comment:
```suggestion
/**
 * A validated {@link CatalogModel} that is backed by the original metadata coming from the
 * {@link Catalog} but resolved by the framework.
 */
```
The "common parent" sounds like copy-paste from `ResolvedCatalogBaseTable` ;-)
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ModelChange.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Objects;
+
+/** {@link ModelChange} represents the modification of the model. */
+@PublicEvolving
+public interface ModelChange {
Review Comment:
As mentioned above, let's skip `ModelChange` in this PR. It was not
mentioned in the FLIP.
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/ModelException.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * General Exception for all errors during model handling.
+ *
+ * <p>This exception indicates that an internal error occurred or that a
feature is not supported
+ * yet. Usually, this exception does not indicate a fault of the user.
+ */
+@PublicEvolving
+public class ModelException extends RuntimeException {
Review Comment:
We don't introduce dedicated exceptions for other catalog objects either.
Until there is an urgent need, let's not add this one.
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java:
##########
@@ -1341,6 +1347,193 @@ private void dropTableInternal(
}
}
+ /**
+ * Retrieves a fully qualified model. If the path is not yet fully
qualified use {@link
+ * #qualifyIdentifier(UnresolvedIdentifier)} first.
+ *
+ * @param objectIdentifier full path of the model to retrieve
+ * @return model that the path points to.
+ */
+ public Optional<ResolvedCatalogModel> getModel(ObjectIdentifier
objectIdentifier) {
+ Optional<Catalog> catalogOptional =
getCatalog(objectIdentifier.getCatalogName());
+ ObjectPath objectPath = objectIdentifier.toObjectPath();
+ if (catalogOptional.isPresent()) {
+ Catalog currentCatalog = catalogOptional.get();
+ try {
+ final CatalogModel model = currentCatalog.getModel(objectPath);
+ if (model != null) {
+ final ResolvedCatalogModel resolvedModel =
resolveCatalogModel(model);
+ return Optional.of(resolvedModel);
+ }
+ } catch (ModelNotExistException e) {
+ // Ignore.
+ } catch (UnsupportedOperationException e) {
+ // Ignore for catalogs that don't support models.
+ }
+ }
+ return Optional.empty();
+ }
+
+ /**
+ * Like {@link #getModel(ObjectIdentifier)}, but throws an error when the
model is not available
+ * in any of the catalogs.
+ */
+ public ResolvedCatalogModel getModelOrError(ObjectIdentifier
objectIdentifier) {
+ return getModel(objectIdentifier)
+ .orElseThrow(
+ () ->
+ new ModelException(
Review Comment:
Use the same exception as for tables: `TableException`. "Table" in this case
does not refer to the catalog object; it is a general exception.
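A small sketch of the `getModelOrError` pattern with a general-purpose exception. `SketchTableException` and `ModelLookupSketch` are hypothetical stand-ins (the real code would throw Flink's `TableException`), strings replace `ObjectIdentifier` and `ResolvedCatalogModel`, and the error message wording is an assumption.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for the general TableException.
class SketchTableException extends RuntimeException {
    SketchTableException(String message) {
        super(message);
    }
}

// Hypothetical stand-in for the model lookup part of CatalogManager.
class ModelLookupSketch {
    private final Map<String, String> models = new HashMap<>();

    void put(String identifier, String model) {
        models.put(identifier, model);
    }

    Optional<String> getModel(String identifier) {
        return Optional.ofNullable(models.get(identifier));
    }

    // Like getModel, but fails with the general exception instead of a model-specific one.
    String getModelOrError(String identifier) {
        return getModel(identifier)
                .orElseThrow(
                        () ->
                                new SketchTableException(
                                        String.format(
                                                "Cannot find model '%s' in any of the catalogs.",
                                                identifier)));
    }
}
```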