jnh5y commented on code in PR #25211:
URL: https://github.com/apache/flink/pull/25211#discussion_r1761332241


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java:
##########
@@ -1341,6 +1352,293 @@ private void dropTableInternal(
         }
     }
 
+    /**
+     * Retrieves a fully qualified model. If the path is not yet fully 
qualified use {@link
+     * #qualifyIdentifier(UnresolvedIdentifier)} first.
+     *
+     * @param objectIdentifier full path of the model to retrieve
+     * @return model that the path points to.
+     */
+    public Optional<ContextResolvedModel> getModel(ObjectIdentifier 
objectIdentifier) {
+        CatalogModel temporaryModel = temporaryModels.get(objectIdentifier);
+        if (temporaryModel != null) {
+            final ResolvedCatalogModel resolvedModel = 
resolveCatalogModel(temporaryModel);
+            return 
Optional.of(ContextResolvedModel.temporary(objectIdentifier, resolvedModel));
+        }
+        Optional<Catalog> catalogOptional = 
getCatalog(objectIdentifier.getCatalogName());
+        ObjectPath objectPath = objectIdentifier.toObjectPath();
+        if (catalogOptional.isPresent()) {
+            Catalog currentCatalog = catalogOptional.get();
+            try {
+                final CatalogModel model = currentCatalog.getModel(objectPath);
+                if (model != null) {
+                    final ResolvedCatalogModel resolvedModel = 
resolveCatalogModel(model);
+                    return Optional.of(
+                            ContextResolvedModel.permanent(
+                                    objectIdentifier, currentCatalog, 
resolvedModel));
+                }
+            } catch (ModelNotExistException e) {
+                // Ignore.
+            } catch (UnsupportedOperationException e) {
+                // Ignore for catalogs that don't support models.
+            }
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Like {@link #getModel(ObjectIdentifier)}, but throws an error when the 
model is not available
+     * in any of the catalogs.
+     */
+    public ContextResolvedModel getModelOrError(ObjectIdentifier 
objectIdentifier) {
+        return getModel(objectIdentifier)
+                .orElseThrow(
+                        () ->
+                                new TableException(
+                                        String.format(
+                                                "Cannot find model '%s' in any 
of the catalogs %s.",
+                                                objectIdentifier, 
listCatalogs())));
+    }
+
+    /**
+     * Return whether the model with a fully qualified table path is temporary 
or not.
+     *
+     * @param objectIdentifier full path of the table
+     * @return the model is temporary or not.
+     */
+    public boolean isTemporaryModel(ObjectIdentifier objectIdentifier) {
+        return temporaryModels.containsKey(objectIdentifier);
+    }
+
+    /**
+     * Returns an array of names of all models registered in the namespace of 
the current catalog
+     * and database.
+     *
+     * @return names of all registered models
+     */
+    public Set<String> listModels() {
+        return listModels(getCurrentCatalog(), getCurrentDatabase());
+    }
+
+    /**
+     * Returns an array of names of all models registered in the namespace of 
the given catalog and
+     * database.
+     *
+     * @return names of all registered models
+     */
+    public Set<String> listModels(String catalogName, String databaseName) {
+        Catalog catalog = getCatalogOrThrowException(catalogName);
+        if (catalog == null) {
+            throw new ValidationException(String.format("Catalog %s does not 
exist", catalogName));
+        }
+        try {
+            return new HashSet<>(catalog.listModels(databaseName));
+        } catch (DatabaseNotExistException e) {
+            throw new ValidationException(
+                    String.format("Database %s does not exist", databaseName), 
e);
+        }
+    }
+
+    /**
+     * Creates a model in a given fully qualified path.
+     *
+     * @param model The resolved model to put in the given path.
+     * @param objectIdentifier The fully qualified path where to put the model.
+     * @param ignoreIfExists If false exception will be thrown if a model 
exists in the given path.
+     */
+    public void createModel(
+            CatalogModel model, ObjectIdentifier objectIdentifier, boolean 
ignoreIfExists) {
+        execute(
+                (catalog, path) -> {
+                    final ResolvedCatalogModel resolvedModel = 
resolveCatalogModel(model);
+                    catalog.createModel(path, resolvedModel, ignoreIfExists);
+                    catalogModificationListeners.forEach(
+                            listener ->
+                                    listener.onEvent(
+                                            CreateModelEvent.createEvent(
+                                                    
CatalogContext.createContext(
+                                                            
objectIdentifier.getCatalogName(),
+                                                            catalog),
+                                                    objectIdentifier,
+                                                    resolvedModel,
+                                                    ignoreIfExists,
+                                                    false)));
+                },
+                objectIdentifier,
+                false,
+                "CreateModel");
+    }
+
+    /**
+     * Creates a temporary model in a given fully qualified path.
+     *
+     * @param model The resolved model to put in the given path.
+     * @param objectIdentifier The fully qualified path where to put the model.
+     * @param ignoreIfExists if false exception will be thrown if a model 
exists in the given path.
+     */
+    public void createTemporaryModel(
+            CatalogModel model, ObjectIdentifier objectIdentifier, boolean 
ignoreIfExists) {
+        Optional<TemporaryOperationListener> listener =
+                getTemporaryOperationListener(objectIdentifier);
+        temporaryModels.compute(
+                objectIdentifier,
+                (k, v) -> {
+                    if (v != null) {
+                        if (!ignoreIfExists) {
+                            throw new ValidationException(
+                                    String.format(
+                                            "Temporary model '%s' already 
exists",
+                                            objectIdentifier));
+                        }
+                        return v;

Review Comment:
   Does there need to be any check that the existing model and requested model 
share configuration/details other than just the name?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to