This is an automated email from the ASF dual-hosted git repository.

mchades pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 1a22afee5 [#5794] feat(core): Add ModelOperationDispatcher logic 
(#5908)
1a22afee5 is described below

commit 1a22afee5c54e4752816d26f8b0b4786cc0a520a
Author: Jerry Shao <[email protected]>
AuthorDate: Mon Dec 23 12:12:26 2024 +0800

    [#5794] feat(core): Add ModelOperationDispatcher logic (#5908)
    
    ### What changes were proposed in this pull request?
    
    This PR adds the ModelOperationDispatcher logic in core.
    
    ### Why are the changes needed?
    
    This is a part of work to support model management.
    
    Fix: #5794
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Added UTs to test.
---
 .../apache/gravitino/catalog/CatalogManager.java   |  15 ++
 .../gravitino/catalog/EntityCombinedFileset.java   |   2 +-
 .../gravitino/catalog/EntityCombinedModel.java     |  94 ++++++++
 .../catalog/EntityCombinedModelVersion.java        | 101 ++++++++
 .../gravitino/catalog/EntityCombinedSchema.java    |   2 +-
 .../gravitino/catalog/EntityCombinedTable.java     |   2 +-
 .../gravitino/catalog/EntityCombinedTopic.java     |   2 +-
 .../catalog/FilesetOperationDispatcher.java        |   6 +-
 .../catalog/ModelOperationDispatcher.java          | 166 ++++++++++++-
 .../catalog/SchemaOperationDispatcher.java         |  18 +-
 .../catalog/TableOperationDispatcher.java          |  16 +-
 .../catalog/TopicOperationDispatcher.java          |  10 +-
 .../java/org/apache/gravitino/TestCatalog.java     |   5 +
 .../test/java/org/apache/gravitino/TestModel.java  |  44 ++++
 .../org/apache/gravitino/TestModelVersion.java     |  45 ++++
 .../catalog/TestModelOperationDispatcher.java      | 264 +++++++++++++++++++++
 .../gravitino/connector/TestCatalogOperations.java | 248 ++++++++++++++++++-
 17 files changed, 1000 insertions(+), 40 deletions(-)

diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java 
b/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java
index 2e77b8e16..43bc74bb2 100644
--- a/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java
+++ b/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java
@@ -95,6 +95,7 @@ import org.apache.gravitino.messaging.TopicCatalog;
 import org.apache.gravitino.meta.AuditInfo;
 import org.apache.gravitino.meta.CatalogEntity;
 import org.apache.gravitino.meta.SchemaEntity;
+import org.apache.gravitino.model.ModelCatalog;
 import org.apache.gravitino.rel.SupportsPartitions;
 import org.apache.gravitino.rel.Table;
 import org.apache.gravitino.rel.TableCatalog;
@@ -178,6 +179,16 @@ public class CatalogManager implements CatalogDispatcher, 
Closeable {
           });
     }
 
+    public <R> R doWithModelOps(ThrowableFunction<ModelCatalog, R> fn) throws 
Exception {
+      return classLoader.withClassLoader(
+          cl -> {
+            if (asModels() == null) {
+              throw new UnsupportedOperationException("Catalog does not 
support model operations");
+            }
+            return fn.apply(asModels());
+          });
+    }
+
     public <R> R doWithCatalogOps(ThrowableFunction<CatalogOperations, R> fn) 
throws Exception {
       return classLoader.withClassLoader(cl -> fn.apply(catalog.ops()));
     }
@@ -236,6 +247,10 @@ public class CatalogManager implements CatalogDispatcher, 
Closeable {
     private TopicCatalog asTopics() {
       return catalog.ops() instanceof TopicCatalog ? (TopicCatalog) 
catalog.ops() : null;
     }
+
+    private ModelCatalog asModels() {
+      return catalog.ops() instanceof ModelCatalog ? (ModelCatalog) 
catalog.ops() : null;
+    }
   }
 
   private final Config config;
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedFileset.java 
b/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedFileset.java
index 2a6b55a2d..c7b847fc9 100644
--- a/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedFileset.java
+++ b/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedFileset.java
@@ -48,7 +48,7 @@ public final class EntityCombinedFileset implements Fileset {
     return new EntityCombinedFileset(fileset, null);
   }
 
-  public EntityCombinedFileset withHiddenPropertiesSet(Set<String> 
hiddenProperties) {
+  public EntityCombinedFileset withHiddenProperties(Set<String> 
hiddenProperties) {
     this.hiddenProperties = hiddenProperties;
     return this;
   }
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedModel.java 
b/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedModel.java
new file mode 100644
index 000000000..4aeefa0be
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedModel.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.gravitino.Audit;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.ModelEntity;
+import org.apache.gravitino.model.Model;
+
+public final class EntityCombinedModel implements Model {
+
+  private final Model model;
+
+  private final ModelEntity modelEntity;
+
+  private Set<String> hiddenProperties = Collections.emptySet();
+
+  private EntityCombinedModel(Model model, ModelEntity modelEntity) {
+    this.model = model;
+    this.modelEntity = modelEntity;
+  }
+
+  public static EntityCombinedModel of(Model model, ModelEntity modelEntity) {
+    return new EntityCombinedModel(model, modelEntity);
+  }
+
+  public static EntityCombinedModel of(Model model) {
+    return new EntityCombinedModel(model, null);
+  }
+
+  public EntityCombinedModel withHiddenProperties(Set<String> 
hiddenProperties) {
+    this.hiddenProperties = hiddenProperties;
+    return this;
+  }
+
+  @Override
+  public String name() {
+    return model.name();
+  }
+
+  @Override
+  public String comment() {
+    return model.comment();
+  }
+
+  @Override
+  public Map<String, String> properties() {
+    return model.properties() == null
+        ? null
+        : model.properties().entrySet().stream()
+            .filter(e -> !hiddenProperties.contains(e.getKey()))
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+  }
+
+  @Override
+  public int latestVersion() {
+    return model.latestVersion();
+  }
+
+  @Override
+  public Audit auditInfo() {
+    AuditInfo mergedAudit =
+        AuditInfo.builder()
+            .withCreator(model.auditInfo().creator())
+            .withCreateTime(model.auditInfo().createTime())
+            .withLastModifier(model.auditInfo().lastModifier())
+            .withLastModifiedTime(model.auditInfo().lastModifiedTime())
+            .build();
+
+    return modelEntity == null
+        ? mergedAudit
+        : mergedAudit.merge(modelEntity.auditInfo(), true /* overwrite */);
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedModelVersion.java
 
b/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedModelVersion.java
new file mode 100644
index 000000000..b41e2889d
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedModelVersion.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.gravitino.Audit;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.ModelVersionEntity;
+import org.apache.gravitino.model.ModelVersion;
+
+public final class EntityCombinedModelVersion implements ModelVersion {
+
+  private final ModelVersion modelVersion;
+
+  private final ModelVersionEntity modelVersionEntity;
+
+  private Set<String> hiddenProperties = Collections.emptySet();
+
+  private EntityCombinedModelVersion(
+      ModelVersion modelVersion, ModelVersionEntity modelVersionEntity) {
+    this.modelVersion = modelVersion;
+    this.modelVersionEntity = modelVersionEntity;
+  }
+
+  public static EntityCombinedModelVersion of(
+      ModelVersion modelVersion, ModelVersionEntity modelVersionEntity) {
+    return new EntityCombinedModelVersion(modelVersion, modelVersionEntity);
+  }
+
+  public static EntityCombinedModelVersion of(ModelVersion modelVersion) {
+    return new EntityCombinedModelVersion(modelVersion, null);
+  }
+
+  public EntityCombinedModelVersion withHiddenProperties(Set<String> 
hiddenProperties) {
+    this.hiddenProperties = hiddenProperties;
+    return this;
+  }
+
+  @Override
+  public int version() {
+    return modelVersion.version();
+  }
+
+  @Override
+  public String comment() {
+    return modelVersion.comment();
+  }
+
+  @Override
+  public Map<String, String> properties() {
+    return modelVersion.properties() == null
+        ? null
+        : modelVersion.properties().entrySet().stream()
+            .filter(e -> !hiddenProperties.contains(e.getKey()))
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+  }
+
+  @Override
+  public String uri() {
+    return modelVersion.uri();
+  }
+
+  @Override
+  public String[] aliases() {
+    return modelVersion.aliases();
+  }
+
+  @Override
+  public Audit auditInfo() {
+    AuditInfo mergedAudit =
+        AuditInfo.builder()
+            .withCreator(modelVersion.auditInfo().creator())
+            .withCreateTime(modelVersion.auditInfo().createTime())
+            .withLastModifier(modelVersion.auditInfo().lastModifier())
+            .withLastModifiedTime(modelVersion.auditInfo().lastModifiedTime())
+            .build();
+
+    return modelVersionEntity == null
+        ? mergedAudit
+        : mergedAudit.merge(modelVersionEntity.auditInfo(), true /* overwrite 
*/);
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedSchema.java 
b/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedSchema.java
index 79a4b12a1..ce3d0a3be 100644
--- a/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedSchema.java
+++ b/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedSchema.java
@@ -61,7 +61,7 @@ public final class EntityCombinedSchema implements Schema {
     return of(schema, null);
   }
 
-  public EntityCombinedSchema withHiddenPropertiesSet(Set<String> 
hiddenProperties) {
+  public EntityCombinedSchema withHiddenProperties(Set<String> 
hiddenProperties) {
     this.hiddenProperties = hiddenProperties;
     return this;
   }
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedTable.java 
b/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedTable.java
index 4b0da1568..70cbd0ace 100644
--- a/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedTable.java
+++ b/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedTable.java
@@ -67,7 +67,7 @@ public final class EntityCombinedTable implements Table {
     return new EntityCombinedTable(table, null);
   }
 
-  public EntityCombinedTable withHiddenPropertiesSet(Set<String> 
hiddenProperties) {
+  public EntityCombinedTable withHiddenProperties(Set<String> 
hiddenProperties) {
     this.hiddenProperties = hiddenProperties;
     return this;
   }
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedTopic.java 
b/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedTopic.java
index 2360f31ae..972df622b 100644
--- a/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedTopic.java
+++ b/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedTopic.java
@@ -60,7 +60,7 @@ public class EntityCombinedTopic implements Topic {
     return new EntityCombinedTopic(topic, null);
   }
 
-  public EntityCombinedTopic withHiddenPropertiesSet(Set<String> 
hiddenProperties) {
+  public EntityCombinedTopic withHiddenProperties(Set<String> 
hiddenProperties) {
     this.hiddenProperties = hiddenProperties;
     return this;
   }
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/FilesetOperationDispatcher.java
 
b/core/src/main/java/org/apache/gravitino/catalog/FilesetOperationDispatcher.java
index 98c6311bd..828e98138 100644
--- 
a/core/src/main/java/org/apache/gravitino/catalog/FilesetOperationDispatcher.java
+++ 
b/core/src/main/java/org/apache/gravitino/catalog/FilesetOperationDispatcher.java
@@ -81,7 +81,7 @@ public class FilesetOperationDispatcher extends 
OperationDispatcher implements F
 
     // Currently we only support maintaining the Fileset in the Gravitino's 
store.
     return EntityCombinedFileset.of(fileset)
-        .withHiddenPropertiesSet(
+        .withHiddenProperties(
             getHiddenPropertyNames(
                 catalogIdent,
                 HasPropertyMetadata::filesetPropertiesMetadata,
@@ -137,7 +137,7 @@ public class FilesetOperationDispatcher extends 
OperationDispatcher implements F
             NoSuchSchemaException.class,
             FilesetAlreadyExistsException.class);
     return EntityCombinedFileset.of(createdFileset)
-        .withHiddenPropertiesSet(
+        .withHiddenProperties(
             getHiddenPropertyNames(
                 catalogIdent,
                 HasPropertyMetadata::filesetPropertiesMetadata,
@@ -172,7 +172,7 @@ public class FilesetOperationDispatcher extends 
OperationDispatcher implements F
             NoSuchFilesetException.class,
             IllegalArgumentException.class);
     return EntityCombinedFileset.of(alteredFileset)
-        .withHiddenPropertiesSet(
+        .withHiddenProperties(
             getHiddenPropertyNames(
                 catalogIdent,
                 HasPropertyMetadata::filesetPropertiesMetadata,
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/ModelOperationDispatcher.java 
b/core/src/main/java/org/apache/gravitino/catalog/ModelOperationDispatcher.java
index eb1f17c96..1c5291d51 100644
--- 
a/core/src/main/java/org/apache/gravitino/catalog/ModelOperationDispatcher.java
+++ 
b/core/src/main/java/org/apache/gravitino/catalog/ModelOperationDispatcher.java
@@ -18,15 +18,23 @@
  */
 package org.apache.gravitino.catalog;
 
+import static 
org.apache.gravitino.catalog.PropertiesMetadataHelpers.validatePropertyForCreate;
+import static 
org.apache.gravitino.utils.NameIdentifierUtil.getCatalogIdentifier;
+
 import java.util.Map;
+import java.util.function.Supplier;
 import org.apache.gravitino.EntityStore;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
+import org.apache.gravitino.StringIdentifier;
+import org.apache.gravitino.connector.HasPropertyMetadata;
 import org.apache.gravitino.exceptions.ModelAlreadyExistsException;
 import 
org.apache.gravitino.exceptions.ModelVersionAliasesAlreadyExistException;
 import org.apache.gravitino.exceptions.NoSuchModelException;
 import org.apache.gravitino.exceptions.NoSuchModelVersionException;
 import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.lock.LockType;
+import org.apache.gravitino.lock.TreeLockUtils;
 import org.apache.gravitino.model.Model;
 import org.apache.gravitino.model.ModelVersion;
 import org.apache.gravitino.storage.IdGenerator;
@@ -40,40 +48,114 @@ public class ModelOperationDispatcher extends 
OperationDispatcher implements Mod
 
   @Override
   public NameIdentifier[] listModels(Namespace namespace) throws 
NoSuchSchemaException {
-    throw new UnsupportedOperationException("Not implemented");
+    return TreeLockUtils.doWithTreeLock(
+        NameIdentifier.of(namespace.levels()),
+        LockType.READ,
+        () ->
+            doWithCatalog(
+                getCatalogIdentifier(NameIdentifier.of(namespace.levels())),
+                c -> c.doWithModelOps(m -> m.listModels(namespace)),
+                NoSuchSchemaException.class));
   }
 
   @Override
   public Model getModel(NameIdentifier ident) throws NoSuchModelException {
-    throw new UnsupportedOperationException("Not implemented");
+    NameIdentifier catalogIdent = getCatalogIdentifier(ident);
+    Model model =
+        TreeLockUtils.doWithTreeLock(
+            ident,
+            LockType.READ,
+            () ->
+                doWithCatalog(
+                    catalogIdent,
+                    c -> c.doWithModelOps(m -> m.getModel(ident)),
+                    NoSuchModelException.class));
+
+    return EntityCombinedModel.of(model)
+        .withHiddenProperties(
+            getHiddenPropertyNames(
+                catalogIdent, HasPropertyMetadata::modelPropertiesMetadata, 
model.properties()));
   }
 
   @Override
   public Model registerModel(NameIdentifier ident, String comment, Map<String, 
String> properties)
       throws NoSuchModelException, ModelAlreadyExistsException {
-    throw new UnsupportedOperationException("Not implemented");
+    NameIdentifier catalogIdent = getCatalogIdentifier(ident);
+    Map<String, String> updatedProperties = 
checkAndUpdateProperties(catalogIdent, properties);
+
+    Model registeredModel =
+        TreeLockUtils.doWithTreeLock(
+            NameIdentifier.of(ident.namespace().levels()),
+            LockType.WRITE,
+            () ->
+                doWithCatalog(
+                    catalogIdent,
+                    c -> c.doWithModelOps(m -> m.registerModel(ident, comment, 
updatedProperties)),
+                    NoSuchSchemaException.class,
+                    ModelAlreadyExistsException.class));
+
+    return EntityCombinedModel.of(registeredModel)
+        .withHiddenProperties(
+            getHiddenPropertyNames(
+                catalogIdent,
+                HasPropertyMetadata::modelPropertiesMetadata,
+                registeredModel.properties()));
   }
 
   @Override
   public boolean deleteModel(NameIdentifier ident) {
-    throw new UnsupportedOperationException("Not implemented");
+    return TreeLockUtils.doWithTreeLock(
+        NameIdentifier.of(ident.namespace().levels()),
+        LockType.WRITE,
+        () ->
+            doWithCatalog(
+                getCatalogIdentifier(ident),
+                c -> c.doWithModelOps(m -> m.deleteModel(ident)),
+                RuntimeException.class));
   }
 
   @Override
   public int[] listModelVersions(NameIdentifier ident) throws 
NoSuchModelException {
-    throw new UnsupportedOperationException("Not implemented");
+    return TreeLockUtils.doWithTreeLock(
+        ident,
+        LockType.READ,
+        () ->
+            doWithCatalog(
+                getCatalogIdentifier(ident),
+                c -> c.doWithModelOps(m -> m.listModelVersions(ident)),
+                NoSuchModelException.class));
   }
 
   @Override
   public ModelVersion getModelVersion(NameIdentifier ident, int version)
       throws NoSuchModelVersionException {
-    throw new UnsupportedOperationException("Not implemented");
+    return internalGetModelVersion(
+        ident,
+        () ->
+            TreeLockUtils.doWithTreeLock(
+                ident,
+                LockType.READ,
+                () ->
+                    doWithCatalog(
+                        getCatalogIdentifier(ident),
+                        c -> c.doWithModelOps(m -> m.getModelVersion(ident, 
version)),
+                        NoSuchModelVersionException.class)));
   }
 
   @Override
   public ModelVersion getModelVersion(NameIdentifier ident, String alias)
       throws NoSuchModelVersionException {
-    throw new UnsupportedOperationException("Not implemented");
+    return internalGetModelVersion(
+        ident,
+        () ->
+            TreeLockUtils.doWithTreeLock(
+                ident,
+                LockType.READ,
+                () ->
+                    doWithCatalog(
+                        getCatalogIdentifier(ident),
+                        c -> c.doWithModelOps(m -> m.getModelVersion(ident, 
alias)),
+                        NoSuchModelVersionException.class)));
   }
 
   @Override
@@ -84,16 +166,80 @@ public class ModelOperationDispatcher extends 
OperationDispatcher implements Mod
       String comment,
       Map<String, String> properties)
       throws NoSuchModelException, ModelVersionAliasesAlreadyExistException {
-    throw new UnsupportedOperationException("Not implemented");
+    NameIdentifier catalogIdent = getCatalogIdentifier(ident);
+    Map<String, String> updatedProperties = 
checkAndUpdateProperties(catalogIdent, properties);
+
+    TreeLockUtils.doWithTreeLock(
+        ident,
+        LockType.WRITE,
+        () ->
+            doWithCatalog(
+                catalogIdent,
+                c ->
+                    c.doWithModelOps(
+                        m -> {
+                          m.linkModelVersion(ident, uri, aliases, comment, 
updatedProperties);
+                          return null;
+                        }),
+                NoSuchModelException.class,
+                ModelVersionAliasesAlreadyExistException.class));
   }
 
   @Override
   public boolean deleteModelVersion(NameIdentifier ident, int version) {
-    throw new UnsupportedOperationException("Not implemented");
+    return TreeLockUtils.doWithTreeLock(
+        ident,
+        LockType.WRITE,
+        () ->
+            doWithCatalog(
+                getCatalogIdentifier(ident),
+                c -> c.doWithModelOps(m -> m.deleteModelVersion(ident, 
version)),
+                RuntimeException.class));
   }
 
   @Override
   public boolean deleteModelVersion(NameIdentifier ident, String alias) {
-    throw new UnsupportedOperationException("Not implemented");
+    return TreeLockUtils.doWithTreeLock(
+        ident,
+        LockType.WRITE,
+        () ->
+            doWithCatalog(
+                getCatalogIdentifier(ident),
+                c -> c.doWithModelOps(m -> m.deleteModelVersion(ident, alias)),
+                RuntimeException.class));
+  }
+
+  private ModelVersion internalGetModelVersion(
+      NameIdentifier ident, Supplier<ModelVersion> supplier) {
+    NameIdentifier catalogIdent = getCatalogIdentifier(ident);
+
+    ModelVersion modelVersion = supplier.get();
+    return EntityCombinedModelVersion.of(modelVersion)
+        .withHiddenProperties(
+            getHiddenPropertyNames(
+                catalogIdent,
+                HasPropertyMetadata::modelPropertiesMetadata,
+                modelVersion.properties()));
+  }
+
+  private Map<String, String> checkAndUpdateProperties(
+      NameIdentifier catalogIdent, Map<String, String> properties) {
+    TreeLockUtils.doWithTreeLock(
+        catalogIdent,
+        LockType.READ,
+        () ->
+            doWithCatalog(
+                catalogIdent,
+                c ->
+                    c.doWithPropertiesMeta(
+                        p -> {
+                          
validatePropertyForCreate(p.modelPropertiesMetadata(), properties);
+                          return null;
+                        }),
+                IllegalArgumentException.class));
+
+    long uid = idGenerator.nextId();
+    StringIdentifier stringId = StringIdentifier.fromId(uid);
+    return StringIdentifier.newPropertiesWithId(stringId, properties);
   }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/SchemaOperationDispatcher.java
 
b/core/src/main/java/org/apache/gravitino/catalog/SchemaOperationDispatcher.java
index ce870523a..789e5e471 100644
--- 
a/core/src/main/java/org/apache/gravitino/catalog/SchemaOperationDispatcher.java
+++ 
b/core/src/main/java/org/apache/gravitino/catalog/SchemaOperationDispatcher.java
@@ -125,7 +125,7 @@ public class SchemaOperationDispatcher extends 
OperationDispatcher implements Sc
     boolean isManagedSchema = isManagedEntity(catalogIdent, 
Capability.Scope.SCHEMA);
     if (isManagedSchema) {
       return EntityCombinedSchema.of(schema)
-          .withHiddenPropertiesSet(
+          .withHiddenProperties(
               getHiddenPropertyNames(
                   catalogIdent,
                   HasPropertyMetadata::schemaPropertiesMetadata,
@@ -149,7 +149,7 @@ public class SchemaOperationDispatcher extends 
OperationDispatcher implements Sc
     } catch (Exception e) {
       LOG.error(FormattedErrorMessages.STORE_OP_FAILURE, "put", ident, e);
       return EntityCombinedSchema.of(schema)
-          .withHiddenPropertiesSet(
+          .withHiddenProperties(
               getHiddenPropertyNames(
                   catalogIdent,
                   HasPropertyMetadata::schemaPropertiesMetadata,
@@ -158,7 +158,7 @@ public class SchemaOperationDispatcher extends 
OperationDispatcher implements Sc
 
     // Merge both the metadata from catalog operation and the metadata from 
entity store.
     return EntityCombinedSchema.of(schema, schemaEntity)
-        .withHiddenPropertiesSet(
+        .withHiddenProperties(
             getHiddenPropertyNames(
                 catalogIdent, HasPropertyMetadata::schemaPropertiesMetadata, 
schema.properties()));
   }
@@ -214,7 +214,7 @@ public class SchemaOperationDispatcher extends 
OperationDispatcher implements Sc
     boolean isManagedSchema = isManagedEntity(catalogIdent, 
Capability.Scope.SCHEMA);
     if (isManagedSchema) {
       return EntityCombinedSchema.of(alteredSchema)
-          .withHiddenPropertiesSet(
+          .withHiddenProperties(
               getHiddenPropertyNames(
                   catalogIdent,
                   HasPropertyMetadata::schemaPropertiesMetadata,
@@ -225,7 +225,7 @@ public class SchemaOperationDispatcher extends 
OperationDispatcher implements Sc
     // Case 1: The schema is not created by Gravitino.
     if (stringId == null) {
       return EntityCombinedSchema.of(alteredSchema)
-          .withHiddenPropertiesSet(
+          .withHiddenProperties(
               getHiddenPropertyNames(
                   catalogIdent,
                   HasPropertyMetadata::schemaPropertiesMetadata,
@@ -258,7 +258,7 @@ public class SchemaOperationDispatcher extends 
OperationDispatcher implements Sc
             stringId.id());
 
     return EntityCombinedSchema.of(alteredSchema, updatedSchemaEntity)
-        .withHiddenPropertiesSet(
+        .withHiddenProperties(
             getHiddenPropertyNames(
                 catalogIdent,
                 HasPropertyMetadata::schemaPropertiesMetadata,
@@ -375,7 +375,7 @@ public class SchemaOperationDispatcher extends 
OperationDispatcher implements Sc
     boolean isManagedSchema = isManagedEntity(catalogIdentifier, 
Capability.Scope.SCHEMA);
     if (isManagedSchema) {
       return EntityCombinedSchema.of(schema)
-          .withHiddenPropertiesSet(
+          .withHiddenProperties(
               getHiddenPropertyNames(
                   catalogIdentifier,
                   HasPropertyMetadata::schemaPropertiesMetadata,
@@ -389,7 +389,7 @@ public class SchemaOperationDispatcher extends 
OperationDispatcher implements Sc
     // storing string identifiers.
     if (stringId == null) {
       return EntityCombinedSchema.of(schema)
-          .withHiddenPropertiesSet(
+          .withHiddenProperties(
               getHiddenPropertyNames(
                   catalogIdentifier,
                   HasPropertyMetadata::schemaPropertiesMetadata,
@@ -408,7 +408,7 @@ public class SchemaOperationDispatcher extends 
OperationDispatcher implements Sc
             stringId.id());
 
     return EntityCombinedSchema.of(schema, schemaEntity)
-        .withHiddenPropertiesSet(
+        .withHiddenProperties(
             getHiddenPropertyNames(
                 catalogIdentifier,
                 HasPropertyMetadata::schemaPropertiesMetadata,
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java 
b/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
index de34e712a..7a4c5a565 100644
--- 
a/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
+++ 
b/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
@@ -126,7 +126,7 @@ public class TableOperationDispatcher extends 
OperationDispatcher implements Tab
     TableEntity updatedEntity = updateColumnsIfNecessaryWhenLoad(ident, 
entityCombinedTable);
 
     return EntityCombinedTable.of(entityCombinedTable.tableFromCatalog(), 
updatedEntity)
-        .withHiddenPropertiesSet(
+        .withHiddenProperties(
             getHiddenPropertyNames(
                 getCatalogIdentifier(ident),
                 HasPropertyMetadata::tablePropertiesMetadata,
@@ -208,7 +208,7 @@ public class TableOperationDispatcher extends 
OperationDispatcher implements Tab
     // Case 1: The table is not created by Gravitino.
     if (stringId == null) {
       return EntityCombinedTable.of(alteredTable)
-          .withHiddenPropertiesSet(
+          .withHiddenProperties(
               getHiddenPropertyNames(
                   getCatalogIdentifier(ident),
                   HasPropertyMetadata::tablePropertiesMetadata,
@@ -252,7 +252,7 @@ public class TableOperationDispatcher extends 
OperationDispatcher implements Tab
             stringId.id());
 
     return EntityCombinedTable.of(alteredTable, updatedTableEntity)
-        .withHiddenPropertiesSet(
+        .withHiddenProperties(
             getHiddenPropertyNames(
                 getCatalogIdentifier(ident),
                 HasPropertyMetadata::tablePropertiesMetadata,
@@ -406,7 +406,7 @@ public class TableOperationDispatcher extends 
OperationDispatcher implements Tab
     }
 
     return EntityCombinedTable.of(table.tableFromCatalog(), tableEntity)
-        .withHiddenPropertiesSet(
+        .withHiddenProperties(
             getHiddenPropertyNames(
                 getCatalogIdentifier(identifier),
                 HasPropertyMetadata::tablePropertiesMetadata,
@@ -426,7 +426,7 @@ public class TableOperationDispatcher extends 
OperationDispatcher implements Tab
     // string identifier.
     if (stringId == null) {
       return EntityCombinedTable.of(table)
-          .withHiddenPropertiesSet(
+          .withHiddenProperties(
               getHiddenPropertyNames(
                   catalogIdentifier,
                   HasPropertyMetadata::tablePropertiesMetadata,
@@ -445,7 +445,7 @@ public class TableOperationDispatcher extends 
OperationDispatcher implements Tab
             stringId.id());
 
     return EntityCombinedTable.of(table, tableEntity)
-        .withHiddenPropertiesSet(
+        .withHiddenProperties(
             getHiddenPropertyNames(
                 catalogIdentifier,
                 HasPropertyMetadata::tablePropertiesMetadata,
@@ -524,13 +524,13 @@ public class TableOperationDispatcher extends 
OperationDispatcher implements Tab
     } catch (Exception e) {
       LOG.error(FormattedErrorMessages.STORE_OP_FAILURE, "put", ident, e);
       return EntityCombinedTable.of(table)
-          .withHiddenPropertiesSet(
+          .withHiddenProperties(
               getHiddenPropertyNames(
                   catalogIdent, HasPropertyMetadata::tablePropertiesMetadata, 
table.properties()));
     }
 
     return EntityCombinedTable.of(table, tableEntity)
-        .withHiddenPropertiesSet(
+        .withHiddenProperties(
             getHiddenPropertyNames(
                 catalogIdent, HasPropertyMetadata::tablePropertiesMetadata, 
table.properties()));
   }
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/TopicOperationDispatcher.java 
b/core/src/main/java/org/apache/gravitino/catalog/TopicOperationDispatcher.java
index d44a09b15..eb4914b08 100644
--- 
a/core/src/main/java/org/apache/gravitino/catalog/TopicOperationDispatcher.java
+++ 
b/core/src/main/java/org/apache/gravitino/catalog/TopicOperationDispatcher.java
@@ -195,7 +195,7 @@ public class TopicOperationDispatcher extends 
OperationDispatcher implements Top
             getStringIdFromProperties(alteredTopic.properties()).id());
 
     return EntityCombinedTopic.of(alteredTopic, updatedTopicEntity)
-        .withHiddenPropertiesSet(
+        .withHiddenProperties(
             getHiddenPropertyNames(
                 catalogIdent,
                 HasPropertyMetadata::topicPropertiesMetadata,
@@ -303,7 +303,7 @@ public class TopicOperationDispatcher extends 
OperationDispatcher implements Top
     StringIdentifier stringId = getStringIdFromProperties(topic.properties());
     if (stringId == null) {
       return EntityCombinedTopic.of(topic)
-          .withHiddenPropertiesSet(
+          .withHiddenProperties(
               getHiddenPropertyNames(
                   catalogIdent, HasPropertyMetadata::topicPropertiesMetadata, 
topic.properties()))
           .withImported(isEntityExist(ident, TOPIC));
@@ -317,7 +317,7 @@ public class TopicOperationDispatcher extends 
OperationDispatcher implements Top
             getStringIdFromProperties(topic.properties()).id());
 
     return EntityCombinedTopic.of(topic, topicEntity)
-        .withHiddenPropertiesSet(
+        .withHiddenProperties(
             getHiddenPropertyNames(
                 catalogIdent, HasPropertyMetadata::topicPropertiesMetadata, 
topic.properties()))
         .withImported(topicEntity != null);
@@ -368,13 +368,13 @@ public class TopicOperationDispatcher extends 
OperationDispatcher implements Top
     } catch (Exception e) {
       LOG.error(OperationDispatcher.FormattedErrorMessages.STORE_OP_FAILURE, 
"put", ident, e);
       return EntityCombinedTopic.of(topic)
-          .withHiddenPropertiesSet(
+          .withHiddenProperties(
               getHiddenPropertyNames(
                   catalogIdent, HasPropertyMetadata::topicPropertiesMetadata, 
topic.properties()));
     }
 
     return EntityCombinedTopic.of(topic, topicEntity)
-        .withHiddenPropertiesSet(
+        .withHiddenProperties(
             getHiddenPropertyNames(
                 catalogIdent, HasPropertyMetadata::topicPropertiesMetadata, 
topic.properties()));
   }
diff --git a/core/src/test/java/org/apache/gravitino/TestCatalog.java 
b/core/src/test/java/org/apache/gravitino/TestCatalog.java
index bdb409f20..420396559 100644
--- a/core/src/test/java/org/apache/gravitino/TestCatalog.java
+++ b/core/src/test/java/org/apache/gravitino/TestCatalog.java
@@ -134,4 +134,9 @@ public class TestCatalog extends BaseCatalog<TestCatalog> {
   public PropertiesMetadata topicPropertiesMetadata() throws 
UnsupportedOperationException {
     return BASE_PROPERTIES_METADATA;
   }
+
+  @Override
+  public PropertiesMetadata modelPropertiesMetadata() throws 
UnsupportedOperationException {
+    return BASE_PROPERTIES_METADATA;
+  }
 }
diff --git a/core/src/test/java/org/apache/gravitino/TestModel.java 
b/core/src/test/java/org/apache/gravitino/TestModel.java
new file mode 100644
index 000000000..ee632192f
--- /dev/null
+++ b/core/src/test/java/org/apache/gravitino/TestModel.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino;
+
+import org.apache.gravitino.connector.BaseModel;
+
+public class TestModel extends BaseModel {
+
+  public static class Builder extends BaseModelBuilder<Builder, TestModel> {
+
+    private Builder() {}
+
+    @Override
+    protected TestModel internalBuild() {
+      TestModel model = new TestModel();
+      model.name = name;
+      model.comment = comment;
+      model.properties = properties;
+      model.latestVersion = latestVersion;
+      model.auditInfo = auditInfo;
+      return model;
+    }
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+}
diff --git a/core/src/test/java/org/apache/gravitino/TestModelVersion.java 
b/core/src/test/java/org/apache/gravitino/TestModelVersion.java
new file mode 100644
index 000000000..487496c5f
--- /dev/null
+++ b/core/src/test/java/org/apache/gravitino/TestModelVersion.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino;
+
+import org.apache.gravitino.connector.BaseModelVersion;
+
+public class TestModelVersion extends BaseModelVersion {
+
+  public static class Builder extends BaseModelVersionBuilder<Builder, 
TestModelVersion> {
+
+    private Builder() {}
+
+    @Override
+    protected TestModelVersion internalBuild() {
+      TestModelVersion modelVersion = new TestModelVersion();
+      modelVersion.version = version;
+      modelVersion.comment = comment;
+      modelVersion.aliases = aliases;
+      modelVersion.uri = uri;
+      modelVersion.properties = properties;
+      modelVersion.auditInfo = auditInfo;
+      return modelVersion;
+    }
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+}
diff --git 
a/core/src/test/java/org/apache/gravitino/catalog/TestModelOperationDispatcher.java
 
b/core/src/test/java/org/apache/gravitino/catalog/TestModelOperationDispatcher.java
new file mode 100644
index 000000000..10bb85a1e
--- /dev/null
+++ 
b/core/src/test/java/org/apache/gravitino/catalog/TestModelOperationDispatcher.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog;
+
+import static org.apache.gravitino.Configs.TREE_LOCK_CLEAN_INTERVAL;
+import static org.apache.gravitino.Configs.TREE_LOCK_MAX_NODE_IN_MEMORY;
+import static org.apache.gravitino.Configs.TREE_LOCK_MIN_NODE_IN_MEMORY;
+import static org.apache.gravitino.StringIdentifier.ID_KEY;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.exceptions.NoSuchModelException;
+import org.apache.gravitino.exceptions.NoSuchModelVersionException;
+import org.apache.gravitino.lock.LockManager;
+import org.apache.gravitino.model.Model;
+import org.apache.gravitino.model.ModelVersion;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+public class TestModelOperationDispatcher extends TestOperationDispatcher {
+
+  static ModelOperationDispatcher modelOperationDispatcher;
+
+  static SchemaOperationDispatcher schemaOperationDispatcher;
+
+  @BeforeAll
+  public static void initialize() throws IOException, IllegalAccessException {
+    Config config = Mockito.mock(Config.class);
+    Mockito.doReturn(100000L).when(config).get(TREE_LOCK_MAX_NODE_IN_MEMORY);
+    Mockito.doReturn(1000L).when(config).get(TREE_LOCK_MIN_NODE_IN_MEMORY);
+    Mockito.doReturn(36000L).when(config).get(TREE_LOCK_CLEAN_INTERVAL);
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "lockManager", new 
LockManager(config), true);
+
+    modelOperationDispatcher =
+        new ModelOperationDispatcher(catalogManager, entityStore, idGenerator);
+    schemaOperationDispatcher =
+        new SchemaOperationDispatcher(catalogManager, entityStore, 
idGenerator);
+  }
+
+  @Test
+  public void testRegisterAndGetModel() {
+    String schemaName = randomSchemaName();
+    NameIdentifier schemaIdent = NameIdentifier.of(metalake, catalog, 
schemaName);
+    schemaOperationDispatcher.createSchema(schemaIdent, "comment", null);
+
+    Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
+    String modelName = randomModelName();
+    NameIdentifier modelIdent =
+        NameIdentifierUtil.ofModel(metalake, catalog, schemaName, modelName);
+
+    Model model = modelOperationDispatcher.registerModel(modelIdent, 
"comment", props);
+    Assertions.assertEquals(modelName, model.name());
+    Assertions.assertEquals("comment", model.comment());
+    Assertions.assertEquals(props, model.properties());
+    Assertions.assertFalse(model.properties().containsKey(ID_KEY));
+
+    Model registeredModel = modelOperationDispatcher.getModel(modelIdent);
+    Assertions.assertEquals(modelName, registeredModel.name());
+    Assertions.assertEquals("comment", registeredModel.comment());
+    Assertions.assertEquals(props, registeredModel.properties());
+    Assertions.assertFalse(registeredModel.properties().containsKey(ID_KEY));
+
+    // Test register model with illegal property
+    Map<String, String> illegalProps = ImmutableMap.of("k1", "v1", ID_KEY, 
"test");
+    testPropertyException(
+        () -> modelOperationDispatcher.registerModel(modelIdent, "comment", 
illegalProps),
+        "Properties are reserved and cannot be set",
+        ID_KEY);
+  }
+
+  @Test
+  public void testRegisterAndListModels() {
+    String schemaName = randomSchemaName();
+    NameIdentifier schemaIdent = NameIdentifier.of(metalake, catalog, 
schemaName);
+    schemaOperationDispatcher.createSchema(schemaIdent, "comment", null);
+
+    Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
+    String modelName1 = randomModelName();
+    NameIdentifier modelIdent1 =
+        NameIdentifierUtil.ofModel(metalake, catalog, schemaName, modelName1);
+    modelOperationDispatcher.registerModel(modelIdent1, "comment", props);
+
+    String modelName2 = randomModelName();
+    NameIdentifier modelIdent2 =
+        NameIdentifierUtil.ofModel(metalake, catalog, schemaName, modelName2);
+    modelOperationDispatcher.registerModel(modelIdent2, "comment", props);
+
+    NameIdentifier[] modelIdents = 
modelOperationDispatcher.listModels(modelIdent1.namespace());
+    Assertions.assertEquals(2, modelIdents.length);
+    Set<NameIdentifier> modelIdentSet = Sets.newHashSet(modelIdents);
+    Assertions.assertTrue(modelIdentSet.contains(modelIdent1));
+    Assertions.assertTrue(modelIdentSet.contains(modelIdent2));
+  }
+
+  @Test
+  public void testRegisterAndDeleteModel() {
+    String schemaName = randomSchemaName();
+    NameIdentifier schemaIdent = NameIdentifier.of(metalake, catalog, 
schemaName);
+    schemaOperationDispatcher.createSchema(schemaIdent, "comment", null);
+
+    Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
+    String modelName = randomModelName();
+    NameIdentifier modelIdent =
+        NameIdentifierUtil.ofModel(metalake, catalog, schemaName, modelName);
+
+    modelOperationDispatcher.registerModel(modelIdent, "comment", props);
+    Assertions.assertTrue(modelOperationDispatcher.deleteModel(modelIdent));
+    Assertions.assertFalse(modelOperationDispatcher.deleteModel(modelIdent));
+    Assertions.assertThrows(
+        NoSuchModelException.class, () -> 
modelOperationDispatcher.getModel(modelIdent));
+
+    // Test delete in-existent model
+    Assertions.assertFalse(
+        modelOperationDispatcher.deleteModel(NameIdentifier.of(metalake, 
catalog, "inexistent")));
+  }
+
+  @Test
+  public void testLinkAndGetModelVersion() {
+    String schemaName = randomSchemaName();
+    NameIdentifier schemaIdent = NameIdentifier.of(metalake, catalog, 
schemaName);
+    schemaOperationDispatcher.createSchema(schemaIdent, "comment", null);
+
+    Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
+    String modelName = randomModelName();
+    NameIdentifier modelIdent =
+        NameIdentifierUtil.ofModel(metalake, catalog, schemaName, modelName);
+
+    Model model = modelOperationDispatcher.registerModel(modelIdent, 
"comment", props);
+    Assertions.assertEquals(0, model.latestVersion());
+
+    String[] aliases = new String[] {"alias1", "alias2"};
+    modelOperationDispatcher.linkModelVersion(modelIdent, "path", aliases, 
"comment", props);
+
+    ModelVersion linkedModelVersion = 
modelOperationDispatcher.getModelVersion(modelIdent, 0);
+    Assertions.assertEquals(0, linkedModelVersion.version());
+    Assertions.assertEquals("path", linkedModelVersion.uri());
+    Assertions.assertArrayEquals(aliases, linkedModelVersion.aliases());
+    Assertions.assertEquals("comment", linkedModelVersion.comment());
+    Assertions.assertEquals(props, linkedModelVersion.properties());
+    
Assertions.assertFalse(linkedModelVersion.properties().containsKey(ID_KEY));
+
+    // Test get model version with alias
+    ModelVersion linkedModelVersionWithAlias =
+        modelOperationDispatcher.getModelVersion(modelIdent, "alias1");
+    Assertions.assertEquals(0, linkedModelVersionWithAlias.version());
+    Assertions.assertEquals("path", linkedModelVersionWithAlias.uri());
+    Assertions.assertArrayEquals(aliases, 
linkedModelVersionWithAlias.aliases());
+    
Assertions.assertFalse(linkedModelVersionWithAlias.properties().containsKey(ID_KEY));
+
+    ModelVersion linkedModelVersionWithAlias2 =
+        modelOperationDispatcher.getModelVersion(modelIdent, "alias2");
+    Assertions.assertEquals(0, linkedModelVersionWithAlias2.version());
+    Assertions.assertEquals("path", linkedModelVersionWithAlias2.uri());
+    Assertions.assertArrayEquals(aliases, 
linkedModelVersionWithAlias2.aliases());
+    
Assertions.assertFalse(linkedModelVersionWithAlias2.properties().containsKey(ID_KEY));
+
+    // Test Link model version with illegal property
+    Map<String, String> illegalProps = ImmutableMap.of("k1", "v1", ID_KEY, 
"test");
+    testPropertyException(
+        () ->
+            modelOperationDispatcher.linkModelVersion(
+                modelIdent, "path", aliases, "comment", illegalProps),
+        "Properties are reserved and cannot be set",
+        ID_KEY);
+  }
+
+  @Test
+  public void testLinkAndListModelVersion() {
+    String schemaName = randomSchemaName();
+    NameIdentifier schemaIdent = NameIdentifier.of(metalake, catalog, 
schemaName);
+    schemaOperationDispatcher.createSchema(schemaIdent, "comment", null);
+
+    Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
+    String modelName = randomModelName();
+    NameIdentifier modelIdent =
+        NameIdentifierUtil.ofModel(metalake, catalog, schemaName, modelName);
+
+    Model model = modelOperationDispatcher.registerModel(modelIdent, 
"comment", props);
+    Assertions.assertEquals(0, model.latestVersion());
+
+    String[] aliases1 = new String[] {"alias1"};
+    String[] aliases2 = new String[] {"alias2"};
+    modelOperationDispatcher.linkModelVersion(modelIdent, "path1", aliases1, 
"comment", props);
+    modelOperationDispatcher.linkModelVersion(modelIdent, "path2", aliases2, 
"comment", props);
+
+    int[] versions = modelOperationDispatcher.listModelVersions(modelIdent);
+    Assertions.assertEquals(2, versions.length);
+    Set<Integer> versionSet = 
Arrays.stream(versions).boxed().collect(Collectors.toSet());
+    Assertions.assertTrue(versionSet.contains(0));
+    Assertions.assertTrue(versionSet.contains(1));
+  }
+
+  @Test
+  public void testLinkAndDeleteModelVersion() {
+    String schemaName = randomSchemaName();
+    NameIdentifier schemaIdent = NameIdentifier.of(metalake, catalog, 
schemaName);
+    schemaOperationDispatcher.createSchema(schemaIdent, "comment", null);
+
+    Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
+    String modelName = randomModelName();
+    NameIdentifier modelIdent =
+        NameIdentifierUtil.ofModel(metalake, catalog, schemaName, modelName);
+
+    Model model = modelOperationDispatcher.registerModel(modelIdent, 
"comment", props);
+    Assertions.assertEquals(0, model.latestVersion());
+
+    String[] aliases = new String[] {"alias1"};
+    modelOperationDispatcher.linkModelVersion(modelIdent, "path", aliases, 
"comment", props);
+    
Assertions.assertTrue(modelOperationDispatcher.deleteModelVersion(modelIdent, 
0));
+    
Assertions.assertFalse(modelOperationDispatcher.deleteModelVersion(modelIdent, 
0));
+    Assertions.assertThrows(
+        NoSuchModelVersionException.class,
+        () -> modelOperationDispatcher.getModelVersion(modelIdent, 0));
+
+    // Test delete in-existent model version
+    
Assertions.assertFalse(modelOperationDispatcher.deleteModelVersion(modelIdent, 
1));
+
+    // Tet delete model version with alias
+    String[] aliases2 = new String[] {"alias2"};
+    modelOperationDispatcher.linkModelVersion(modelIdent, "path2", aliases2, 
"comment", props);
+    
Assertions.assertTrue(modelOperationDispatcher.deleteModelVersion(modelIdent, 
"alias2"));
+    
Assertions.assertFalse(modelOperationDispatcher.deleteModelVersion(modelIdent, 
"alias2"));
+    Assertions.assertThrows(
+        NoSuchModelVersionException.class,
+        () -> modelOperationDispatcher.getModelVersion(modelIdent, "alias2"));
+  }
+
+  private String randomSchemaName() {
+    return "schema_" + UUID.randomUUID().toString().replace("-", "");
+  }
+
+  private String randomModelName() {
+    return "model_" + UUID.randomUUID().toString().replace("-", "");
+  }
+}
diff --git 
a/core/src/test/java/org/apache/gravitino/connector/TestCatalogOperations.java 
b/core/src/test/java/org/apache/gravitino/connector/TestCatalogOperations.java
index 4fb98c596..f7775ef32 100644
--- 
a/core/src/test/java/org/apache/gravitino/connector/TestCatalogOperations.java
+++ 
b/core/src/test/java/org/apache/gravitino/connector/TestCatalogOperations.java
@@ -26,11 +26,13 @@ import java.time.Instant;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.gravitino.Catalog;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
@@ -38,6 +40,8 @@ import org.apache.gravitino.Schema;
 import org.apache.gravitino.SchemaChange;
 import org.apache.gravitino.TestColumn;
 import org.apache.gravitino.TestFileset;
+import org.apache.gravitino.TestModel;
+import org.apache.gravitino.TestModelVersion;
 import org.apache.gravitino.TestSchema;
 import org.apache.gravitino.TestTable;
 import org.apache.gravitino.TestTopic;
@@ -47,8 +51,12 @@ import org.apache.gravitino.audit.FilesetDataOperation;
 import org.apache.gravitino.exceptions.ConnectionFailedException;
 import org.apache.gravitino.exceptions.FilesetAlreadyExistsException;
 import org.apache.gravitino.exceptions.GravitinoRuntimeException;
+import org.apache.gravitino.exceptions.ModelAlreadyExistsException;
+import 
org.apache.gravitino.exceptions.ModelVersionAliasesAlreadyExistException;
 import org.apache.gravitino.exceptions.NoSuchCatalogException;
 import org.apache.gravitino.exceptions.NoSuchFilesetException;
+import org.apache.gravitino.exceptions.NoSuchModelException;
+import org.apache.gravitino.exceptions.NoSuchModelVersionException;
 import org.apache.gravitino.exceptions.NoSuchSchemaException;
 import org.apache.gravitino.exceptions.NoSuchTableException;
 import org.apache.gravitino.exceptions.NoSuchTopicException;
@@ -64,6 +72,9 @@ import org.apache.gravitino.messaging.Topic;
 import org.apache.gravitino.messaging.TopicCatalog;
 import org.apache.gravitino.messaging.TopicChange;
 import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.model.Model;
+import org.apache.gravitino.model.ModelCatalog;
+import org.apache.gravitino.model.ModelVersion;
 import org.apache.gravitino.rel.Column;
 import org.apache.gravitino.rel.Table;
 import org.apache.gravitino.rel.TableCatalog;
@@ -76,7 +87,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class TestCatalogOperations
-    implements CatalogOperations, TableCatalog, FilesetCatalog, TopicCatalog, 
SupportsSchemas {
+    implements CatalogOperations,
+        TableCatalog,
+        FilesetCatalog,
+        TopicCatalog,
+        ModelCatalog,
+        SupportsSchemas {
   private static final Logger LOG = 
LoggerFactory.getLogger(TestCatalogOperations.class);
 
   private final Map<NameIdentifier, TestTable> tables;
@@ -87,6 +103,12 @@ public class TestCatalogOperations
 
   private final Map<NameIdentifier, TestTopic> topics;
 
+  private final Map<NameIdentifier, TestModel> models;
+
+  private final Map<Pair<NameIdentifier, Integer>, TestModelVersion> 
modelVersions;
+
+  private final Map<Pair<NameIdentifier, String>, Integer> modelAliasToVersion;
+
   public static final String FAIL_CREATE = "fail-create";
 
   public static final String FAIL_TEST = "need-fail";
@@ -98,6 +120,9 @@ public class TestCatalogOperations
     schemas = Maps.newHashMap();
     filesets = Maps.newHashMap();
     topics = Maps.newHashMap();
+    models = Maps.newHashMap();
+    modelVersions = Maps.newHashMap();
+    modelAliasToVersion = Maps.newHashMap();
   }
 
   @Override
@@ -649,6 +674,227 @@ public class TestCatalogOperations
     }
   }
 
+  @Override
+  public NameIdentifier[] listModels(Namespace namespace) throws 
NoSuchSchemaException {
+    NameIdentifier modelSchemaIdent = NameIdentifier.of(namespace.levels());
+    if (!schemas.containsKey(modelSchemaIdent)) {
+      throw new NoSuchSchemaException("Schema %s does not exist", 
modelSchemaIdent);
+    }
+
+    return models.keySet().stream()
+        .filter(ident -> ident.namespace().equals(namespace))
+        .toArray(NameIdentifier[]::new);
+  }
+
+  @Override
+  public Model getModel(NameIdentifier ident) throws NoSuchModelException {
+    if (models.containsKey(ident)) {
+      return models.get(ident);
+    } else {
+      throw new NoSuchModelException("Model %s does not exist", ident);
+    }
+  }
+
+  @Override
+  public Model registerModel(NameIdentifier ident, String comment, Map<String, 
String> properties)
+      throws NoSuchSchemaException, ModelAlreadyExistsException {
+    NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels());
+    if (!schemas.containsKey(schemaIdent)) {
+      throw new NoSuchSchemaException("Schema %s does not exist", schemaIdent);
+    }
+
+    AuditInfo auditInfo =
+        
AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build();
+    TestModel model =
+        TestModel.builder()
+            .withName(ident.name())
+            .withComment(comment)
+            .withProperties(properties)
+            .withLatestVersion(0)
+            .withAuditInfo(auditInfo)
+            .build();
+
+    if (models.containsKey(ident)) {
+      throw new ModelAlreadyExistsException("Model %s already exists", ident);
+    } else {
+      models.put(ident, model);
+    }
+
+    return model;
+  }
+
+  @Override
+  public boolean deleteModel(NameIdentifier ident) {
+    if (!models.containsKey(ident)) {
+      return false;
+    }
+
+    models.remove(ident);
+
+    List<Pair<NameIdentifier, Integer>> deletedVersions =
+        modelVersions.entrySet().stream()
+            .filter(e -> e.getKey().getLeft().equals(ident))
+            .map(Map.Entry::getKey)
+            .collect(Collectors.toList());
+    deletedVersions.forEach(modelVersions::remove);
+
+    List<Pair<NameIdentifier, String>> deletedAliases =
+        modelAliasToVersion.entrySet().stream()
+            .filter(e -> e.getKey().getLeft().equals(ident))
+            .map(Map.Entry::getKey)
+            .collect(Collectors.toList());
+    deletedAliases.forEach(modelAliasToVersion::remove);
+
+    return true;
+  }
+
+  @Override
+  public int[] listModelVersions(NameIdentifier ident) throws 
NoSuchModelException {
+    if (!models.containsKey(ident)) {
+      throw new NoSuchModelException("Model %s does not exist", ident);
+    }
+
+    return modelVersions.entrySet().stream()
+        .filter(e -> e.getKey().getLeft().equals(ident))
+        .mapToInt(e -> e.getValue().version())
+        .toArray();
+  }
+
+  @Override
+  public ModelVersion getModelVersion(NameIdentifier ident, int version)
+      throws NoSuchModelVersionException {
+    if (!models.containsKey(ident)) {
+      throw new NoSuchModelVersionException("Model %s does not exist", ident);
+    }
+
+    Pair<NameIdentifier, Integer> versionPair = Pair.of(ident, version);
+    if (!modelVersions.containsKey(versionPair)) {
+      throw new NoSuchModelVersionException("Model version %s does not exist", 
versionPair);
+    }
+
+    return modelVersions.get(versionPair);
+  }
+
+  @Override
+  public ModelVersion getModelVersion(NameIdentifier ident, String alias)
+      throws NoSuchModelVersionException {
+    if (!models.containsKey(ident)) {
+      throw new NoSuchModelVersionException("Model %s does not exist", ident);
+    }
+
+    Pair<NameIdentifier, String> aliasPair = Pair.of(ident, alias);
+    if (!modelAliasToVersion.containsKey(aliasPair)) {
+      throw new NoSuchModelVersionException("Model version %s does not exist", 
alias);
+    }
+
+    int version = modelAliasToVersion.get(aliasPair);
+    Pair<NameIdentifier, Integer> versionPair = Pair.of(ident, version);
+    if (!modelVersions.containsKey(versionPair)) {
+      throw new NoSuchModelVersionException("Model version %s does not exist", 
versionPair);
+    }
+
+    return modelVersions.get(versionPair);
+  }
+
+  @Override
+  public void linkModelVersion(
+      NameIdentifier ident,
+      String uri,
+      String[] aliases,
+      String comment,
+      Map<String, String> properties)
+      throws NoSuchModelException, ModelVersionAliasesAlreadyExistException {
+    if (!models.containsKey(ident)) {
+      throw new NoSuchModelException("Model %s does not exist", ident);
+    }
+
+    String[] aliasArray = aliases != null ? aliases : new String[0];
+    for (String alias : aliasArray) {
+      Pair<NameIdentifier, String> aliasPair = Pair.of(ident, alias);
+      if (modelAliasToVersion.containsKey(aliasPair)) {
+        throw new ModelVersionAliasesAlreadyExistException(
+            "Model version alias %s already exists", alias);
+      }
+    }
+
+    int version = models.get(ident).latestVersion();
+    TestModelVersion modelVersion =
+        TestModelVersion.builder()
+            .withVersion(version)
+            .withAliases(aliases)
+            .withComment(comment)
+            .withUri(uri)
+            .withProperties(properties)
+            .withAuditInfo(
+                
AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build())
+            .build();
+    Pair<NameIdentifier, Integer> versionPair = Pair.of(ident, version);
+    modelVersions.put(versionPair, modelVersion);
+    for (String alias : aliasArray) {
+      Pair<NameIdentifier, String> aliasPair = Pair.of(ident, alias);
+      modelAliasToVersion.put(aliasPair, version);
+    }
+
+    TestModel model = models.get(ident);
+    TestModel updatedModel =
+        TestModel.builder()
+            .withName(model.name())
+            .withComment(model.comment())
+            .withProperties(model.properties())
+            .withLatestVersion(version + 1)
+            .withAuditInfo(model.auditInfo())
+            .build();
+    models.put(ident, updatedModel);
+  }
+
+  @Override
+  public boolean deleteModelVersion(NameIdentifier ident, int version) {
+    if (!models.containsKey(ident)) {
+      return false;
+    }
+
+    Pair<NameIdentifier, Integer> versionPair = Pair.of(ident, version);
+    if (!modelVersions.containsKey(versionPair)) {
+      return false;
+    }
+
+    TestModelVersion modelVersion = modelVersions.remove(versionPair);
+    if (modelVersion.aliases() != null) {
+      for (String alias : modelVersion.aliases()) {
+        Pair<NameIdentifier, String> aliasPair = Pair.of(ident, alias);
+        modelAliasToVersion.remove(aliasPair);
+      }
+    }
+
+    return true;
+  }
+
+  @Override
+  public boolean deleteModelVersion(NameIdentifier ident, String alias) {
+    if (!models.containsKey(ident)) {
+      return false;
+    }
+
+    Pair<NameIdentifier, String> aliasPair = Pair.of(ident, alias);
+    if (!modelAliasToVersion.containsKey(aliasPair)) {
+      return false;
+    }
+
+    int version = modelAliasToVersion.remove(aliasPair);
+    Pair<NameIdentifier, Integer> versionPair = Pair.of(ident, version);
+    if (!modelVersions.containsKey(versionPair)) {
+      return false;
+    }
+
+    TestModelVersion modelVersion = modelVersions.remove(versionPair);
+    for (String modelVersionAlias : modelVersion.aliases()) {
+      Pair<NameIdentifier, String> modelAliasPair = Pair.of(ident, 
modelVersionAlias);
+      modelAliasToVersion.remove(modelAliasPair);
+    }
+
+    return true;
+  }
+
   private boolean hasCallerContext() {
     return CallerContext.CallerContextHolder.get() != null
         && CallerContext.CallerContextHolder.get().context() != null

Reply via email to