This is an automated email from the ASF dual-hosted git repository.

yuqi4733 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 13e78f09e [#5602] feat(core): Add model storage schema layout (part-2) 
(#5728)
13e78f09e is described below

commit 13e78f09e53fba3dd6ff71b6febf478e3c48a06b
Author: Jerry Shao <[email protected]>
AuthorDate: Fri Dec 13 10:13:38 2024 +0800

    [#5602] feat(core): Add model storage schema layout (part-2) (#5728)
    
    ### What changes were proposed in this pull request?
    
    This PR adds the second part of the storage schema layout support, covering
    model versions and the model version alias relationship.
    
    ### Why are the changes needed?
    
    This is part of the work to support model management.
    
    Fix: #5602
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Added unit tests to cover the code.
---
 .../apache/gravitino/meta/ModelVersionEntity.java  |  42 +-
 .../gravitino/storage/relational/JDBCBackend.java  |  24 +-
 .../storage/relational/mapper/ModelMetaMapper.java |   3 +
 .../mapper/ModelMetaSQLProviderFactory.java        |   4 +
 .../mapper/ModelVersionAliasRelMapper.java         |  94 ++++
 .../ModelVersionAliasSQLProviderFactory.java       | 107 +++++
 .../relational/mapper/ModelVersionMetaMapper.java  |  93 ++++
 .../mapper/ModelVersionMetaSQLProviderFactory.java | 105 +++++
 .../provider/base/ModelMetaBaseSQLProvider.java    |   7 +
 .../base/ModelVersionAliasRelBaseSQLProvider.java  | 151 +++++++
 .../base/ModelVersionMetaBaseSQLProvider.java      | 150 +++++++
 .../ModelVersionAliasRelPostgreSQLProvider.java    | 101 +++++
 .../ModelVersionMetaPostgreSQLProvider.java        |  93 ++++
 .../relational/po/ModelVersionAliasRelPO.java      |   9 +-
 .../storage/relational/po/ModelVersionPO.java      |   5 -
 .../relational/service/CatalogMetaService.java     |  10 +
 .../relational/service/MetalakeMetaService.java    |  10 +
 .../relational/service/ModelMetaService.java       |  58 ++-
 .../service/ModelVersionMetaService.java           | 250 +++++++++++
 .../relational/service/SchemaMetaService.java      |  10 +
 .../session/SqlSessionFactoryHelper.java           |   4 +
 .../storage/relational/utils/POConverters.java     |  68 +++
 .../gravitino/meta/TestModelVersionEntity.java     |  14 +-
 .../gravitino/storage/TestEntityStorage.java       | 163 ++++++-
 .../storage/relational/TestJDBCBackend.java        |  21 +
 .../service/TestModelVersionMetaService.java       | 491 +++++++++++++++++++++
 .../storage/relational/utils/TestPOConverters.java | 174 ++++++++
 27 files changed, 2218 insertions(+), 43 deletions(-)

diff --git 
a/core/src/main/java/org/apache/gravitino/meta/ModelVersionEntity.java 
b/core/src/main/java/org/apache/gravitino/meta/ModelVersionEntity.java
index 6b9f44a52..d6c4bfdf8 100644
--- a/core/src/main/java/org/apache/gravitino/meta/ModelVersionEntity.java
+++ b/core/src/main/java/org/apache/gravitino/meta/ModelVersionEntity.java
@@ -18,6 +18,7 @@
  */
 package org.apache.gravitino.meta;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import java.util.Collections;
 import java.util.List;
@@ -27,10 +28,16 @@ import lombok.ToString;
 import org.apache.gravitino.Auditable;
 import org.apache.gravitino.Entity;
 import org.apache.gravitino.Field;
+import org.apache.gravitino.HasIdentifier;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
 
 @ToString
-public class ModelVersionEntity implements Entity, Auditable {
+public class ModelVersionEntity implements Entity, Auditable, HasIdentifier {
 
+  public static final Field MODEL_IDENT =
+      Field.required(
+          "model_ident", NameIdentifier.class, "The name identifier of the 
model entity.");
   public static final Field VERSION =
       Field.required("version", Integer.class, "The version of the model 
entity.");
   public static final Field COMMENT =
@@ -44,6 +51,8 @@ public class ModelVersionEntity implements Entity, Auditable {
   public static final Field AUDIT_INFO =
       Field.required("audit_info", AuditInfo.class, "The audit details of the 
model entity.");
 
+  private NameIdentifier modelIdent;
+
   private Integer version;
 
   private String comment;
@@ -61,6 +70,7 @@ public class ModelVersionEntity implements Entity, Auditable {
   @Override
   public Map<Field, Object> fields() {
     Map<Field, Object> fields = Maps.newHashMap();
+    fields.put(MODEL_IDENT, modelIdent);
     fields.put(VERSION, version);
     fields.put(COMMENT, comment);
     fields.put(ALIASES, aliases);
@@ -71,6 +81,10 @@ public class ModelVersionEntity implements Entity, Auditable 
{
     return Collections.unmodifiableMap(fields);
   }
 
+  public NameIdentifier modelIdentifier() {
+    return modelIdent;
+  }
+
   public Integer version() {
     return version;
   }
@@ -101,6 +115,23 @@ public class ModelVersionEntity implements Entity, 
Auditable {
     return EntityType.MODEL_VERSION;
   }
 
+  @Override
+  public String name() {
+    return String.valueOf(version);
+  }
+
+  @Override
+  public Namespace namespace() {
+    List<String> levels = Lists.newArrayList(modelIdent.namespace().levels());
+    levels.add(modelIdent.name());
+    return Namespace.of(levels.toArray(new String[0]));
+  }
+
+  @Override
+  public Long id() {
+    throw new UnsupportedOperationException("Model version entity does not 
have an ID.");
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
@@ -113,6 +144,7 @@ public class ModelVersionEntity implements Entity, 
Auditable {
 
     ModelVersionEntity that = (ModelVersionEntity) o;
     return Objects.equals(version, that.version)
+        && Objects.equals(modelIdent, that.modelIdent)
         && Objects.equals(comment, that.comment)
         && Objects.equals(aliases, that.aliases)
         && Objects.equals(uri, that.uri)
@@ -122,7 +154,7 @@ public class ModelVersionEntity implements Entity, 
Auditable {
 
   @Override
   public int hashCode() {
-    return Objects.hash(version, comment, aliases, uri, properties, auditInfo);
+    return Objects.hash(modelIdent, version, comment, aliases, uri, 
properties, auditInfo);
   }
 
   public static Builder builder() {
@@ -136,6 +168,11 @@ public class ModelVersionEntity implements Entity, 
Auditable {
       model = new ModelVersionEntity();
     }
 
+    public Builder withModelIdentifier(NameIdentifier modelIdent) {
+      model.modelIdent = modelIdent;
+      return this;
+    }
+
     public Builder withVersion(int version) {
       model.version = version;
       return this;
@@ -168,6 +205,7 @@ public class ModelVersionEntity implements Entity, 
Auditable {
 
     public ModelVersionEntity build() {
       model.validate();
+      model.aliases = model.aliases == null ? Collections.emptyList() : 
model.aliases;
       return model;
     }
   }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java 
b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
index 961257808..8e3cf5c87 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
@@ -43,6 +43,7 @@ import org.apache.gravitino.meta.CatalogEntity;
 import org.apache.gravitino.meta.FilesetEntity;
 import org.apache.gravitino.meta.GroupEntity;
 import org.apache.gravitino.meta.ModelEntity;
+import org.apache.gravitino.meta.ModelVersionEntity;
 import org.apache.gravitino.meta.RoleEntity;
 import org.apache.gravitino.meta.SchemaEntity;
 import org.apache.gravitino.meta.TableEntity;
@@ -56,6 +57,7 @@ import 
org.apache.gravitino.storage.relational.service.FilesetMetaService;
 import org.apache.gravitino.storage.relational.service.GroupMetaService;
 import org.apache.gravitino.storage.relational.service.MetalakeMetaService;
 import org.apache.gravitino.storage.relational.service.ModelMetaService;
+import org.apache.gravitino.storage.relational.service.ModelVersionMetaService;
 import org.apache.gravitino.storage.relational.service.OwnerMetaService;
 import org.apache.gravitino.storage.relational.service.RoleMetaService;
 import org.apache.gravitino.storage.relational.service.SchemaMetaService;
@@ -65,6 +67,8 @@ import 
org.apache.gravitino.storage.relational.service.TagMetaService;
 import org.apache.gravitino.storage.relational.service.TopicMetaService;
 import org.apache.gravitino.storage.relational.service.UserMetaService;
 import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * {@link JDBCBackend} is a jdbc implementation of {@link RelationalBackend} 
interface. You can use
@@ -74,6 +78,8 @@ import 
org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper;
  */
 public class JDBCBackend implements RelationalBackend {
 
+  private static final Logger LOG = LoggerFactory.getLogger(JDBCBackend.class);
+
   private static final Map<JDBCBackendType, String> EMBEDDED_JDBC_DATABASE_MAP 
=
       ImmutableMap.of(JDBCBackendType.H2, H2Database.class.getCanonicalName());
 
@@ -115,6 +121,9 @@ public class JDBCBackend implements RelationalBackend {
         return (List<E>) 
GroupMetaService.getInstance().listGroupsByNamespace(namespace, allFields);
       case MODEL:
         return (List<E>) 
ModelMetaService.getInstance().listModelsByNamespace(namespace);
+      case MODEL_VERSION:
+        return (List<E>)
+            
ModelVersionMetaService.getInstance().listModelVersionsByNamespace(namespace);
       default:
         throw new UnsupportedEntityTypeException(
             "Unsupported entity type: %s for list operation", entityType);
@@ -156,6 +165,13 @@ public class JDBCBackend implements RelationalBackend {
       TagMetaService.getInstance().insertTag((TagEntity) e, overwritten);
     } else if (e instanceof ModelEntity) {
       ModelMetaService.getInstance().insertModel((ModelEntity) e, overwritten);
+    } else if (e instanceof ModelVersionEntity) {
+      if (overwritten) {
+        LOG.warn(
+            "'overwritten' is not supported for model version meta, ignoring 
this flag and "
+                + "inserting the new model version.");
+      }
+      
ModelVersionMetaService.getInstance().insertModelVersion((ModelVersionEntity) 
e);
     } else {
       throw new UnsupportedEntityTypeException(
           "Unsupported entity type: %s for insert operation", e.getClass());
@@ -220,6 +236,8 @@ public class JDBCBackend implements RelationalBackend {
         return (E) TagMetaService.getInstance().getTagByIdentifier(ident);
       case MODEL:
         return (E) ModelMetaService.getInstance().getModelByIdentifier(ident);
+      case MODEL_VERSION:
+        return (E) 
ModelVersionMetaService.getInstance().getModelVersionByIdentifier(ident);
       default:
         throw new UnsupportedEntityTypeException(
             "Unsupported entity type: %s for get operation", entityType);
@@ -252,6 +270,8 @@ public class JDBCBackend implements RelationalBackend {
         return TagMetaService.getInstance().deleteTag(ident);
       case MODEL:
         return ModelMetaService.getInstance().deleteModel(ident);
+      case MODEL_VERSION:
+        return ModelVersionMetaService.getInstance().deleteModelVersion(ident);
       default:
         throw new UnsupportedEntityTypeException(
             "Unsupported entity type: %s for delete operation", entityType);
@@ -310,7 +330,9 @@ public class JDBCBackend implements RelationalBackend {
             .deleteModelMetasByLegacyTimeline(
                 legacyTimeline, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT);
       case MODEL_VERSION:
-        // TODO (jerryshao): Implement hard delete logic for these entity 
types.
+        return ModelVersionMetaService.getInstance()
+            .deleteModelVersionMetasByLegacyTimeline(
+                legacyTimeline, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT);
       case AUDIT:
         return 0;
         // TODO: Implement hard delete logic for these entity types.
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/ModelMetaMapper.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/ModelMetaMapper.java
index 5b3c4a93f..53aba8353 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/ModelMetaMapper.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/ModelMetaMapper.java
@@ -81,4 +81,7 @@ public interface ModelMetaMapper {
       method = "deleteModelMetasByLegacyTimeline")
   Integer deleteModelMetasByLegacyTimeline(
       @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit);
+
+  @UpdateProvider(type = ModelMetaSQLProviderFactory.class, method = 
"updateModelLatestVersion")
+  Integer updateModelLatestVersion(@Param("modelId") Long modelId);
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/ModelMetaSQLProviderFactory.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/ModelMetaSQLProviderFactory.java
index 74334ec6e..71c205083 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/ModelMetaSQLProviderFactory.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/ModelMetaSQLProviderFactory.java
@@ -97,4 +97,8 @@ public class ModelMetaSQLProviderFactory {
       @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) 
{
     return getProvider().deleteModelMetasByLegacyTimeline(legacyTimeline, 
limit);
   }
+
+  public static String updateModelLatestVersion(@Param("modelId") Long 
modelId) {
+    return getProvider().updateModelLatestVersion(modelId);
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/ModelVersionAliasRelMapper.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/ModelVersionAliasRelMapper.java
new file mode 100644
index 000000000..696064975
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/ModelVersionAliasRelMapper.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper;
+
+import java.util.List;
+import org.apache.gravitino.storage.relational.po.ModelVersionAliasRelPO;
+import org.apache.ibatis.annotations.DeleteProvider;
+import org.apache.ibatis.annotations.InsertProvider;
+import org.apache.ibatis.annotations.Param;
+import org.apache.ibatis.annotations.SelectProvider;
+import org.apache.ibatis.annotations.UpdateProvider;
+
+public interface ModelVersionAliasRelMapper {
+
+  String TABLE_NAME = "model_version_alias_rel";
+
+  @InsertProvider(
+      type = ModelVersionAliasSQLProviderFactory.class,
+      method = "insertModelVersionAliasRels")
+  void insertModelVersionAliasRels(
+      @Param("modelVersionAliasRel") List<ModelVersionAliasRelPO> 
modelVersionAliasRelPOs);
+
+  @SelectProvider(
+      type = ModelVersionAliasSQLProviderFactory.class,
+      method = "selectModelVersionAliasRelsByModelId")
+  List<ModelVersionAliasRelPO> 
selectModelVersionAliasRelsByModelId(@Param("modelId") Long modelId);
+
+  @SelectProvider(
+      type = ModelVersionAliasSQLProviderFactory.class,
+      method = "selectModelVersionAliasRelsByModelIdAndVersion")
+  List<ModelVersionAliasRelPO> selectModelVersionAliasRelsByModelIdAndVersion(
+      @Param("modelId") Long modelId, @Param("modelVersion") Integer 
modelVersion);
+
+  @SelectProvider(
+      type = ModelVersionAliasSQLProviderFactory.class,
+      method = "selectModelVersionAliasRelsByModelIdAndAlias")
+  List<ModelVersionAliasRelPO> selectModelVersionAliasRelsByModelIdAndAlias(
+      @Param("modelId") Long modelId, @Param("alias") String alias);
+
+  @UpdateProvider(
+      type = ModelVersionAliasSQLProviderFactory.class,
+      method = "softDeleteModelVersionAliasRelsBySchemaIdAndModelName")
+  Integer softDeleteModelVersionAliasRelsBySchemaIdAndModelName(
+      @Param("schemaId") Long schemaId, @Param("modelName") String modelName);
+
+  @UpdateProvider(
+      type = ModelVersionAliasSQLProviderFactory.class,
+      method = "softDeleteModelVersionAliasRelsByModelIdAndVersion")
+  Integer softDeleteModelVersionAliasRelsByModelIdAndVersion(
+      @Param("modelId") Long modelId, @Param("modelVersion") Integer 
modelVersion);
+
+  @UpdateProvider(
+      type = ModelVersionAliasSQLProviderFactory.class,
+      method = "softDeleteModelVersionAliasRelsByModelIdAndAlias")
+  Integer softDeleteModelVersionAliasRelsByModelIdAndAlias(
+      @Param("modelId") Long modelId, @Param("alias") String alias);
+
+  @UpdateProvider(
+      type = ModelVersionAliasSQLProviderFactory.class,
+      method = "softDeleteModelVersionAliasRelsBySchemaId")
+  Integer softDeleteModelVersionAliasRelsBySchemaId(@Param("schemaId") Long 
schemaId);
+
+  @UpdateProvider(
+      type = ModelVersionAliasSQLProviderFactory.class,
+      method = "softDeleteModelVersionAliasRelsByCatalogId")
+  Integer softDeleteModelVersionAliasRelsByCatalogId(@Param("catalogId") Long 
catalogId);
+
+  @UpdateProvider(
+      type = ModelVersionAliasSQLProviderFactory.class,
+      method = "softDeleteModelVersionAliasRelsByMetalakeId")
+  Integer softDeleteModelVersionAliasRelsByMetalakeId(@Param("metalakeId") 
Long metalakeId);
+
+  @DeleteProvider(
+      type = ModelVersionAliasSQLProviderFactory.class,
+      method = "deleteModelVersionAliasRelsByLegacyTimeline")
+  Integer deleteModelVersionAliasRelsByLegacyTimeline(
+      @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit);
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/ModelVersionAliasSQLProviderFactory.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/ModelVersionAliasSQLProviderFactory.java
new file mode 100644
index 000000000..726e3d0e2
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/ModelVersionAliasSQLProviderFactory.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType;
+import 
org.apache.gravitino.storage.relational.mapper.provider.base.ModelVersionAliasRelBaseSQLProvider;
+import 
org.apache.gravitino.storage.relational.mapper.provider.postgresql.ModelVersionAliasRelPostgreSQLProvider;
+import org.apache.gravitino.storage.relational.po.ModelVersionAliasRelPO;
+import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper;
+import org.apache.ibatis.annotations.Param;
+
+public class ModelVersionAliasSQLProviderFactory {
+
+  static class ModelVersionAliasRelMySQLProvider extends 
ModelVersionAliasRelBaseSQLProvider {}
+
+  static class ModelVersionAliasRelH2Provider extends 
ModelVersionAliasRelBaseSQLProvider {}
+
+  private static final Map<JDBCBackendType, 
ModelVersionAliasRelBaseSQLProvider>
+      MODEL_VERSION_META_SQL_PROVIDER_MAP =
+          ImmutableMap.of(
+              JDBCBackendType.MYSQL, new ModelVersionAliasRelMySQLProvider(),
+              JDBCBackendType.H2, new ModelVersionAliasRelH2Provider(),
+              JDBCBackendType.POSTGRESQL, new 
ModelVersionAliasRelPostgreSQLProvider());
+
+  public static ModelVersionAliasRelBaseSQLProvider getProvider() {
+    String databaseId =
+        SqlSessionFactoryHelper.getInstance()
+            .getSqlSessionFactory()
+            .getConfiguration()
+            .getDatabaseId();
+
+    JDBCBackendType jdbcBackendType = JDBCBackendType.fromString(databaseId);
+    return MODEL_VERSION_META_SQL_PROVIDER_MAP.get(jdbcBackendType);
+  }
+
+  public static String insertModelVersionAliasRels(
+      @Param("modelVersionAliasRel") List<ModelVersionAliasRelPO> 
modelVersionAliasRelPOs) {
+    return getProvider().insertModelVersionAliasRels(modelVersionAliasRelPOs);
+  }
+
+  public static String selectModelVersionAliasRelsByModelId(@Param("modelId") 
Long modelId) {
+    return getProvider().selectModelVersionAliasRelsByModelId(modelId);
+  }
+
+  public static String selectModelVersionAliasRelsByModelIdAndVersion(
+      @Param("modelId") Long modelId, @Param("modelVersion") Integer 
modelVersion) {
+    return 
getProvider().selectModelVersionAliasRelsByModelIdAndVersion(modelId, 
modelVersion);
+  }
+
+  public static String selectModelVersionAliasRelsByModelIdAndAlias(
+      @Param("modelId") Long modelId, @Param("alias") String alias) {
+    return getProvider().selectModelVersionAliasRelsByModelIdAndAlias(modelId, 
alias);
+  }
+
+  public static String softDeleteModelVersionAliasRelsBySchemaIdAndModelName(
+      @Param("schemaId") Long schemaId, @Param("modelName") String modelName) {
+    return 
getProvider().softDeleteModelVersionAliasRelsBySchemaIdAndModelName(schemaId, 
modelName);
+  }
+
+  public static String softDeleteModelVersionAliasRelsByModelIdAndVersion(
+      @Param("modelId") Long modelId, @Param("modelVersion") Integer 
modelVersion) {
+    return 
getProvider().softDeleteModelVersionAliasRelsByModelIdAndVersion(modelId, 
modelVersion);
+  }
+
+  public static String softDeleteModelVersionAliasRelsByModelIdAndAlias(
+      @Param("modelId") Long modelId, @Param("alias") String alias) {
+    return 
getProvider().softDeleteModelVersionAliasRelsByModelIdAndAlias(modelId, alias);
+  }
+
+  public static String 
softDeleteModelVersionAliasRelsBySchemaId(@Param("schemaId") Long schemaId) {
+    return getProvider().softDeleteModelVersionAliasRelsBySchemaId(schemaId);
+  }
+
+  public static String softDeleteModelVersionAliasRelsByCatalogId(
+      @Param("catalogId") Long catalogId) {
+    return getProvider().softDeleteModelVersionAliasRelsByCatalogId(catalogId);
+  }
+
+  public static String softDeleteModelVersionAliasRelsByMetalakeId(
+      @Param("metalakeId") Long metalakeId) {
+    return 
getProvider().softDeleteModelVersionAliasRelsByMetalakeId(metalakeId);
+  }
+
+  public static String deleteModelVersionAliasRelsByLegacyTimeline(
+      @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) 
{
+    return 
getProvider().deleteModelVersionAliasRelsByLegacyTimeline(legacyTimeline, 
limit);
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/ModelVersionMetaMapper.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/ModelVersionMetaMapper.java
new file mode 100644
index 000000000..6bd6fa5de
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/ModelVersionMetaMapper.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper;
+
+import java.util.List;
+import org.apache.gravitino.storage.relational.po.ModelVersionPO;
+import org.apache.ibatis.annotations.DeleteProvider;
+import org.apache.ibatis.annotations.InsertProvider;
+import org.apache.ibatis.annotations.Param;
+import org.apache.ibatis.annotations.SelectProvider;
+import org.apache.ibatis.annotations.UpdateProvider;
+
+public interface ModelVersionMetaMapper {
+
+  String TABLE_NAME = "model_version_info";
+
+  @InsertProvider(
+      type = ModelVersionMetaSQLProviderFactory.class,
+      method = "insertModelVersionMeta")
+  void insertModelVersionMeta(@Param("modelVersionMeta") ModelVersionPO 
modelVersionPO);
+
+  @SelectProvider(
+      type = ModelVersionMetaSQLProviderFactory.class,
+      method = "listModelVersionMetasByModelId")
+  List<ModelVersionPO> listModelVersionMetasByModelId(@Param("modelId") Long 
modelId);
+
+  @SelectProvider(
+      type = ModelVersionMetaSQLProviderFactory.class,
+      method = "selectModelVersionMeta")
+  ModelVersionPO selectModelVersionMeta(
+      @Param("modelId") Long modelId, @Param("modelVersion") Integer 
modelVersion);
+
+  @SelectProvider(
+      type = ModelVersionMetaSQLProviderFactory.class,
+      method = "selectModelVersionMetaByAlias")
+  ModelVersionPO selectModelVersionMetaByAlias(
+      @Param("modelId") Long modelId, @Param("alias") String alias);
+
+  @UpdateProvider(
+      type = ModelVersionMetaSQLProviderFactory.class,
+      method = "softDeleteModelVersionsBySchemaIdAndModelName")
+  Integer softDeleteModelVersionsBySchemaIdAndModelName(
+      @Param("schemaId") Long schemaId, @Param("modelName") String modelName);
+
+  @UpdateProvider(
+      type = ModelVersionMetaSQLProviderFactory.class,
+      method = "softDeleteModelVersionMetaByModelIdAndVersion")
+  Integer softDeleteModelVersionMetaByModelIdAndVersion(
+      @Param("modelId") Long modelId, @Param("modelVersion") Integer 
modelVersion);
+
+  @UpdateProvider(
+      type = ModelVersionMetaSQLProviderFactory.class,
+      method = "softDeleteModelVersionMetaByModelIdAndAlias")
+  Integer softDeleteModelVersionMetaByModelIdAndAlias(
+      @Param("modelId") Long modelId, @Param("alias") String alias);
+
+  @UpdateProvider(
+      type = ModelVersionMetaSQLProviderFactory.class,
+      method = "softDeleteModelVersionMetasBySchemaId")
+  Integer softDeleteModelVersionMetasBySchemaId(@Param("schemaId") Long 
schemaId);
+
+  @UpdateProvider(
+      type = ModelVersionMetaSQLProviderFactory.class,
+      method = "softDeleteModelVersionMetasByCatalogId")
+  Integer softDeleteModelVersionMetasByCatalogId(@Param("catalogId") Long 
catalogId);
+
+  @UpdateProvider(
+      type = ModelVersionMetaSQLProviderFactory.class,
+      method = "softDeleteModelVersionMetasByMetalakeId")
+  Integer softDeleteModelVersionMetasByMetalakeId(@Param("metalakeId") Long 
metalakeId);
+
+  @DeleteProvider(
+      type = ModelVersionMetaSQLProviderFactory.class,
+      method = "deleteModelVersionMetasByLegacyTimeline")
+  Integer deleteModelVersionMetasByLegacyTimeline(
+      @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit);
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/ModelVersionMetaSQLProviderFactory.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/ModelVersionMetaSQLProviderFactory.java
new file mode 100644
index 000000000..1f830f355
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/ModelVersionMetaSQLProviderFactory.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType;
+import 
org.apache.gravitino.storage.relational.mapper.provider.base.ModelVersionMetaBaseSQLProvider;
+import 
org.apache.gravitino.storage.relational.mapper.provider.postgresql.ModelVersionMetaPostgreSQLProvider;
+import org.apache.gravitino.storage.relational.po.ModelVersionPO;
+import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper;
+import org.apache.ibatis.annotations.Param;
+
+public class ModelVersionMetaSQLProviderFactory {
+
+  static class ModelVersionMetaMySQLProvider extends 
ModelVersionMetaBaseSQLProvider {}
+
+  static class ModelVersionMetaH2Provider extends 
ModelVersionMetaBaseSQLProvider {}
+
+  private static final Map<JDBCBackendType, ModelVersionMetaBaseSQLProvider>
+      MODEL_VERSION_META_SQL_PROVIDER_MAP =
+          ImmutableMap.of(
+              JDBCBackendType.MYSQL, new ModelVersionMetaMySQLProvider(),
+              JDBCBackendType.H2, new ModelVersionMetaH2Provider(),
+              JDBCBackendType.POSTGRESQL, new 
ModelVersionMetaPostgreSQLProvider());
+
+  public static ModelVersionMetaBaseSQLProvider getProvider() {
+    String databaseId =
+        SqlSessionFactoryHelper.getInstance()
+            .getSqlSessionFactory()
+            .getConfiguration()
+            .getDatabaseId();
+
+    JDBCBackendType jdbcBackendType = JDBCBackendType.fromString(databaseId);
+    return MODEL_VERSION_META_SQL_PROVIDER_MAP.get(jdbcBackendType);
+  }
+
+  public static String insertModelVersionMeta(
+      @Param("modelVersionMeta") ModelVersionPO modelVersionPO) {
+    return getProvider().insertModelVersionMeta(modelVersionPO);
+  }
+
+  public static String listModelVersionMetasByModelId(@Param("modelId") Long 
modelId) {
+    return getProvider().listModelVersionMetasByModelId(modelId);
+  }
+
+  public static String selectModelVersionMeta(
+      @Param("modelId") Long modelId, @Param("modelVersion") Integer 
modelVersion) {
+    return getProvider().selectModelVersionMeta(modelId, modelVersion);
+  }
+
+  public static String selectModelVersionMetaByAlias(
+      @Param("modelId") Long modelId, @Param("alias") String alias) {
+    return getProvider().selectModelVersionMetaByAlias(modelId, alias);
+  }
+
+  public static String softDeleteModelVersionsBySchemaIdAndModelName(
+      @Param("schemaId") Long schemaId, @Param("modelName") String modelName) {
+    return 
getProvider().softDeleteModelVersionsBySchemaIdAndModelName(schemaId, 
modelName);
+  }
+
+  public static String softDeleteModelVersionMetaByModelIdAndVersion(
+      @Param("modelId") Long modelId, @Param("modelVersion") Integer 
modelVersion) {
+    return 
getProvider().softDeleteModelVersionMetaByModelIdAndVersion(modelId, 
modelVersion);
+  }
+
+  public static String softDeleteModelVersionMetaByModelIdAndAlias(
+      @Param("modelId") Long modelId, @Param("alias") String alias) {
+    return getProvider().softDeleteModelVersionMetaByModelIdAndAlias(modelId, 
alias);
+  }
+
+  public static String 
softDeleteModelVersionMetasBySchemaId(@Param("schemaId") Long schemaId) {
+    return getProvider().softDeleteModelVersionMetasBySchemaId(schemaId);
+  }
+
+  public static String 
softDeleteModelVersionMetasByCatalogId(@Param("catalogId") Long catalogId) {
+    return getProvider().softDeleteModelVersionMetasByCatalogId(catalogId);
+  }
+
+  public static String softDeleteModelVersionMetasByMetalakeId(
+      @Param("metalakeId") Long metalakeId) {
+    return getProvider().softDeleteModelVersionMetasByMetalakeId(metalakeId);
+  }
+
+  public static String deleteModelVersionMetasByLegacyTimeline(
+      @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) 
{
+    return 
getProvider().deleteModelVersionMetasByLegacyTimeline(legacyTimeline, limit);
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/ModelMetaBaseSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/ModelMetaBaseSQLProvider.java
index cae5b2d9d..0a78de9d0 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/ModelMetaBaseSQLProvider.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/ModelMetaBaseSQLProvider.java
@@ -134,4 +134,11 @@ public class ModelMetaBaseSQLProvider {
         + ModelMetaMapper.TABLE_NAME
         + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT 
#{limit}";
   }
+
+  /**
+   * Builds the UPDATE statement that increments {@code model_latest_version} by one on the
+   * model row matching {@code #{modelId}} whose {@code deleted_at} is 0.
+   *
+   * @param modelId not read while building the text; referenced only through the
+   *     {@code #{modelId}} placeholder
+   * @return the UPDATE statement text
+   */
+  public String updateModelLatestVersion(@Param("modelId") Long modelId) {
+    // Single template with the table name as the only substitution; the resulting text is
+    // identical to concatenating the three fragments around the table name.
+    final String sqlTemplate =
+        "UPDATE %s SET model_latest_version = model_latest_version + 1"
+            + " WHERE model_id = #{modelId} AND deleted_at = 0";
+    return String.format(sqlTemplate, ModelMetaMapper.TABLE_NAME);
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/ModelVersionAliasRelBaseSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/ModelVersionAliasRelBaseSQLProvider.java
new file mode 100644
index 000000000..5354b888f
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/ModelVersionAliasRelBaseSQLProvider.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper.provider.base;
+
+import java.util.List;
+import org.apache.gravitino.storage.relational.mapper.ModelMetaMapper;
+import 
org.apache.gravitino.storage.relational.mapper.ModelVersionAliasRelMapper;
+import org.apache.gravitino.storage.relational.po.ModelVersionAliasRelPO;
+import org.apache.ibatis.annotations.Param;
+
+/**
+ * Builds the SQL text for statements against the table named by
+ * {@link ModelVersionAliasRelMapper#TABLE_NAME}.
+ *
+ * <p>Every method only assembles and returns a string. The method arguments are never read
+ * here; they appear in the returned text solely as {@code #{...}} placeholders. The
+ * soft-delete statements in this class compute {@code deleted_at} with
+ * {@code UNIX_TIMESTAMP()} / {@code CURRENT_TIMESTAMP(3)}; {@code
+ * ModelVersionAliasRelPostgreSQLProvider} overrides them with PostgreSQL syntax.
+ */
+public class ModelVersionAliasRelBaseSQLProvider {
+
+  /**
+   * Builds a {@code <script>} batch INSERT with one VALUES tuple per element of the
+   * {@code modelVersionAliasRel} collection.
+   *
+   * <p>The {@code model_version} column is not taken from the item: it is filled by a subquery
+   * that reads {@code model_latest_version} from the model table for the item's model id where
+   * {@code deleted_at = 0}.
+   */
+  public String insertModelVersionAliasRels(
+      @Param("modelVersionAliasRel") List<ModelVersionAliasRelPO> 
modelVersionAliasRelPOs) {
+    return "<script>"
+        + "INSERT INTO "
+        + ModelVersionAliasRelMapper.TABLE_NAME
+        + " (model_id, model_version, model_version_alias, deleted_at)"
+        + " VALUES "
+        + " <foreach collection='modelVersionAliasRel' item='item' 
separator=','>"
+        + " (#{item.modelId},"
+        + " (SELECT model_latest_version FROM "
+        + ModelMetaMapper.TABLE_NAME
+        + " WHERE model_id = #{item.modelId} AND deleted_at = 0),"
+        + " #{item.modelVersionAlias},"
+        + " #{item.deletedAt})"
+        + " </foreach>"
+        + "</script>";
+  }
+
+  /**
+   * Builds the SELECT returning every alias row of the given model with {@code deleted_at = 0};
+   * columns are aliased to {@code modelId}, {@code modelVersion}, {@code modelVersionAlias} and
+   * {@code deletedAt}.
+   */
+  public String selectModelVersionAliasRelsByModelId(@Param("modelId") Long 
modelId) {
+    return "SELECT model_id AS modelId, model_version AS modelVersion,"
+        + " model_version_alias AS modelVersionAlias, deleted_at AS deletedAt"
+        + " FROM "
+        + ModelVersionAliasRelMapper.TABLE_NAME
+        + " WHERE model_id = #{modelId} AND deleted_at = 0";
+  }
+
+  /**
+   * Builds the SELECT returning the alias rows of one model version ({@code model_id} and
+   * {@code model_version} both matched) with {@code deleted_at = 0}.
+   */
+  public String selectModelVersionAliasRelsByModelIdAndVersion(
+      @Param("modelId") Long modelId, @Param("modelVersion") Integer 
modelVersion) {
+    return "SELECT model_id AS modelId, model_version AS modelVersion,"
+        + " model_version_alias AS modelVersionAlias, deleted_at AS deletedAt"
+        + " FROM "
+        + ModelVersionAliasRelMapper.TABLE_NAME
+        + " WHERE model_id = #{modelId} AND model_version = #{modelVersion} 
AND deleted_at = 0";
+  }
+
+  /**
+   * Builds the SELECT returning all alias rows that share a version with the given alias.
+   *
+   * <p>The inner query resolves {@code #{alias}} to a {@code model_version} within the model;
+   * the outer query then returns every non-deleted alias row of that version, not only the row
+   * for {@code #{alias}} itself.
+   */
+  public String selectModelVersionAliasRelsByModelIdAndAlias(
+      @Param("modelId") Long modelId, @Param("alias") String alias) {
+    return "SELECT model_id AS modelId, model_version AS modelVersion,"
+        + " model_version_alias AS modelVersionAlias, deleted_at AS deletedAt"
+        + " FROM "
+        + ModelVersionAliasRelMapper.TABLE_NAME
+        + " WHERE model_id = #{modelId} AND model_version = ("
+        + " SELECT model_version FROM "
+        + ModelVersionAliasRelMapper.TABLE_NAME
+        + " WHERE model_id = #{modelId} AND model_version_alias = #{alias} AND 
deleted_at = 0)"
+        + " AND deleted_at = 0";
+  }
+
+  /**
+   * Builds the UPDATE that stamps {@code deleted_at} on the non-deleted alias rows of the model
+   * found by schema id and model name; the model id is looked up in the model table among rows
+   * with {@code deleted_at = 0}.
+   */
+  public String softDeleteModelVersionAliasRelsBySchemaIdAndModelName(
+      @Param("schemaId") Long schemaId, @Param("modelName") String modelName) {
+    return "UPDATE "
+        + ModelVersionAliasRelMapper.TABLE_NAME
+        + " mvar SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+        + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+        + " WHERE mvar.model_id = ("
+        + " SELECT mm.model_id FROM "
+        + ModelMetaMapper.TABLE_NAME
+        + " mm WHERE mm.schema_id = #{schemaId} AND mm.model_name = 
#{modelName}"
+        + " AND mm.deleted_at = 0) AND mvar.deleted_at = 0";
+  }
+
+  /**
+   * Builds the UPDATE that stamps {@code deleted_at} on the non-deleted alias rows of one model
+   * version.
+   */
+  public String softDeleteModelVersionAliasRelsByModelIdAndVersion(
+      @Param("modelId") Long modelId, @Param("modelVersion") Integer 
modelVersion) {
+    return "UPDATE "
+        + ModelVersionAliasRelMapper.TABLE_NAME
+        + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+        + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+        + " WHERE model_id = #{modelId} AND model_version = #{modelVersion} 
AND deleted_at = 0";
+  }
+
+  /**
+   * Builds the UPDATE that stamps {@code deleted_at} on every non-deleted alias row sharing a
+   * version with {@code #{alias}}; the version is resolved by a subquery on this same table.
+   *
+   * <p>NOTE(review): the subquery selects from the table being updated. MySQL documents that a
+   * table cannot be updated and selected from directly in a subquery (error 1093) - confirm
+   * this statement is exercised against every backend that uses this base provider.
+   */
+  public String softDeleteModelVersionAliasRelsByModelIdAndAlias(
+      @Param("modelId") Long modelId, @Param("alias") String alias) {
+    return "UPDATE "
+        + ModelVersionAliasRelMapper.TABLE_NAME
+        + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+        + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+        + " WHERE model_id = #{modelId} AND model_version = ("
+        + " SELECT model_version FROM "
+        + ModelVersionAliasRelMapper.TABLE_NAME
+        + " WHERE model_id = #{modelId} AND model_version_alias = #{alias} AND 
deleted_at = 0)"
+        + " AND deleted_at = 0";
+  }
+
+  /**
+   * Builds the UPDATE that stamps {@code deleted_at} on the non-deleted alias rows of every
+   * non-deleted model in the given schema.
+   */
+  public String softDeleteModelVersionAliasRelsBySchemaId(@Param("schemaId") 
Long schemaId) {
+    return "UPDATE "
+        + ModelVersionAliasRelMapper.TABLE_NAME
+        + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+        + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+        + " WHERE model_id IN ("
+        + " SELECT model_id FROM "
+        + ModelMetaMapper.TABLE_NAME
+        + " WHERE schema_id = #{schemaId} AND deleted_at = 0) AND deleted_at = 
0";
+  }
+
+  /**
+   * Builds the UPDATE that stamps {@code deleted_at} on the non-deleted alias rows of every
+   * non-deleted model in the given catalog.
+   */
+  public String softDeleteModelVersionAliasRelsByCatalogId(@Param("catalogId") 
Long catalogId) {
+    return "UPDATE "
+        + ModelVersionAliasRelMapper.TABLE_NAME
+        + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+        + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+        + " WHERE model_id IN ("
+        + " SELECT model_id FROM "
+        + ModelMetaMapper.TABLE_NAME
+        + " WHERE catalog_id = #{catalogId} AND deleted_at = 0) AND deleted_at 
= 0";
+  }
+
+  /**
+   * Builds the UPDATE that stamps {@code deleted_at} on the non-deleted alias rows of every
+   * non-deleted model in the given metalake.
+   */
+  public String 
softDeleteModelVersionAliasRelsByMetalakeId(@Param("metalakeId") Long 
metalakeId) {
+    return "UPDATE "
+        + ModelVersionAliasRelMapper.TABLE_NAME
+        + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+        + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+        + " WHERE model_id IN ("
+        + " SELECT model_id FROM "
+        + ModelMetaMapper.TABLE_NAME
+        + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0) AND 
deleted_at = 0";
+  }
+
+  /**
+   * Builds the hard DELETE for rows already soft-deleted ({@code deleted_at > 0}) before
+   * {@code #{legacyTimeline}}, capped by {@code LIMIT #{limit}}.
+   */
+  public String deleteModelVersionAliasRelsByLegacyTimeline(
+      @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) 
{
+    return "DELETE FROM "
+        + ModelVersionAliasRelMapper.TABLE_NAME
+        + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT 
#{limit}";
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/ModelVersionMetaBaseSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/ModelVersionMetaBaseSQLProvider.java
new file mode 100644
index 000000000..a43f114b0
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/ModelVersionMetaBaseSQLProvider.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper.provider.base;
+
+import org.apache.gravitino.storage.relational.mapper.ModelMetaMapper;
+import 
org.apache.gravitino.storage.relational.mapper.ModelVersionAliasRelMapper;
+import org.apache.gravitino.storage.relational.mapper.ModelVersionMetaMapper;
+import org.apache.gravitino.storage.relational.po.ModelVersionPO;
+import org.apache.ibatis.annotations.Param;
+
+/**
+ * Builds the SQL text for statements against the table named by
+ * {@link ModelVersionMetaMapper#TABLE_NAME}.
+ *
+ * <p>Every method only assembles and returns a string. The method arguments are never read
+ * here; they appear in the returned text solely as {@code #{...}} placeholders. The
+ * soft-delete statements in this class compute {@code deleted_at} with
+ * {@code UNIX_TIMESTAMP()} / {@code CURRENT_TIMESTAMP(3)}; {@code
+ * ModelVersionMetaPostgreSQLProvider} overrides them with PostgreSQL syntax.
+ */
+public class ModelVersionMetaBaseSQLProvider {
+
+  /**
+   * Builds an INSERT ... SELECT that creates one model version row.
+   *
+   * <p>The metalake, catalog, schema and model ids and the version number are copied from the
+   * model table row matching {@code #{modelVersionMeta.modelId}} with {@code deleted_at = 0}
+   * (the version comes from {@code model_latest_version}). Only the comment, properties, uri,
+   * audit info and {@code deletedAt} are taken from the PO.
+   */
+  public String insertModelVersionMeta(@Param("modelVersionMeta") 
ModelVersionPO modelVersionPO) {
+    return "INSERT INTO "
+        + ModelVersionMetaMapper.TABLE_NAME
+        + "(metalake_id, catalog_id, schema_id, model_id, version,"
+        + " model_version_comment, model_version_properties, 
model_version_uri,"
+        + " audit_info, deleted_at)"
+        + " SELECT metalake_id, catalog_id, schema_id, model_id, 
model_latest_version,"
+        + " #{modelVersionMeta.modelVersionComment}, 
#{modelVersionMeta.modelVersionProperties},"
+        + " #{modelVersionMeta.modelVersionUri}, 
#{modelVersionMeta.auditInfo},"
+        + " #{modelVersionMeta.deletedAt}"
+        + " FROM "
+        + ModelMetaMapper.TABLE_NAME
+        + " WHERE model_id = #{modelVersionMeta.modelId} AND deleted_at = 0";
+  }
+
+  /**
+   * Builds the SELECT returning every version row of the given model with
+   * {@code deleted_at = 0}; columns are aliased to camelCase names ({@code version} becomes
+   * {@code modelVersion}).
+   */
+  public String listModelVersionMetasByModelId(@Param("modelId") Long modelId) 
{
+    return "SELECT metalake_id AS metalakeId, catalog_id AS catalogId,"
+        + " schema_id AS schemaId, model_id AS modelId, version AS 
modelVersion,"
+        + " model_version_comment AS modelVersionComment, 
model_version_properties AS"
+        + " modelVersionProperties, model_version_uri AS modelVersionUri, 
audit_info AS"
+        + " auditInfo, deleted_at AS deletedAt"
+        + " FROM "
+        + ModelVersionMetaMapper.TABLE_NAME
+        + " WHERE model_id = #{modelId} AND deleted_at = 0";
+  }
+
+  /**
+   * Builds the SELECT for a single model version, matched on {@code model_id} and
+   * {@code version} with {@code deleted_at = 0}.
+   */
+  public String selectModelVersionMeta(
+      @Param("modelId") Long modelId, @Param("modelVersion") Integer 
modelVersion) {
+    return "SELECT metalake_id AS metalakeId, catalog_id AS catalogId,"
+        + " schema_id AS schemaId, model_id AS modelId, version AS 
modelVersion,"
+        + " model_version_comment AS modelVersionComment, 
model_version_properties AS"
+        + " modelVersionProperties, model_version_uri AS modelVersionUri, 
audit_info AS"
+        + " auditInfo, deleted_at AS deletedAt"
+        + " FROM "
+        + ModelVersionMetaMapper.TABLE_NAME
+        + " WHERE model_id = #{modelId} AND version = #{modelVersion} AND 
deleted_at = 0";
+  }
+
+  /**
+   * Builds the SELECT for the model version an alias points to, by joining the version table
+   * ({@code mvi}) with the alias table ({@code mvar}) on model id and version; both sides must
+   * have {@code deleted_at = 0}.
+   */
+  public String selectModelVersionMetaByAlias(
+      @Param("modelId") Long modelId, @Param("alias") String alias) {
+    return "SELECT mvi.metalake_id AS metalakeId, mvi.catalog_id AS catalogId,"
+        + " mvi.schema_id AS schemaId, mvi.model_id AS modelId, mvi.version AS 
modelVersion,"
+        + " mvi.model_version_comment AS modelVersionComment, 
mvi.model_version_properties AS"
+        + " modelVersionProperties, mvi.model_version_uri AS modelVersionUri, 
mvi.audit_info AS"
+        + " auditInfo, mvi.deleted_at AS deletedAt"
+        + " FROM "
+        + ModelVersionMetaMapper.TABLE_NAME
+        + " mvi"
+        + " JOIN "
+        + ModelVersionAliasRelMapper.TABLE_NAME
+        + " mvar"
+        + " ON mvi.model_id = mvar.model_id AND mvi.version = 
mvar.model_version"
+        + " WHERE mvi.model_id = #{modelId} AND mvar.model_version_alias = 
#{alias}"
+        + " AND mvi.deleted_at = 0 AND mvar.deleted_at = 0";
+  }
+
+  /**
+   * Builds the UPDATE that stamps {@code deleted_at} on the non-deleted version rows of the
+   * model found by schema id and model name; the model id is looked up in the model table among
+   * rows with {@code deleted_at = 0}.
+   */
+  public String softDeleteModelVersionsBySchemaIdAndModelName(
+      @Param("schemaId") Long schemaId, @Param("modelName") String modelName) {
+    return "UPDATE "
+        + ModelVersionMetaMapper.TABLE_NAME
+        + " mvi SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+        + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+        + " WHERE mvi.schema_id = #{schemaId} AND mvi.model_id = ("
+        + " SELECT mm.model_id FROM "
+        + ModelMetaMapper.TABLE_NAME
+        + " mm WHERE mm.schema_id = #{schemaId} AND mm.model_name = 
#{modelName}"
+        + " AND mm.deleted_at = 0) AND mvi.deleted_at = 0";
+  }
+
+  /**
+   * Builds the UPDATE that stamps {@code deleted_at} on one non-deleted model version, matched
+   * on {@code model_id} and {@code version}.
+   */
+  public String softDeleteModelVersionMetaByModelIdAndVersion(
+      @Param("modelId") Long modelId, @Param("modelVersion") Integer 
modelVersion) {
+    return "UPDATE "
+        + ModelVersionMetaMapper.TABLE_NAME
+        + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+        + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+        + " WHERE model_id = #{modelId} AND version = #{modelVersion} AND 
deleted_at = 0";
+  }
+
+  /**
+   * Builds the UPDATE that stamps {@code deleted_at} on the model version an alias points to;
+   * the version is resolved by a subquery on the alias table restricted to
+   * {@code deleted_at = 0}.
+   *
+   * <p>NOTE(review): because the subquery only sees non-deleted alias rows, this presumably has
+   * to run before the alias row itself is soft-deleted - confirm the caller's ordering.
+   */
+  public String softDeleteModelVersionMetaByModelIdAndAlias(
+      @Param("modelId") Long modelId, @Param("alias") String alias) {
+    return "UPDATE "
+        + ModelVersionMetaMapper.TABLE_NAME
+        + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+        + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+        + " WHERE model_id = #{modelId} AND version = ("
+        + " SELECT model_version FROM "
+        + ModelVersionAliasRelMapper.TABLE_NAME
+        + " WHERE model_id = #{modelId} AND model_version_alias = #{alias} AND 
deleted_at = 0)"
+        + " AND deleted_at = 0";
+  }
+
+  /**
+   * Builds the UPDATE that stamps {@code deleted_at} on every non-deleted version row of the
+   * given schema.
+   */
+  public String softDeleteModelVersionMetasBySchemaId(@Param("schemaId") Long 
schemaId) {
+    return "UPDATE "
+        + ModelVersionMetaMapper.TABLE_NAME
+        + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+        + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+        + " WHERE schema_id = #{schemaId} AND deleted_at = 0";
+  }
+
+  /**
+   * Builds the UPDATE that stamps {@code deleted_at} on every non-deleted version row of the
+   * given catalog.
+   */
+  public String softDeleteModelVersionMetasByCatalogId(@Param("catalogId") 
Long catalogId) {
+    return "UPDATE "
+        + ModelVersionMetaMapper.TABLE_NAME
+        + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+        + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+        + " WHERE catalog_id = #{catalogId} AND deleted_at = 0";
+  }
+
+  /**
+   * Builds the UPDATE that stamps {@code deleted_at} on every non-deleted version row of the
+   * given metalake.
+   */
+  public String softDeleteModelVersionMetasByMetalakeId(@Param("metalakeId") 
Long metalakeId) {
+    return "UPDATE "
+        + ModelVersionMetaMapper.TABLE_NAME
+        + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+        + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+        + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0";
+  }
+
+  /**
+   * Builds the hard DELETE for rows already soft-deleted ({@code deleted_at > 0}) before
+   * {@code #{legacyTimeline}}, capped by {@code LIMIT #{limit}}.
+   */
+  public String deleteModelVersionMetasByLegacyTimeline(
+      @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) 
{
+    return "DELETE FROM "
+        + ModelVersionMetaMapper.TABLE_NAME
+        + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT 
#{limit}";
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/ModelVersionAliasRelPostgreSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/ModelVersionAliasRelPostgreSQLProvider.java
new file mode 100644
index 000000000..a37f05312
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/ModelVersionAliasRelPostgreSQLProvider.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper.provider.postgresql;
+
+import org.apache.gravitino.storage.relational.mapper.ModelMetaMapper;
+import 
org.apache.gravitino.storage.relational.mapper.ModelVersionAliasRelMapper;
+import 
org.apache.gravitino.storage.relational.mapper.provider.base.ModelVersionAliasRelBaseSQLProvider;
+import org.apache.ibatis.annotations.Param;
+
+/**
+ * PostgreSQL variant of {@link ModelVersionAliasRelBaseSQLProvider}.
+ *
+ * <p>Only the soft-delete statements are overridden, so that {@code deleted_at} is computed
+ * with PostgreSQL date/time functions. The method arguments are never read here; they appear
+ * in the returned text solely as {@code #{...}} placeholders, whose names must match the
+ * {@code @Param} names.
+ */
+public class ModelVersionAliasRelPostgreSQLProvider extends ModelVersionAliasRelBaseSQLProvider {
+
+  /** PostgreSQL expression for the current time as whole milliseconds since 1970-01-01. */
+  private static final String CURRENT_EPOCH_MILLIS =
+      "floor(extract(epoch from((current_timestamp -"
+          + " timestamp '1970-01-01 00:00:00')*1000)))";
+
+  /**
+   * Builds the UPDATE that stamps {@code deleted_at} on the non-deleted alias rows of the model
+   * found by schema id and model name.
+   *
+   * @return the UPDATE statement text
+   */
+  @Override
+  public String softDeleteModelVersionAliasRelsBySchemaIdAndModelName(
+      @Param("schemaId") Long schemaId, @Param("modelName") String modelName) {
+    return "UPDATE "
+        + ModelVersionAliasRelMapper.TABLE_NAME
+        + " mvar SET deleted_at = "
+        + CURRENT_EPOCH_MILLIS
+        + " WHERE mvar.model_id = ("
+        + " SELECT mm.model_id FROM "
+        + ModelMetaMapper.TABLE_NAME
+        + " mm WHERE mm.schema_id = #{schemaId} AND mm.model_name = #{modelName}"
+        + " AND mm.deleted_at = 0) AND mvar.deleted_at = 0";
+  }
+
+  /**
+   * Builds the UPDATE that stamps {@code deleted_at} on the non-deleted alias rows of one model
+   * version.
+   *
+   * @return the UPDATE statement text
+   */
+  @Override
+  public String softDeleteModelVersionAliasRelsByModelIdAndVersion(
+      @Param("modelId") Long modelId, @Param("modelVersion") Integer modelVersion) {
+    // The placeholder has to be #{modelVersion}: that is the declared @Param name and what the
+    // base provider uses. A #{version} placeholder names no bound parameter.
+    return "UPDATE "
+        + ModelVersionAliasRelMapper.TABLE_NAME
+        + " SET deleted_at = "
+        + CURRENT_EPOCH_MILLIS
+        + " WHERE model_id = #{modelId} AND model_version = #{modelVersion} AND deleted_at = 0";
+  }
+
+  /**
+   * Builds the UPDATE that stamps {@code deleted_at} on every non-deleted alias row sharing a
+   * version with {@code #{alias}}; the version is resolved by a subquery on the same table.
+   *
+   * @return the UPDATE statement text
+   */
+  @Override
+  public String softDeleteModelVersionAliasRelsByModelIdAndAlias(
+      @Param("modelId") Long modelId, @Param("alias") String alias) {
+    return "UPDATE "
+        + ModelVersionAliasRelMapper.TABLE_NAME
+        + " SET deleted_at = "
+        + CURRENT_EPOCH_MILLIS
+        + " WHERE model_id = #{modelId} AND model_version = ("
+        + " SELECT model_version FROM "
+        + ModelVersionAliasRelMapper.TABLE_NAME
+        + " WHERE model_id = #{modelId} AND model_version_alias = #{alias} AND deleted_at = 0)"
+        + " AND deleted_at = 0";
+  }
+
+  /**
+   * Builds the UPDATE that stamps {@code deleted_at} on the non-deleted alias rows of every
+   * non-deleted model in the given schema.
+   *
+   * @return the UPDATE statement text
+   */
+  @Override
+  public String softDeleteModelVersionAliasRelsBySchemaId(@Param("schemaId") Long schemaId) {
+    return "UPDATE "
+        + ModelVersionAliasRelMapper.TABLE_NAME
+        + " SET deleted_at = "
+        + CURRENT_EPOCH_MILLIS
+        + " WHERE model_id IN ("
+        + " SELECT model_id FROM "
+        + ModelMetaMapper.TABLE_NAME
+        + " WHERE schema_id = #{schemaId} AND deleted_at = 0) AND deleted_at = 0";
+  }
+
+  /**
+   * Builds the UPDATE that stamps {@code deleted_at} on the non-deleted alias rows of every
+   * non-deleted model in the given catalog.
+   *
+   * @return the UPDATE statement text
+   */
+  @Override
+  public String softDeleteModelVersionAliasRelsByCatalogId(@Param("catalogId") Long catalogId) {
+    return "UPDATE "
+        + ModelVersionAliasRelMapper.TABLE_NAME
+        + " SET deleted_at = "
+        + CURRENT_EPOCH_MILLIS
+        + " WHERE model_id IN ("
+        + " SELECT model_id FROM "
+        + ModelMetaMapper.TABLE_NAME
+        + " WHERE catalog_id = #{catalogId} AND deleted_at = 0) AND deleted_at = 0";
+  }
+
+  /**
+   * Builds the UPDATE that stamps {@code deleted_at} on the non-deleted alias rows of every
+   * non-deleted model in the given metalake.
+   *
+   * @return the UPDATE statement text
+   */
+  @Override
+  public String softDeleteModelVersionAliasRelsByMetalakeId(@Param("metalakeId") Long metalakeId) {
+    return "UPDATE "
+        + ModelVersionAliasRelMapper.TABLE_NAME
+        + " SET deleted_at = "
+        + CURRENT_EPOCH_MILLIS
+        + " WHERE model_id IN ("
+        + " SELECT model_id FROM "
+        + ModelMetaMapper.TABLE_NAME
+        + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0) AND deleted_at = 0";
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/ModelVersionMetaPostgreSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/ModelVersionMetaPostgreSQLProvider.java
new file mode 100644
index 000000000..09be14319
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/ModelVersionMetaPostgreSQLProvider.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper.provider.postgresql;
+
+import org.apache.gravitino.storage.relational.mapper.ModelMetaMapper;
+import 
org.apache.gravitino.storage.relational.mapper.ModelVersionAliasRelMapper;
+import org.apache.gravitino.storage.relational.mapper.ModelVersionMetaMapper;
+import 
org.apache.gravitino.storage.relational.mapper.provider.base.ModelVersionMetaBaseSQLProvider;
+import org.apache.ibatis.annotations.Param;
+
+/**
+ * PostgreSQL variant of {@link ModelVersionMetaBaseSQLProvider}.
+ *
+ * <p>Only the soft-delete statements are overridden, so that {@code deleted_at} is computed
+ * with PostgreSQL date/time functions. The method arguments are never read here; they appear
+ * in the returned text solely as {@code #{...}} placeholders, whose names must match the
+ * {@code @Param} names.
+ */
+public class ModelVersionMetaPostgreSQLProvider extends ModelVersionMetaBaseSQLProvider {
+
+  /** PostgreSQL expression for the current time as whole milliseconds since 1970-01-01. */
+  private static final String CURRENT_EPOCH_MILLIS =
+      "floor(extract(epoch from((current_timestamp -"
+          + " timestamp '1970-01-01 00:00:00')*1000)))";
+
+  /**
+   * Builds the UPDATE that stamps {@code deleted_at} on the non-deleted version rows of the
+   * model found by schema id and model name.
+   *
+   * @return the UPDATE statement text
+   */
+  @Override
+  public String softDeleteModelVersionsBySchemaIdAndModelName(
+      @Param("schemaId") Long schemaId, @Param("modelName") String modelName) {
+    return "UPDATE "
+        + ModelVersionMetaMapper.TABLE_NAME
+        + " mvi SET deleted_at = "
+        + CURRENT_EPOCH_MILLIS
+        + " WHERE mvi.schema_id = #{schemaId} AND mvi.model_id = ("
+        + " SELECT mm.model_id FROM "
+        + ModelMetaMapper.TABLE_NAME
+        + " mm WHERE mm.schema_id = #{schemaId} AND mm.model_name = #{modelName}"
+        + " AND mm.deleted_at = 0) AND mvi.deleted_at = 0";
+  }
+
+  /**
+   * Builds the UPDATE that stamps {@code deleted_at} on one non-deleted model version, matched
+   * on {@code model_id} and {@code version}.
+   *
+   * @return the UPDATE statement text
+   */
+  @Override
+  public String softDeleteModelVersionMetaByModelIdAndVersion(
+      @Param("modelId") Long modelId, @Param("modelVersion") Integer modelVersion) {
+    // The placeholder has to be #{modelVersion}: that is the declared @Param name and what the
+    // base provider uses. A #{version} placeholder names no bound parameter.
+    return "UPDATE "
+        + ModelVersionMetaMapper.TABLE_NAME
+        + " SET deleted_at = "
+        + CURRENT_EPOCH_MILLIS
+        + " WHERE model_id = #{modelId} AND version = #{modelVersion} AND deleted_at = 0";
+  }
+
+  /**
+   * Builds the UPDATE that stamps {@code deleted_at} on the model version an alias points to;
+   * the version is resolved by a subquery on the alias table restricted to
+   * {@code deleted_at = 0}.
+   *
+   * @return the UPDATE statement text
+   */
+  @Override
+  public String softDeleteModelVersionMetaByModelIdAndAlias(
+      @Param("modelId") Long modelId, @Param("alias") String alias) {
+    return "UPDATE "
+        + ModelVersionMetaMapper.TABLE_NAME
+        + " SET deleted_at = "
+        + CURRENT_EPOCH_MILLIS
+        + " WHERE model_id = #{modelId} AND version = ("
+        + " SELECT model_version FROM "
+        + ModelVersionAliasRelMapper.TABLE_NAME
+        + " WHERE model_id = #{modelId} AND model_version_alias = #{alias} AND deleted_at = 0)"
+        + " AND deleted_at = 0";
+  }
+
+  /**
+   * Builds the UPDATE that stamps {@code deleted_at} on every non-deleted version row of the
+   * given schema.
+   *
+   * @return the UPDATE statement text
+   */
+  @Override
+  public String softDeleteModelVersionMetasBySchemaId(@Param("schemaId") Long schemaId) {
+    return "UPDATE "
+        + ModelVersionMetaMapper.TABLE_NAME
+        + " SET deleted_at = "
+        + CURRENT_EPOCH_MILLIS
+        + " WHERE schema_id = #{schemaId} AND deleted_at = 0";
+  }
+
+  /**
+   * Builds the UPDATE that stamps {@code deleted_at} on every non-deleted version row of the
+   * given catalog.
+   *
+   * @return the UPDATE statement text
+   */
+  @Override
+  public String softDeleteModelVersionMetasByCatalogId(@Param("catalogId") Long catalogId) {
+    return "UPDATE "
+        + ModelVersionMetaMapper.TABLE_NAME
+        + " SET deleted_at = "
+        + CURRENT_EPOCH_MILLIS
+        + " WHERE catalog_id = #{catalogId} AND deleted_at = 0";
+  }
+
+  /**
+   * Builds the UPDATE that stamps {@code deleted_at} on every non-deleted version row of the
+   * given metalake.
+   *
+   * @return the UPDATE statement text
+   */
+  @Override
+  public String softDeleteModelVersionMetasByMetalakeId(@Param("metalakeId") Long metalakeId) {
+    return "UPDATE "
+        + ModelVersionMetaMapper.TABLE_NAME
+        + " SET deleted_at = "
+        + CURRENT_EPOCH_MILLIS
+        + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0";
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/po/ModelVersionAliasRelPO.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/po/ModelVersionAliasRelPO.java
index fc7896b25..e09e35f1d 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/po/ModelVersionAliasRelPO.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/po/ModelVersionAliasRelPO.java
@@ -31,7 +31,7 @@ public class ModelVersionAliasRelPO {
 
   private Integer modelVersion;
 
-  private String modelAlias;
+  private String modelVersionAlias;
 
   private Long deletedAt;
 
@@ -59,8 +59,8 @@ public class ModelVersionAliasRelPO {
       return this;
     }
 
-    public Builder withModelAlias(String modelAlias) {
-      modelVersionAliasRelPO.modelAlias = modelAlias;
+    public Builder withModelVersionAlias(String modelVersionAlias) {
+      modelVersionAliasRelPO.modelVersionAlias = modelVersionAlias;
       return this;
     }
 
@@ -74,7 +74,8 @@ public class ModelVersionAliasRelPO {
       Preconditions.checkArgument(
           modelVersionAliasRelPO.modelVersion != null, "modelVersion is 
required");
       Preconditions.checkArgument(
-          StringUtils.isNotBlank(modelVersionAliasRelPO.modelAlias), 
"modelAlias is required");
+          StringUtils.isNotBlank(modelVersionAliasRelPO.modelVersionAlias),
+          "modelVersionAlias is required");
       Preconditions.checkArgument(
           modelVersionAliasRelPO.deletedAt != null, "deletedAt is required");
       return modelVersionAliasRelPO;
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/po/ModelVersionPO.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/po/ModelVersionPO.java
index ac5611e2d..59ca6271e 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/po/ModelVersionPO.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/po/ModelVersionPO.java
@@ -112,11 +112,6 @@ public class ModelVersionPO {
     }
 
     public ModelVersionPO build() {
-      Preconditions.checkArgument(modelVersionPO.modelId != null, "Model id is 
required");
-      Preconditions.checkArgument(modelVersionPO.metalakeId != null, "Metalake 
id is required");
-      Preconditions.checkArgument(modelVersionPO.catalogId != null, "Catalog 
id is required");
-      Preconditions.checkArgument(modelVersionPO.schemaId != null, "Schema id 
is required");
-      Preconditions.checkArgument(modelVersionPO.modelVersion != null, "Model 
version is required");
       Preconditions.checkArgument(
           StringUtils.isNotBlank(modelVersionPO.modelVersionUri),
           "Model version uri cannot be empty");
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/CatalogMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/CatalogMetaService.java
index 0dcf0280c..310b8cc08 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/CatalogMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/CatalogMetaService.java
@@ -37,6 +37,8 @@ import 
org.apache.gravitino.storage.relational.mapper.CatalogMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.FilesetMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.FilesetVersionMapper;
 import org.apache.gravitino.storage.relational.mapper.ModelMetaMapper;
+import 
org.apache.gravitino.storage.relational.mapper.ModelVersionAliasRelMapper;
+import org.apache.gravitino.storage.relational.mapper.ModelVersionMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.OwnerMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.SchemaMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.SecurableObjectMapper;
@@ -242,6 +244,14 @@ public class CatalogMetaService {
               SessionUtils.doWithoutCommit(
                   TagMetadataObjectRelMapper.class,
                   mapper -> 
mapper.softDeleteTagMetadataObjectRelsByCatalogId(catalogId)),
+          () ->
+              SessionUtils.doWithoutCommit(
+                  ModelVersionAliasRelMapper.class,
+                  mapper -> 
mapper.softDeleteModelVersionAliasRelsByCatalogId(catalogId)),
+          () ->
+              SessionUtils.doWithoutCommit(
+                  ModelVersionMetaMapper.class,
+                  mapper -> 
mapper.softDeleteModelVersionMetasByCatalogId(catalogId)),
           () ->
               SessionUtils.doWithoutCommit(
                   ModelMetaMapper.class,
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/MetalakeMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/MetalakeMetaService.java
index 8fa94d4d7..75e217279 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/MetalakeMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/MetalakeMetaService.java
@@ -38,6 +38,8 @@ import 
org.apache.gravitino.storage.relational.mapper.GroupMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.GroupRoleRelMapper;
 import org.apache.gravitino.storage.relational.mapper.MetalakeMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.ModelMetaMapper;
+import 
org.apache.gravitino.storage.relational.mapper.ModelVersionAliasRelMapper;
+import org.apache.gravitino.storage.relational.mapper.ModelVersionMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.OwnerMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.RoleMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.SchemaMetaMapper;
@@ -245,6 +247,14 @@ public class MetalakeMetaService {
                 SessionUtils.doWithoutCommit(
                     OwnerMetaMapper.class,
                     mapper -> 
mapper.softDeleteOwnerRelByMetalakeId(metalakeId)),
+            () ->
+                SessionUtils.doWithoutCommit(
+                    ModelVersionAliasRelMapper.class,
+                    mapper -> 
mapper.softDeleteModelVersionAliasRelsByMetalakeId(metalakeId)),
+            () ->
+                SessionUtils.doWithoutCommit(
+                    ModelVersionMetaMapper.class,
+                    mapper -> 
mapper.softDeleteModelVersionMetasByMetalakeId(metalakeId)),
             () ->
                 SessionUtils.doWithoutCommit(
                     ModelMetaMapper.class,
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/ModelMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/ModelMetaService.java
index 2cb16bd07..2da43755c 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/ModelMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/ModelMetaService.java
@@ -30,6 +30,8 @@ import org.apache.gravitino.Namespace;
 import org.apache.gravitino.exceptions.NoSuchEntityException;
 import org.apache.gravitino.meta.ModelEntity;
 import org.apache.gravitino.storage.relational.mapper.ModelMetaMapper;
+import 
org.apache.gravitino.storage.relational.mapper.ModelVersionAliasRelMapper;
+import org.apache.gravitino.storage.relational.mapper.ModelVersionMetaMapper;
 import org.apache.gravitino.storage.relational.po.ModelPO;
 import org.apache.gravitino.storage.relational.utils.ExceptionUtils;
 import org.apache.gravitino.storage.relational.utils.POConverters;
@@ -64,22 +66,7 @@ public class ModelMetaService {
   }
 
   public ModelEntity getModelByIdentifier(NameIdentifier ident) {
-    NameIdentifierUtil.checkModel(ident);
-
-    Long schemaId = 
CommonMetaService.getInstance().getParentEntityIdByNamespace(ident.namespace());
-
-    ModelPO modelPO =
-        SessionUtils.getWithoutCommit(
-            ModelMetaMapper.class,
-            mapper -> mapper.selectModelMetaBySchemaIdAndModelName(schemaId, 
ident.name()));
-
-    if (modelPO == null) {
-      throw new NoSuchEntityException(
-          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
-          Entity.EntityType.MODEL.name().toLowerCase(Locale.ROOT),
-          ident.toString());
-    }
-
+    ModelPO modelPO = getModelPOByIdentifier(ident);
     return POConverters.fromModelPO(modelPO, ident.namespace());
   }
 
@@ -120,14 +107,28 @@ public class ModelMetaService {
 
     AtomicInteger modelDeletedCount = new AtomicInteger();
     SessionUtils.doMultipleWithCommit(
+        // delete model versions first
+        () ->
+            SessionUtils.doWithoutCommit(
+                ModelVersionMetaMapper.class,
+                mapper ->
+                    
mapper.softDeleteModelVersionsBySchemaIdAndModelName(schemaId, ident.name())),
+
+        // delete model version aliases
+        () ->
+            SessionUtils.doWithoutCommit(
+                ModelVersionAliasRelMapper.class,
+                mapper ->
+                    
mapper.softDeleteModelVersionAliasRelsBySchemaIdAndModelName(
+                        schemaId, ident.name())),
+
+        // delete model meta
         () ->
             modelDeletedCount.set(
                 SessionUtils.doWithoutCommitAndFetchResult(
                     ModelMetaMapper.class,
                     mapper ->
-                        
mapper.softDeleteModelMetaBySchemaIdAndModelName(schemaId, ident.name())))
-        // TODO(jerryshao): Add delete model version
-        );
+                        
mapper.softDeleteModelMetaBySchemaIdAndModelName(schemaId, ident.name()))));
 
     return modelDeletedCount.get() > 0;
   }
@@ -186,4 +187,23 @@ public class ModelMetaService {
         
SchemaMetaService.getInstance().getSchemaIdByCatalogIdAndName(catalogId, 
schema);
     builder.withSchemaId(schemaId);
   }
+
+  ModelPO getModelPOByIdentifier(NameIdentifier ident) {
+    NameIdentifierUtil.checkModel(ident);
+
+    Long schemaId = 
CommonMetaService.getInstance().getParentEntityIdByNamespace(ident.namespace());
+
+    ModelPO modelPO =
+        SessionUtils.getWithoutCommit(
+            ModelMetaMapper.class,
+            mapper -> mapper.selectModelMetaBySchemaIdAndModelName(schemaId, 
ident.name()));
+
+    if (modelPO == null) {
+      throw new NoSuchEntityException(
+          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+          Entity.EntityType.MODEL.name().toLowerCase(Locale.ROOT),
+          ident.toString());
+    }
+    return modelPO;
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/ModelVersionMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/ModelVersionMetaService.java
new file mode 100644
index 000000000..330a0b66e
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/ModelVersionMetaService.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.service;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.math.NumberUtils;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.exceptions.NoSuchEntityException;
+import org.apache.gravitino.meta.ModelEntity;
+import org.apache.gravitino.meta.ModelVersionEntity;
+import org.apache.gravitino.storage.relational.mapper.ModelMetaMapper;
+import 
org.apache.gravitino.storage.relational.mapper.ModelVersionAliasRelMapper;
+import org.apache.gravitino.storage.relational.mapper.ModelVersionMetaMapper;
+import org.apache.gravitino.storage.relational.po.ModelVersionAliasRelPO;
+import org.apache.gravitino.storage.relational.po.ModelVersionPO;
+import org.apache.gravitino.storage.relational.utils.ExceptionUtils;
+import org.apache.gravitino.storage.relational.utils.POConverters;
+import org.apache.gravitino.storage.relational.utils.SessionUtils;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+import org.apache.gravitino.utils.NamespaceUtil;
+import org.glassfish.jersey.internal.guava.Lists;
+
+public class ModelVersionMetaService {
+
+  private static final ModelVersionMetaService INSTANCE = new 
ModelVersionMetaService();
+
+  public static ModelVersionMetaService getInstance() {
+    return INSTANCE;
+  }
+
+  private ModelVersionMetaService() {}
+
+  public List<ModelVersionEntity> listModelVersionsByNamespace(Namespace ns) {
+    NamespaceUtil.checkModelVersion(ns);
+
+    NameIdentifier modelIdent = NameIdentifier.of(ns.levels());
+    // Will throw a NoSuchEntityException if the model does not exist.
+    ModelEntity modelEntity = 
ModelMetaService.getInstance().getModelByIdentifier(modelIdent);
+
+    List<ModelVersionPO> modelVersionPOs =
+        SessionUtils.getWithoutCommit(
+            ModelVersionMetaMapper.class,
+            mapper -> mapper.listModelVersionMetasByModelId(modelEntity.id()));
+
+    if (modelVersionPOs.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    // Get the aliases for all the model versions.
+    List<ModelVersionAliasRelPO> aliasRelPOs =
+        SessionUtils.getWithoutCommit(
+            ModelVersionAliasRelMapper.class,
+            mapper -> 
mapper.selectModelVersionAliasRelsByModelId(modelEntity.id()));
+    Multimap<Integer, ModelVersionAliasRelPO> aliasRelPOsByModelVersion =
+        ArrayListMultimap.create();
+    aliasRelPOs.forEach(r -> 
aliasRelPOsByModelVersion.put(r.getModelVersion(), r));
+
+    return modelVersionPOs.stream()
+        .map(
+            m -> {
+              List<ModelVersionAliasRelPO> versionAliasRelPOs =
+                  
Lists.newArrayList(aliasRelPOsByModelVersion.get(m.getModelVersion()));
+              return POConverters.fromModelVersionPO(modelIdent, m, 
versionAliasRelPOs);
+            })
+        .collect(Collectors.toList());
+  }
+
+  public ModelVersionEntity getModelVersionByIdentifier(NameIdentifier ident) {
+    NameIdentifierUtil.checkModelVersion(ident);
+
+    NameIdentifier modelIdent = NameIdentifier.of(ident.namespace().levels());
+    // Will throw a NoSuchEntityException if the model does not exist.
+    ModelEntity modelEntity = 
ModelMetaService.getInstance().getModelByIdentifier(modelIdent);
+
+    boolean isVersionNumber = NumberUtils.isCreatable(ident.name());
+
+    ModelVersionPO modelVersionPO =
+        SessionUtils.getWithoutCommit(
+            ModelVersionMetaMapper.class,
+            mapper -> {
+              if (isVersionNumber) {
+                return mapper.selectModelVersionMeta(
+                    modelEntity.id(), Integer.valueOf(ident.name()));
+              } else {
+                return mapper.selectModelVersionMetaByAlias(modelEntity.id(), 
ident.name());
+              }
+            });
+
+    if (modelVersionPO == null) {
+      throw new NoSuchEntityException(
+          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+          Entity.EntityType.MODEL_VERSION.name().toLowerCase(Locale.ROOT),
+          ident.toString());
+    }
+
+    List<ModelVersionAliasRelPO> aliasRelPOs =
+        SessionUtils.getWithoutCommit(
+            ModelVersionAliasRelMapper.class,
+            mapper -> {
+              if (isVersionNumber) {
+                return mapper.selectModelVersionAliasRelsByModelIdAndVersion(
+                    modelEntity.id(), Integer.valueOf(ident.name()));
+              } else {
+                return mapper.selectModelVersionAliasRelsByModelIdAndAlias(
+                    modelEntity.id(), ident.name());
+              }
+            });
+
+    return POConverters.fromModelVersionPO(modelIdent, modelVersionPO, 
aliasRelPOs);
+  }
+
+  public void insertModelVersion(ModelVersionEntity modelVersionEntity) throws 
IOException {
+    NameIdentifier modelIdent = modelVersionEntity.modelIdentifier();
+    NameIdentifierUtil.checkModel(modelIdent);
+
+    Long schemaId =
+        
CommonMetaService.getInstance().getParentEntityIdByNamespace(modelIdent.namespace());
+    Long modelId =
+        ModelMetaService.getInstance()
+            .getModelIdBySchemaIdAndModelName(schemaId, modelIdent.name());
+
+    ModelVersionPO.Builder builder = 
ModelVersionPO.builder().withModelId(modelId);
+    ModelVersionPO modelVersionPO =
+        POConverters.initializeModelVersionPO(modelVersionEntity, builder);
+    List<ModelVersionAliasRelPO> aliasRelPOs =
+        POConverters.initializeModelVersionAliasRelPO(modelVersionEntity, 
modelId);
+
+    try {
+      SessionUtils.doMultipleWithCommit(
+          () ->
+              SessionUtils.doWithoutCommit(
+                  ModelVersionMetaMapper.class,
+                  mapper -> mapper.insertModelVersionMeta(modelVersionPO)),
+          () -> {
+            if (aliasRelPOs.isEmpty()) {
+              return;
+            }
+            SessionUtils.doWithoutCommit(
+                ModelVersionAliasRelMapper.class,
+                mapper -> mapper.insertModelVersionAliasRels(aliasRelPOs));
+          },
+          () ->
+              // If the model version is inserted successfully, update the model's latest version.
+              SessionUtils.doWithoutCommit(
+                  ModelMetaMapper.class, mapper -> 
mapper.updateModelLatestVersion(modelId)));
+    } catch (RuntimeException re) {
+      ExceptionUtils.checkSQLException(
+          re, Entity.EntityType.MODEL_VERSION, 
modelVersionEntity.modelIdentifier().toString());
+      throw re;
+    }
+  }
+
+  public boolean deleteModelVersion(NameIdentifier ident) {
+    NameIdentifierUtil.checkModelVersion(ident);
+
+    NameIdentifier modelIdent = NameIdentifier.of(ident.namespace().levels());
+    // If the model does not exist, there is nothing to delete: return false instead of throwing.
+    ModelEntity modelEntity;
+    try {
+      modelEntity = 
ModelMetaService.getInstance().getModelByIdentifier(modelIdent);
+    } catch (NoSuchEntityException e) {
+      return false;
+    }
+
+    boolean isVersionNumber = NumberUtils.isCreatable(ident.name());
+
+    AtomicInteger modelVersionDeletedCount = new AtomicInteger();
+    SessionUtils.doMultipleWithCommit(
+        // Soft-delete the model version meta first
+        () ->
+            modelVersionDeletedCount.set(
+                SessionUtils.doWithoutCommitAndFetchResult(
+                    ModelVersionMetaMapper.class,
+                    mapper -> {
+                      if (isVersionNumber) {
+                        return 
mapper.softDeleteModelVersionMetaByModelIdAndVersion(
+                            modelEntity.id(), Integer.valueOf(ident.name()));
+                      } else {
+                        return 
mapper.softDeleteModelVersionMetaByModelIdAndAlias(
+                            modelEntity.id(), ident.name());
+                      }
+                    })),
+        () -> {
+          // Delete model version alias relations
+          if (modelVersionDeletedCount.get() == 0) {
+            return;
+          }
+
+          SessionUtils.doWithoutCommit(
+              ModelVersionAliasRelMapper.class,
+              mapper -> {
+                if (isVersionNumber) {
+                  mapper.softDeleteModelVersionAliasRelsByModelIdAndVersion(
+                      modelEntity.id(), Integer.valueOf(ident.name()));
+                } else {
+                  mapper.softDeleteModelVersionAliasRelsByModelIdAndAlias(
+                      modelEntity.id(), ident.name());
+                }
+              });
+        });
+
+    return modelVersionDeletedCount.get() > 0;
+  }
+
+  public int deleteModelVersionMetasByLegacyTimeline(Long legacyTimeline, int 
limit) {
+    int[] modelVersionDeletedCount = new int[] {0};
+    int[] modelVersionAliasRelDeletedCount = new int[] {0};
+
+    SessionUtils.doMultipleWithCommit(
+        () ->
+            modelVersionDeletedCount[0] =
+                SessionUtils.doWithoutCommitAndFetchResult(
+                    ModelVersionMetaMapper.class,
+                    mapper ->
+                        
mapper.deleteModelVersionMetasByLegacyTimeline(legacyTimeline, limit)),
+        () ->
+            modelVersionAliasRelDeletedCount[0] =
+                SessionUtils.doWithoutCommitAndFetchResult(
+                    ModelVersionAliasRelMapper.class,
+                    mapper ->
+                        
mapper.deleteModelVersionAliasRelsByLegacyTimeline(legacyTimeline, limit)));
+
+    return modelVersionDeletedCount[0] + modelVersionAliasRelDeletedCount[0];
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaMetaService.java
index 1229e3165..4c9c828cb 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaMetaService.java
@@ -37,6 +37,8 @@ import org.apache.gravitino.meta.TableEntity;
 import org.apache.gravitino.storage.relational.mapper.FilesetMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.FilesetVersionMapper;
 import org.apache.gravitino.storage.relational.mapper.ModelMetaMapper;
+import 
org.apache.gravitino.storage.relational.mapper.ModelVersionAliasRelMapper;
+import org.apache.gravitino.storage.relational.mapper.ModelVersionMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.OwnerMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.SchemaMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.SecurableObjectMapper;
@@ -229,6 +231,14 @@ public class SchemaMetaService {
                 SessionUtils.doWithoutCommit(
                     TagMetadataObjectRelMapper.class,
                     mapper -> 
mapper.softDeleteTagMetadataObjectRelsBySchemaId(schemaId)),
+            () ->
+                SessionUtils.doWithoutCommit(
+                    ModelVersionAliasRelMapper.class,
+                    mapper -> 
mapper.softDeleteModelVersionAliasRelsBySchemaId(schemaId)),
+            () ->
+                SessionUtils.doWithoutCommit(
+                    ModelVersionMetaMapper.class,
+                    mapper -> 
mapper.softDeleteModelVersionMetasBySchemaId(schemaId)),
             () ->
                 SessionUtils.doWithoutCommit(
                     ModelMetaMapper.class,
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/session/SqlSessionFactoryHelper.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/session/SqlSessionFactoryHelper.java
index 8bc7394d4..82769be1f 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/session/SqlSessionFactoryHelper.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/session/SqlSessionFactoryHelper.java
@@ -34,6 +34,8 @@ import 
org.apache.gravitino.storage.relational.mapper.GroupMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.GroupRoleRelMapper;
 import org.apache.gravitino.storage.relational.mapper.MetalakeMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.ModelMetaMapper;
+import 
org.apache.gravitino.storage.relational.mapper.ModelVersionAliasRelMapper;
+import org.apache.gravitino.storage.relational.mapper.ModelVersionMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.OwnerMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.RoleMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.SchemaMetaMapper;
@@ -126,6 +128,8 @@ public class SqlSessionFactoryHelper {
     configuration.addMapper(TagMetadataObjectRelMapper.class);
     configuration.addMapper(OwnerMetaMapper.class);
     configuration.addMapper(ModelMetaMapper.class);
+    configuration.addMapper(ModelVersionMetaMapper.class);
+    configuration.addMapper(ModelVersionAliasRelMapper.class);
 
     // Create the SqlSessionFactory object, it is a singleton object
     if (sqlSessionFactory == null) {
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
index 0bd0f4a74..12737cd07 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
@@ -30,6 +30,7 @@ import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.Catalog;
 import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.authorization.Privilege;
 import org.apache.gravitino.authorization.Privileges;
@@ -46,6 +47,7 @@ import org.apache.gravitino.meta.ColumnEntity;
 import org.apache.gravitino.meta.FilesetEntity;
 import org.apache.gravitino.meta.GroupEntity;
 import org.apache.gravitino.meta.ModelEntity;
+import org.apache.gravitino.meta.ModelVersionEntity;
 import org.apache.gravitino.meta.RoleEntity;
 import org.apache.gravitino.meta.SchemaEntity;
 import org.apache.gravitino.meta.SchemaVersion;
@@ -66,6 +68,8 @@ import org.apache.gravitino.storage.relational.po.GroupPO;
 import org.apache.gravitino.storage.relational.po.GroupRoleRelPO;
 import org.apache.gravitino.storage.relational.po.MetalakePO;
 import org.apache.gravitino.storage.relational.po.ModelPO;
+import org.apache.gravitino.storage.relational.po.ModelVersionAliasRelPO;
+import org.apache.gravitino.storage.relational.po.ModelVersionPO;
 import org.apache.gravitino.storage.relational.po.OwnerRelPO;
 import org.apache.gravitino.storage.relational.po.RolePO;
 import org.apache.gravitino.storage.relational.po.SchemaPO;
@@ -1324,4 +1328,68 @@ public class POConverters {
       throw new RuntimeException("Failed to serialize json object:", e);
     }
   }
+
+  public static ModelVersionEntity fromModelVersionPO(
+      NameIdentifier modelIdent,
+      ModelVersionPO modelVersionPO,
+      List<ModelVersionAliasRelPO> aliasRelPOs) {
+    List<String> aliases =
+        aliasRelPOs.stream()
+            .map(ModelVersionAliasRelPO::getModelVersionAlias)
+            .collect(Collectors.toList());
+
+    try {
+      return ModelVersionEntity.builder()
+          .withModelIdentifier(modelIdent)
+          .withVersion(modelVersionPO.getModelVersion())
+          .withAliases(aliases)
+          .withComment(modelVersionPO.getModelVersionComment())
+          .withUri(modelVersionPO.getModelVersionUri())
+          .withProperties(
+              JsonUtils.anyFieldMapper()
+                  .readValue(modelVersionPO.getModelVersionProperties(), 
Map.class))
+          .withAuditInfo(
+              
JsonUtils.anyFieldMapper().readValue(modelVersionPO.getAuditInfo(), 
AuditInfo.class))
+          .build();
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException("Failed to deserialize json object:", e);
+    }
+  }
+
+  public static ModelVersionPO initializeModelVersionPO(
+      ModelVersionEntity modelVersionEntity, ModelVersionPO.Builder builder) {
+    try {
+      return builder
+          // Note that the version set here is not used when inserting into the database; the
+          // insert uses the version from the query directly to avoid concurrent version conflicts.
+          .withModelVersion(modelVersionEntity.version())
+          .withModelVersionComment(modelVersionEntity.comment())
+          .withModelVersionUri(modelVersionEntity.uri())
+          .withModelVersionProperties(
+              
JsonUtils.anyFieldMapper().writeValueAsString(modelVersionEntity.properties()))
+          .withAuditInfo(
+              
JsonUtils.anyFieldMapper().writeValueAsString(modelVersionEntity.auditInfo()))
+          .withDeletedAt(DEFAULT_DELETED_AT)
+          .build();
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException("Failed to serialize json object:", e);
+    }
+  }
+
+  public static List<ModelVersionAliasRelPO> initializeModelVersionAliasRelPO(
+      ModelVersionEntity modelVersionEntity, Long modelId) {
+    return modelVersionEntity.aliases().stream()
+        .map(
+            a ->
+                ModelVersionAliasRelPO.builder()
+                    // Note that the version set here is not used when inserting into the
+                    // database; the insert uses the version from the query directly to avoid
+                    // concurrent version conflicts.
+                    .withModelVersion(modelVersionEntity.version())
+                    .withModelVersionAlias(a)
+                    .withModelId(modelId)
+                    .withDeletedAt(DEFAULT_DELETED_AT)
+                    .build())
+        .collect(Collectors.toList());
+  }
 }
diff --git 
a/core/src/test/java/org/apache/gravitino/meta/TestModelVersionEntity.java 
b/core/src/test/java/org/apache/gravitino/meta/TestModelVersionEntity.java
index 5b62ea946..44ec19ab4 100644
--- a/core/src/test/java/org/apache/gravitino/meta/TestModelVersionEntity.java
+++ b/core/src/test/java/org/apache/gravitino/meta/TestModelVersionEntity.java
@@ -23,6 +23,8 @@ import com.google.common.collect.Lists;
 import java.time.Instant;
 import java.util.List;
 import java.util.Map;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
@@ -37,6 +39,7 @@ public class TestModelVersionEntity {
 
     ModelVersionEntity modelVersionEntity =
         ModelVersionEntity.builder()
+            .withModelIdentifier(NameIdentifier.of("m1", "c1", "s1", "model1"))
             .withVersion(1)
             .withComment("test comment")
             .withAliases(aliases)
@@ -45,15 +48,22 @@ public class TestModelVersionEntity {
             .withAuditInfo(auditInfo)
             .build();
 
+    Assertions.assertEquals(
+        NameIdentifier.of("m1", "c1", "s1", "model1"), 
modelVersionEntity.modelIdentifier());
     Assertions.assertEquals(1, modelVersionEntity.version());
     Assertions.assertEquals("test comment", modelVersionEntity.comment());
     Assertions.assertEquals(aliases, modelVersionEntity.aliases());
     Assertions.assertEquals(properties, modelVersionEntity.properties());
     Assertions.assertEquals("test_uri", modelVersionEntity.uri());
     Assertions.assertEquals(auditInfo, modelVersionEntity.auditInfo());
+    Assertions.assertEquals(
+        Namespace.of("m1", "c1", "s1", "model1"), 
modelVersionEntity.namespace());
+    Assertions.assertEquals("1", modelVersionEntity.name());
+    Assertions.assertThrows(UnsupportedOperationException.class, 
modelVersionEntity::id);
 
     ModelVersionEntity modelVersionEntity2 =
         ModelVersionEntity.builder()
+            .withModelIdentifier(NameIdentifier.of("m1", "c1", "s1", "model1"))
             .withVersion(1)
             .withAliases(aliases)
             .withProperties(properties)
@@ -64,6 +74,7 @@ public class TestModelVersionEntity {
 
     ModelVersionEntity modelVersionEntity3 =
         ModelVersionEntity.builder()
+            .withModelIdentifier(NameIdentifier.of("m1", "c1", "s1", "model1"))
             .withVersion(1)
             .withComment("test comment")
             .withAliases(aliases)
@@ -74,13 +85,14 @@ public class TestModelVersionEntity {
 
     ModelVersionEntity modelVersionEntity4 =
         ModelVersionEntity.builder()
+            .withModelIdentifier(NameIdentifier.of("m1", "c1", "s1", "model1"))
             .withVersion(1)
             .withComment("test comment")
             .withProperties(properties)
             .withUri("test_uri")
             .withAuditInfo(auditInfo)
             .build();
-    Assertions.assertNull(modelVersionEntity4.aliases());
+    Assertions.assertTrue(modelVersionEntity4.aliases().isEmpty());
   }
 
   @Test
diff --git 
a/core/src/test/java/org/apache/gravitino/storage/TestEntityStorage.java 
b/core/src/test/java/org/apache/gravitino/storage/TestEntityStorage.java
index fcc6ebd9b..a85f89628 100644
--- a/core/src/test/java/org/apache/gravitino/storage/TestEntityStorage.java
+++ b/core/src/test/java/org/apache/gravitino/storage/TestEntityStorage.java
@@ -32,6 +32,7 @@ import static 
org.apache.gravitino.Configs.STORE_DELETE_AFTER_TIME;
 import static org.apache.gravitino.Configs.VERSION_RETENTION_COUNT;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import java.io.File;
@@ -73,6 +74,7 @@ import org.apache.gravitino.meta.ColumnEntity;
 import org.apache.gravitino.meta.FilesetEntity;
 import org.apache.gravitino.meta.GroupEntity;
 import org.apache.gravitino.meta.ModelEntity;
+import org.apache.gravitino.meta.ModelVersionEntity;
 import org.apache.gravitino.meta.RoleEntity;
 import org.apache.gravitino.meta.SchemaEntity;
 import org.apache.gravitino.meta.SchemaVersion;
@@ -270,9 +272,19 @@ public class TestEntityStorage {
               Namespace.of("metalake", "catalog", "schema1"),
               "model1",
               "model1",
-              1,
+              0,
+              null,
+              auditInfo);
+      ModelVersionEntity modelVersion1 =
+          TestJDBCBackend.createModelVersionEntity(
+              model1.nameIdentifier(),
+              0,
+              "model_path",
+              ImmutableList.of("alias1", "alias2"),
+              null,
               null,
               auditInfo);
+
       UserEntity user1 =
           createUser(RandomIdGenerator.INSTANCE.nextId(), "metalake", "user1", 
auditInfo);
       GroupEntity group1 =
@@ -289,6 +301,7 @@ public class TestEntityStorage {
       store.put(fileset1);
       store.put(topic1);
       store.put(model1);
+      store.put(modelVersion1);
       store.put(user1);
       store.put(group1);
       store.put(role1);
@@ -333,6 +346,24 @@ public class TestEntityStorage {
                   NameIdentifier.of("metalake", "catalog", "schema1", 
"model1"),
                   Entity.EntityType.MODEL,
                   ModelEntity.class));
+      Assertions.assertDoesNotThrow(
+          () ->
+              store.get(
+                  NameIdentifier.of("metalake", "catalog", "schema1", 
"model1", "0"),
+                  Entity.EntityType.MODEL_VERSION,
+                  ModelVersionEntity.class));
+      Assertions.assertDoesNotThrow(
+          () ->
+              store.get(
+                  NameIdentifier.of("metalake", "catalog", "schema1", 
"model1", "alias1"),
+                  EntityType.MODEL_VERSION,
+                  ModelVersionEntity.class));
+      Assertions.assertDoesNotThrow(
+          () ->
+              store.get(
+                  NameIdentifier.of("metalake", "catalog", "schema1", 
"model1", "alias2"),
+                  EntityType.MODEL_VERSION,
+                  ModelVersionEntity.class));
 
       Assertions.assertDoesNotThrow(
           () ->
@@ -400,6 +431,24 @@ public class TestEntityStorage {
                   NameIdentifier.of("metalake", "catalog", "schema1", 
"model1"),
                   Entity.EntityType.MODEL,
                   ModelEntity.class));
+      Assertions.assertDoesNotThrow(
+          () ->
+              store.get(
+                  NameIdentifier.of("metalake", "catalog", "schema1", 
"model1", "0"),
+                  Entity.EntityType.MODEL_VERSION,
+                  ModelVersionEntity.class));
+      Assertions.assertDoesNotThrow(
+          () ->
+              store.get(
+                  NameIdentifier.of("metalake", "catalog", "schema1", 
"model1", "alias1"),
+                  EntityType.MODEL_VERSION,
+                  ModelVersionEntity.class));
+      Assertions.assertDoesNotThrow(
+          () ->
+              store.get(
+                  NameIdentifier.of("metalake", "catalog", "schema1", 
"model1", "alias2"),
+                  EntityType.MODEL_VERSION,
+                  ModelVersionEntity.class));
 
       Assertions.assertDoesNotThrow(
           () ->
@@ -616,7 +665,16 @@ public class TestEntityStorage {
               Namespace.of("metalake", "catalog", "schema1"),
               "model1",
               "model1",
-              1,
+              0,
+              null,
+              auditInfo);
+      ModelVersionEntity modelVersion1 =
+          TestJDBCBackend.createModelVersionEntity(
+              model1.nameIdentifier(),
+              0,
+              "model_path",
+              ImmutableList.of("alias1", "alias2"),
+              null,
               null,
               auditInfo);
 
@@ -647,7 +705,16 @@ public class TestEntityStorage {
               Namespace.of("metalake", "catalog", "schema2"),
               "model1",
               "model1",
-              1,
+              0,
+              null,
+              auditInfo);
+      ModelVersionEntity modelVersion1InSchema2 =
+          TestJDBCBackend.createModelVersionEntity(
+              model1InSchema2.nameIdentifier(),
+              0,
+              "model_path",
+              ImmutableList.of("alias1", "alias2"),
+              null,
               null,
               auditInfo);
 
@@ -671,7 +738,9 @@ public class TestEntityStorage {
       store.put(topic1);
       store.put(topic1InSchema2);
       store.put(model1);
+      store.put(modelVersion1);
       store.put(model1InSchema2);
+      store.put(modelVersion1InSchema2);
       store.put(user1);
       store.put(user2);
       store.put(group1);
@@ -693,7 +762,9 @@ public class TestEntityStorage {
           topic1,
           topic1InSchema2,
           model1,
+          modelVersion1,
           model1InSchema2,
+          modelVersion1InSchema2,
           user1,
           user2,
           group1,
@@ -713,9 +784,10 @@ public class TestEntityStorage {
 
       validateDeleteTopic(store, schema2, topic1, topic1InSchema2);
 
-      validateDeleteModel(store, schema2, model1, model1InSchema2);
+      validateDeleteModel(
+          store, schema2, model1, modelVersion1, model1InSchema2, 
modelVersion1InSchema2);
 
-      validateDeleteSchema(store, schema1, table1, fileset1, topic1, model1);
+      validateDeleteSchema(store, schema1, table1, fileset1, topic1, model1, 
modelVersion1);
 
       validateDeleteCatalog(
           store,
@@ -1714,7 +1786,8 @@ public class TestEntityStorage {
       TableEntity table1,
       FilesetEntity fileset1,
       TopicEntity topic1,
-      ModelEntity model1)
+      ModelEntity model1,
+      ModelVersionEntity modelVersion1)
       throws IOException {
     // Delete the schema 'metalake.catalog.schema1' but failed, because it has 
sub-entities;
     NonEmptyEntityException exception =
@@ -1731,6 +1804,8 @@ public class TestEntityStorage {
     Assertions.assertTrue(store.exists(fileset1.nameIdentifier(), 
Entity.EntityType.FILESET));
     Assertions.assertTrue(store.exists(topic1.nameIdentifier(), 
Entity.EntityType.TOPIC));
     Assertions.assertTrue(store.exists(model1.nameIdentifier(), 
Entity.EntityType.MODEL));
+    Assertions.assertTrue(
+        store.exists(modelVersion1.nameIdentifier(), 
Entity.EntityType.MODEL_VERSION));
 
     // Delete table1,fileset1 and schema1
     Assertions.assertTrue(store.delete(table1.nameIdentifier(), 
Entity.EntityType.TABLE));
@@ -1744,6 +1819,8 @@ public class TestEntityStorage {
     Assertions.assertFalse(store.exists(fileset1.nameIdentifier(), 
Entity.EntityType.FILESET));
     Assertions.assertFalse(store.exists(topic1.nameIdentifier(), 
Entity.EntityType.TOPIC));
     Assertions.assertFalse(store.exists(model1.nameIdentifier(), 
Entity.EntityType.MODEL));
+    Assertions.assertFalse(
+        store.exists(modelVersion1.nameIdentifier(), 
Entity.EntityType.MODEL_VERSION));
     Assertions.assertFalse(store.exists(schema1.nameIdentifier(), 
Entity.EntityType.SCHEMA));
 
     // Delete again should return false
@@ -1876,15 +1953,42 @@ public class TestEntityStorage {
   }
 
   private void validateDeleteModel(
-      EntityStore store, SchemaEntity schema2, ModelEntity model1, ModelEntity 
model1InSchema2)
+      EntityStore store,
+      SchemaEntity schema2,
+      ModelEntity model1,
+      ModelVersionEntity modelVersion1,
+      ModelEntity model1InSchema2,
+      ModelVersionEntity modelVersion1InSchema2)
       throws IOException {
     Assertions.assertTrue(store.delete(model1InSchema2.nameIdentifier(), 
Entity.EntityType.MODEL));
     Assertions.assertFalse(store.exists(model1InSchema2.nameIdentifier(), 
Entity.EntityType.MODEL));
     // delete again should return false
     Assertions.assertFalse(store.delete(model1InSchema2.nameIdentifier(), 
Entity.EntityType.MODEL));
 
+    Assertions.assertFalse(
+        store.exists(modelVersion1InSchema2.nameIdentifier(), 
EntityType.MODEL_VERSION));
+    Assertions.assertFalse(
+        store.delete(modelVersion1InSchema2.nameIdentifier(), 
EntityType.MODEL_VERSION));
+
+    ModelEntity model1Copy =
+        ModelEntity.builder()
+            .withId(model1.id())
+            .withNamespace(model1.namespace())
+            .withName(model1.name())
+            .withComment(model1.comment())
+            .withLatestVersion(model1.latestVersion() + 1)
+            .withProperties(model1.properties())
+            .withAuditInfo(model1.auditInfo())
+            .build();
+
+    Assertions.assertEquals(
+        model1Copy, store.get(model1.nameIdentifier(), 
Entity.EntityType.MODEL, ModelEntity.class));
+
     Assertions.assertEquals(
-        model1, store.get(model1.nameIdentifier(), Entity.EntityType.MODEL, 
ModelEntity.class));
+        modelVersion1,
+        store.get(
+            modelVersion1.nameIdentifier(), EntityType.MODEL_VERSION, 
ModelVersionEntity.class));
+
     // Make sure schema 'metalake.catalog.schema2' still exist;
     Assertions.assertEquals(
         schema2, store.get(schema2.nameIdentifier(), Entity.EntityType.SCHEMA, 
SchemaEntity.class));
@@ -1904,7 +2008,9 @@ public class TestEntityStorage {
       TopicEntity topic1,
       TopicEntity topic1InSchema2,
       ModelEntity model1,
+      ModelVersionEntity modelVersion1,
       ModelEntity model1InSchema2,
+      ModelVersionEntity modelVersion1InSchema2,
       UserEntity user1,
       UserEntity user2,
       GroupEntity group1,
@@ -1943,11 +2049,46 @@ public class TestEntityStorage {
     Assertions.assertEquals(
         topic1InSchema2,
         store.get(topic1InSchema2.nameIdentifier(), Entity.EntityType.TOPIC, 
TopicEntity.class));
+
+    ModelEntity model1Copy =
+        ModelEntity.builder()
+            .withId(model1.id())
+            .withNamespace(model1.namespace())
+            .withName(model1.name())
+            .withComment(model1.comment())
+            .withLatestVersion(model1.latestVersion() + 1)
+            .withProperties(model1.properties())
+            .withAuditInfo(model1.auditInfo())
+            .build();
+
     Assertions.assertEquals(
-        model1, store.get(model1.nameIdentifier(), Entity.EntityType.MODEL, 
ModelEntity.class));
+        model1Copy, store.get(model1.nameIdentifier(), 
Entity.EntityType.MODEL, ModelEntity.class));
     Assertions.assertEquals(
-        model1InSchema2,
-        store.get(model1InSchema2.nameIdentifier(), Entity.EntityType.MODEL, 
ModelEntity.class));
+        modelVersion1,
+        store.get(
+            modelVersion1.nameIdentifier(), EntityType.MODEL_VERSION, 
ModelVersionEntity.class));
+
+    ModelEntity model1InSchema2Copy =
+        ModelEntity.builder()
+            .withId(model1InSchema2.id())
+            .withNamespace(model1InSchema2.namespace())
+            .withName(model1InSchema2.name())
+            .withComment(model1InSchema2.comment())
+            .withLatestVersion(model1InSchema2.latestVersion() + 1)
+            .withProperties(model1InSchema2.properties())
+            .withAuditInfo(model1InSchema2.auditInfo())
+            .build();
+
+    Assertions.assertEquals(
+        model1InSchema2Copy,
+        store.get(model1InSchema2.nameIdentifier(), EntityType.MODEL, 
ModelEntity.class));
+    Assertions.assertEquals(
+        modelVersion1InSchema2,
+        store.get(
+            modelVersion1InSchema2.nameIdentifier(),
+            EntityType.MODEL_VERSION,
+            ModelVersionEntity.class));
+
     Assertions.assertEquals(
         user1, store.get(user1.nameIdentifier(), Entity.EntityType.USER, 
UserEntity.class));
     Assertions.assertEquals(
diff --git 
a/core/src/test/java/org/apache/gravitino/storage/relational/TestJDBCBackend.java
 
b/core/src/test/java/org/apache/gravitino/storage/relational/TestJDBCBackend.java
index 739edba96..3c9339ff6 100644
--- 
a/core/src/test/java/org/apache/gravitino/storage/relational/TestJDBCBackend.java
+++ 
b/core/src/test/java/org/apache/gravitino/storage/relational/TestJDBCBackend.java
@@ -57,6 +57,7 @@ import org.apache.gravitino.Config;
 import org.apache.gravitino.Configs;
 import org.apache.gravitino.Entity;
 import org.apache.gravitino.EntityAlreadyExistsException;
+import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.authorization.AuthorizationUtils;
 import org.apache.gravitino.authorization.Privileges;
@@ -69,6 +70,7 @@ import org.apache.gravitino.meta.CatalogEntity;
 import org.apache.gravitino.meta.FilesetEntity;
 import org.apache.gravitino.meta.GroupEntity;
 import org.apache.gravitino.meta.ModelEntity;
+import org.apache.gravitino.meta.ModelVersionEntity;
 import org.apache.gravitino.meta.RoleEntity;
 import org.apache.gravitino.meta.SchemaEntity;
 import org.apache.gravitino.meta.SchemaVersion;
@@ -1392,6 +1394,25 @@ public class TestJDBCBackend {
         .build();
   }
 
+  /**
+   * Builds a {@link ModelVersionEntity} for tests, handing every argument straight to the
+   * entity builder without modification.
+   *
+   * @param modelId identifier of the model the version is attached to
+   * @param version version number to set on the entity
+   * @param modelUri URI to set on the entity
+   * @param aliases aliases of the version; test callers also pass {@code null}
+   * @param comment comment of the version; test callers also pass {@code null}
+   * @param properties properties of the version; test callers also pass {@code null}
+   * @param auditInfo audit information to set on the entity
+   * @return the built entity
+   */
+  public static ModelVersionEntity createModelVersionEntity(
+      NameIdentifier modelId,
+      Integer version,
+      String modelUri,
+      List<String> aliases,
+      String comment,
+      Map<String, String> properties,
+      AuditInfo auditInfo) {
+    ModelVersionEntity modelVersion =
+        ModelVersionEntity.builder()
+            .withModelIdentifier(modelId)
+            .withVersion(version)
+            .withUri(modelUri)
+            .withAliases(aliases)
+            .withComment(comment)
+            .withProperties(properties)
+            .withAuditInfo(auditInfo)
+            .build();
+    return modelVersion;
+  }
+
   protected void createParentEntities(
       String metalakeName, String catalogName, String schemaName, AuditInfo 
auditInfo)
       throws IOException {
diff --git 
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestModelVersionMetaService.java
 
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestModelVersionMetaService.java
new file mode 100644
index 000000000..079714763
--- /dev/null
+++ 
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestModelVersionMetaService.java
@@ -0,0 +1,491 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.service;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.exceptions.IllegalNamespaceException;
+import org.apache.gravitino.exceptions.NoSuchEntityException;
+import org.apache.gravitino.exceptions.NonEmptyEntityException;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.ModelEntity;
+import org.apache.gravitino.meta.ModelVersionEntity;
+import org.apache.gravitino.storage.RandomIdGenerator;
+import org.apache.gravitino.storage.relational.TestJDBCBackend;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+public class TestModelVersionMetaService extends TestJDBCBackend {
+
+  private static final String METALAKE_NAME = 
"metalake_for_model_version_meta_test";
+
+  private static final String CATALOG_NAME = 
"catalog_for_model_version_meta_test";
+
+  private static final String SCHEMA_NAME = 
"schema_for_model_version_meta_test";
+
+  private static final Namespace MODEL_NS = Namespace.of(METALAKE_NAME, 
CATALOG_NAME, SCHEMA_NAME);
+
+  private final AuditInfo auditInfo =
+      
AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build();
+
+  private final Map<String, String> properties = ImmutableMap.of("k1", "v1");
+
+  private final List<String> aliases = Lists.newArrayList("alias1", "alias2");
+
+  /**
+   * Inserts two versions of one model and verifies lookups by version number and by alias, the
+   * failures for unknown versions, aliases and models, the model's latest-version value, and the
+   * rejection of a version whose model does not exist.
+   */
+  @Test
+  public void testInsertAndSelectModelVersion() throws IOException {
+    createParentEntities(METALAKE_NAME, CATALOG_NAME, SCHEMA_NAME, auditInfo);
+
+    ModelMetaService modelService = ModelMetaService.getInstance();
+    ModelVersionMetaService versionService = ModelVersionMetaService.getInstance();
+
+    // Register the model that the versions below belong to.
+    ModelEntity model =
+        createModelEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            MODEL_NS,
+            "model1",
+            "model1 comment",
+            0,
+            properties,
+            auditInfo);
+    NameIdentifier modelIdent = model.nameIdentifier();
+    Assertions.assertDoesNotThrow(() -> modelService.insertModel(model, false));
+
+    // Version 0 carries both aliases.
+    ModelVersionEntity version0 =
+        createModelVersionEntity(
+            modelIdent, 0, "model_path", aliases, "test comment", properties, auditInfo);
+    Assertions.assertDoesNotThrow(() -> versionService.insertModelVersion(version0));
+
+    // The stored version is reachable by its number and by each of its aliases.
+    NameIdentifier version0Ident = getModelVersionIdent(modelIdent, 0);
+    NameIdentifier alias1Ident = getModelVersionIdent(modelIdent, "alias1");
+    NameIdentifier alias2Ident = getModelVersionIdent(modelIdent, "alias2");
+    Assertions.assertEquals(version0, versionService.getModelVersionByIdentifier(version0Ident));
+    Assertions.assertEquals(version0, versionService.getModelVersionByIdentifier(alias1Ident));
+    Assertions.assertEquals(version0, versionService.getModelVersionByIdentifier(alias2Ident));
+
+    // Version 1 is inserted without aliases, comment or properties.
+    ModelVersionEntity version1 =
+        createModelVersionEntity(modelIdent, 1, "model_path", null, null, null, auditInfo);
+    Assertions.assertDoesNotThrow(() -> versionService.insertModelVersion(version1));
+
+    // The new version can be fetched by its number.
+    NameIdentifier version1Ident = getModelVersionIdent(modelIdent, 1);
+    Assertions.assertEquals(version1, versionService.getModelVersionByIdentifier(version1Ident));
+
+    // The first version is still reachable by number and by alias.
+    Assertions.assertEquals(version0, versionService.getModelVersionByIdentifier(version0Ident));
+    Assertions.assertEquals(version0, versionService.getModelVersionByIdentifier(alias1Ident));
+
+    // A version number that was never inserted is reported as missing.
+    NameIdentifier unknownVersionIdent = getModelVersionIdent(modelIdent, 2);
+    Assertions.assertThrows(
+        NoSuchEntityException.class,
+        () -> versionService.getModelVersionByIdentifier(unknownVersionIdent));
+
+    // An alias that was never registered is reported as missing.
+    NameIdentifier unknownAliasIdent = getModelVersionIdent(modelIdent, "alias3");
+    Assertions.assertThrows(
+        NoSuchEntityException.class,
+        () -> versionService.getModelVersionByIdentifier(unknownAliasIdent));
+
+    // A version under a model that does not exist is reported as missing.
+    NameIdentifier missingModelIdent = NameIdentifier.of(MODEL_NS, "model2");
+    NameIdentifier versionOfMissingModel = getModelVersionIdent(missingModelIdent, 0);
+    Assertions.assertThrows(
+        NoSuchEntityException.class,
+        () -> versionService.getModelVersionByIdentifier(versionOfMissingModel));
+
+    // After two inserts the model's latest version reads 2.
+    ModelEntity storedModel = modelService.getModelByIdentifier(modelIdent);
+    Assertions.assertEquals(2, storedModel.latestVersion());
+
+    // A model-level identifier is not accepted as a model version identifier.
+    Assertions.assertThrows(
+        IllegalNamespaceException.class,
+        () -> versionService.getModelVersionByIdentifier(NameIdentifier.of(MODEL_NS, "model1")));
+
+    // Inserting a version for a model that does not exist fails with NoSuchEntityException.
+    ModelVersionEntity orphanVersion =
+        createModelVersionEntity(
+            NameIdentifier.of(MODEL_NS, "model2"),
+            1,
+            "model_path",
+            aliases,
+            "test comment",
+            properties,
+            auditInfo);
+    Assertions.assertThrows(
+        NoSuchEntityException.class, () -> versionService.insertModelVersion(orphanVersion));
+  }
+
+  /**
+   * Inserts versions one at a time and verifies that listing by the model's version namespace
+   * returns exactly the inserted versions, and that listing under an unknown model fails.
+   */
+  @Test
+  public void testInsertAndListModelVersions() throws IOException {
+    createParentEntities(METALAKE_NAME, CATALOG_NAME, SCHEMA_NAME, auditInfo);
+
+    ModelVersionMetaService versionService = ModelVersionMetaService.getInstance();
+
+    // Register the model that the versions below belong to.
+    ModelEntity model =
+        createModelEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            MODEL_NS,
+            "model1",
+            "model1 comment",
+            0,
+            properties,
+            auditInfo);
+    NameIdentifier modelIdent = model.nameIdentifier();
+    Assertions.assertDoesNotThrow(() -> ModelMetaService.getInstance().insertModel(model, false));
+
+    // Version 0 carries both aliases.
+    ModelVersionEntity version0 =
+        createModelVersionEntity(
+            modelIdent, 0, "model_path", aliases, "test comment", properties, auditInfo);
+    Assertions.assertDoesNotThrow(() -> versionService.insertModelVersion(version0));
+
+    // With a single version stored, the listing contains exactly that version.
+    List<ModelVersionEntity> afterFirstInsert =
+        versionService.listModelVersionsByNamespace(getModelVersionNs(modelIdent));
+    Assertions.assertEquals(1, afterFirstInsert.size());
+    Assertions.assertEquals(version0, afterFirstInsert.get(0));
+
+    // Version 1 is inserted without aliases, comment or properties.
+    ModelVersionEntity version1 =
+        createModelVersionEntity(modelIdent, 1, "model_path", null, null, null, auditInfo);
+    Assertions.assertDoesNotThrow(() -> versionService.insertModelVersion(version1));
+
+    // Index the listing by version number so the check does not depend on result order.
+    List<ModelVersionEntity> afterSecondInsert =
+        versionService.listModelVersionsByNamespace(getModelVersionNs(modelIdent));
+    Map<Integer, ModelVersionEntity> byVersion =
+        afterSecondInsert.stream()
+            .collect(Collectors.toMap(ModelVersionEntity::version, entity -> entity));
+    Assertions.assertEquals(2, afterSecondInsert.size());
+    Assertions.assertEquals(version0, byVersion.get(0));
+    Assertions.assertEquals(version1, byVersion.get(1));
+
+    // Listing versions of a model that does not exist is reported as missing.
+    NameIdentifier missingModelIdent = NameIdentifier.of(MODEL_NS, "model2");
+    Assertions.assertThrows(
+        NoSuchEntityException.class,
+        () -> versionService.listModelVersionsByNamespace(getModelVersionNs(missingModelIdent)));
+  }
+
+  /**
+   * Verifies deletion of model versions by version number and by alias, including the
+   * {@code false} result for versions, aliases and models that do not exist, and that a deleted
+   * version can no longer be fetched through any of its aliases.
+   */
+  @Test
+  public void testInsertAndDeleteModelVersion() throws IOException {
+    createParentEntities(METALAKE_NAME, CATALOG_NAME, SCHEMA_NAME, auditInfo);
+
+    ModelMetaService modelService = ModelMetaService.getInstance();
+    ModelVersionMetaService versionService = ModelVersionMetaService.getInstance();
+
+    // Register the model that the versions below belong to.
+    ModelEntity model =
+        createModelEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            MODEL_NS,
+            "model1",
+            "model1 comment",
+            0,
+            properties,
+            auditInfo);
+    NameIdentifier modelIdent = model.nameIdentifier();
+    Assertions.assertDoesNotThrow(() -> modelService.insertModel(model, false));
+
+    // Version 0 carries both aliases.
+    ModelVersionEntity version0 =
+        createModelVersionEntity(
+            modelIdent, 0, "model_path", aliases, "test comment", properties, auditInfo);
+    Assertions.assertDoesNotThrow(() -> versionService.insertModelVersion(version0));
+
+    NameIdentifier version0Ident = getModelVersionIdent(modelIdent, 0);
+    NameIdentifier version1Ident = getModelVersionIdent(modelIdent, 1);
+    NameIdentifier alias1Ident = getModelVersionIdent(modelIdent, "alias1");
+    NameIdentifier alias2Ident = getModelVersionIdent(modelIdent, "alias2");
+
+    // Deleting a version number that was never inserted returns false.
+    Assertions.assertFalse(versionService.deleteModelVersion(getModelVersionIdent(modelIdent, 100)));
+
+    // Deleting the existing version returns true.
+    Assertions.assertTrue(versionService.deleteModelVersion(version0Ident));
+
+    // Once deleted, the version can no longer be fetched.
+    Assertions.assertThrows(
+        NoSuchEntityException.class,
+        () -> versionService.getModelVersionByIdentifier(version0Ident));
+
+    // Deleting the same version a second time returns false.
+    Assertions.assertFalse(versionService.deleteModelVersion(version0Ident));
+
+    // Deleting version 1, which has not been inserted yet, returns false.
+    Assertions.assertFalse(versionService.deleteModelVersion(version1Ident));
+
+    // Deleting a version under a model that does not exist returns false.
+    NameIdentifier missingModelIdent = NameIdentifier.of(MODEL_NS, "model2");
+    Assertions.assertFalse(
+        versionService.deleteModelVersion(getModelVersionIdent(missingModelIdent, 0)));
+
+    // Insert version 1 with the same aliases to exercise deletion by alias.
+    ModelVersionEntity version1 =
+        createModelVersionEntity(
+            modelIdent, 1, "model_path", aliases, "test comment", properties, auditInfo);
+    Assertions.assertDoesNotThrow(() -> versionService.insertModelVersion(version1));
+
+    // The alias now resolves to version 1.
+    ModelVersionEntity resolvedByAlias = versionService.getModelVersionByIdentifier(alias1Ident);
+    Assertions.assertEquals(1, resolvedByAlias.version());
+
+    // After two inserts the model's latest version reads 2.
+    ModelEntity storedModel = modelService.getModelByIdentifier(modelIdent);
+    Assertions.assertEquals(2, storedModel.latestVersion());
+
+    // Deleting through an alias that was never registered returns false.
+    Assertions.assertFalse(
+        versionService.deleteModelVersion(getModelVersionIdent(modelIdent, "alias3")));
+
+    // Deleting through an existing alias returns true.
+    Assertions.assertTrue(versionService.deleteModelVersion(alias1Ident));
+
+    // Deleting again through the same alias returns false.
+    Assertions.assertFalse(versionService.deleteModelVersion(alias1Ident));
+
+    // Neither alias of the deleted version resolves any more.
+    Assertions.assertThrows(
+        NoSuchEntityException.class, () -> versionService.getModelVersionByIdentifier(alias1Ident));
+    Assertions.assertThrows(
+        NoSuchEntityException.class, () -> versionService.getModelVersionByIdentifier(alias2Ident));
+  }
+
+  /**
+   * Verifies that deleting a model, or deleting its schema, catalog or metalake with cascade,
+   * also removes the model's versions and aliases.
+   *
+   * @param input which level to delete: "model", "schema", "catalog" or "metalake"
+   */
+  @ParameterizedTest
+  @ValueSource(strings = {"model", "schema", "catalog", "metalake"})
+  public void testDeleteModelVersionsInDeletion(String input) throws IOException {
+    createParentEntities(METALAKE_NAME, CATALOG_NAME, SCHEMA_NAME, auditInfo);
+
+    ModelMetaService modelService = ModelMetaService.getInstance();
+    ModelVersionMetaService versionService = ModelVersionMetaService.getInstance();
+
+    // Register the model that the versions below belong to.
+    ModelEntity model =
+        createModelEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            MODEL_NS,
+            "model1",
+            "model1 comment",
+            0,
+            properties,
+            auditInfo);
+    NameIdentifier modelIdent = model.nameIdentifier();
+    Assertions.assertDoesNotThrow(() -> modelService.insertModel(model, false));
+
+    // Version 0 carries both aliases; version 1 has none.
+    ModelVersionEntity version0 =
+        createModelVersionEntity(
+            modelIdent, 0, "model_path", aliases, "test comment", properties, auditInfo);
+    Assertions.assertDoesNotThrow(() -> versionService.insertModelVersion(version0));
+
+    ModelVersionEntity version1 =
+        createModelVersionEntity(modelIdent, 1, "model_path", null, null, null, auditInfo);
+    Assertions.assertDoesNotThrow(() -> versionService.insertModelVersion(version1));
+
+    switch (input) {
+      case "model":
+        {
+          // Deleting the model itself succeeds directly.
+          Assertions.assertTrue(modelService.deleteModel(modelIdent));
+          break;
+        }
+      case "schema":
+        {
+          NameIdentifier schemaIdent =
+              NameIdentifier.of(METALAKE_NAME, CATALOG_NAME, SCHEMA_NAME);
+          // A non-cascading delete is refused while the schema still has content.
+          Assertions.assertThrows(
+              NonEmptyEntityException.class,
+              () -> SchemaMetaService.getInstance().deleteSchema(schemaIdent, false));
+          // A cascading delete succeeds.
+          Assertions.assertTrue(SchemaMetaService.getInstance().deleteSchema(schemaIdent, true));
+          break;
+        }
+      case "catalog":
+        {
+          NameIdentifier catalogIdent = NameIdentifier.of(METALAKE_NAME, CATALOG_NAME);
+          // A non-cascading delete is refused while the catalog still has content.
+          Assertions.assertThrows(
+              NonEmptyEntityException.class,
+              () -> CatalogMetaService.getInstance().deleteCatalog(catalogIdent, false));
+          // A cascading delete succeeds.
+          Assertions.assertTrue(
+              CatalogMetaService.getInstance().deleteCatalog(catalogIdent, true));
+          break;
+        }
+      case "metalake":
+        {
+          NameIdentifier metalakeIdent = NameIdentifier.of(METALAKE_NAME);
+          // A non-cascading delete is refused while the metalake still has content.
+          Assertions.assertThrows(
+              NonEmptyEntityException.class,
+              () -> MetalakeMetaService.getInstance().deleteMetalake(metalakeIdent, false));
+          // A cascading delete succeeds.
+          Assertions.assertTrue(
+              MetalakeMetaService.getInstance().deleteMetalake(metalakeIdent, true));
+          break;
+        }
+    }
+
+    // The model is gone.
+    Assertions.assertThrows(
+        NoSuchEntityException.class, () -> modelService.getModelByIdentifier(modelIdent));
+
+    // Listing its versions fails as well.
+    Assertions.assertThrows(
+        NoSuchEntityException.class,
+        () -> versionService.listModelVersionsByNamespace(getModelVersionNs(modelIdent)));
+
+    // No version can be fetched or deleted any more, by number or by alias.
+    verifyModelVersionExists(getModelVersionIdent(modelIdent, 0));
+    verifyModelVersionExists(getModelVersionIdent(modelIdent, 1));
+    verifyModelVersionExists(getModelVersionIdent(modelIdent, "alias1"));
+    verifyModelVersionExists(getModelVersionIdent(modelIdent, "alias2"));
+  }
+
+  /**
+   * Builds the identifier of a model version addressed by its version number.
+   *
+   * @param modelIdent identifier of the model
+   * @param version version number, appended as the last name part in its decimal string form
+   * @return the model identifier's parts followed by the version string
+   */
+  private NameIdentifier getModelVersionIdent(NameIdentifier modelIdent, int version) {
+    // The only difference from the alias overload is the last part, so reuse it.
+    return getModelVersionIdent(modelIdent, String.valueOf(version));
+  }
+
+  /**
+   * Builds the identifier of a model version addressed by an alias.
+   *
+   * @param modelIdent identifier of the model
+   * @param alias alias used as the last name part
+   * @return the model identifier's parts followed by the alias
+   */
+  private NameIdentifier getModelVersionIdent(NameIdentifier modelIdent, String alias) {
+    // The model's namespace levels plus its name form the namespace of its versions.
+    Namespace versionNs = getModelVersionNs(modelIdent);
+    return NameIdentifier.of(versionNs, alias);
+  }
+
+  /**
+   * Builds the namespace under which the versions of a model are addressed.
+   *
+   * @param modelIdent identifier of the model
+   * @return the model's namespace levels followed by the model name
+   */
+  private Namespace getModelVersionNs(NameIdentifier modelIdent) {
+    List<String> levels = Lists.newArrayList(modelIdent.namespace().levels());
+    levels.add(modelIdent.name());
+    String[] levelArray = levels.toArray(new String[0]);
+    return Namespace.of(levelArray);
+  }
+
+  /**
+   * Asserts that the given model version is absent. Despite the method name, it checks that
+   * fetching the version throws {@link NoSuchEntityException} and that deleting it returns
+   * {@code false}.
+   *
+   * @param ident identifier of the model version, by version number or by alias
+   */
+  private void verifyModelVersionExists(NameIdentifier ident) {
+    ModelVersionMetaService versionService = ModelVersionMetaService.getInstance();
+
+    Assertions.assertThrows(
+        NoSuchEntityException.class, () -> versionService.getModelVersionByIdentifier(ident));
+
+    boolean deleted = versionService.deleteModelVersion(ident);
+    Assertions.assertFalse(deleted);
+  }
+}
diff --git 
a/core/src/test/java/org/apache/gravitino/storage/relational/utils/TestPOConverters.java
 
b/core/src/test/java/org/apache/gravitino/storage/relational/utils/TestPOConverters.java
index 703b79e70..94fc50ed2 100644
--- 
a/core/src/test/java/org/apache/gravitino/storage/relational/utils/TestPOConverters.java
+++ 
b/core/src/test/java/org/apache/gravitino/storage/relational/utils/TestPOConverters.java
@@ -23,6 +23,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import java.time.Instant;
@@ -34,9 +35,11 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 import org.apache.gravitino.Catalog;
 import org.apache.gravitino.Entity;
 import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.dto.util.DTOConverters;
 import org.apache.gravitino.file.Fileset;
@@ -47,6 +50,7 @@ import org.apache.gravitino.meta.CatalogEntity;
 import org.apache.gravitino.meta.ColumnEntity;
 import org.apache.gravitino.meta.FilesetEntity;
 import org.apache.gravitino.meta.ModelEntity;
+import org.apache.gravitino.meta.ModelVersionEntity;
 import org.apache.gravitino.meta.SchemaEntity;
 import org.apache.gravitino.meta.SchemaVersion;
 import org.apache.gravitino.meta.TableEntity;
@@ -62,13 +66,17 @@ import org.apache.gravitino.storage.relational.po.FilesetPO;
 import org.apache.gravitino.storage.relational.po.FilesetVersionPO;
 import org.apache.gravitino.storage.relational.po.MetalakePO;
 import org.apache.gravitino.storage.relational.po.ModelPO;
+import org.apache.gravitino.storage.relational.po.ModelVersionAliasRelPO;
+import org.apache.gravitino.storage.relational.po.ModelVersionPO;
 import org.apache.gravitino.storage.relational.po.OwnerRelPO;
 import org.apache.gravitino.storage.relational.po.SchemaPO;
 import org.apache.gravitino.storage.relational.po.TablePO;
 import org.apache.gravitino.storage.relational.po.TagMetadataObjectRelPO;
 import org.apache.gravitino.storage.relational.po.TagPO;
 import org.apache.gravitino.storage.relational.po.TopicPO;
+import org.apache.gravitino.utils.NameIdentifierUtil;
 import org.apache.gravitino.utils.NamespaceUtil;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 public class TestPOConverters {
@@ -988,6 +996,172 @@ public class TestPOConverters {
     assertEquals(expectedModelWithEmptyProperties, 
convertedModelWithEmptyProperties);
   }
 
+  @Test
+  public void testInitModelVersionPO() throws JsonProcessingException { // Checks entity -> PO conversion for a model version and its alias relations.
+    NameIdentifier modelIdent = NameIdentifierUtil.ofModel("m", "c", "s", 
"model1");
+    AuditInfo auditInfo =
+        
AuditInfo.builder().withCreator("creator").withCreateTime(FIX_INSTANT).build();
+    // Entity with aliases, comment and properties all populated.
+    ModelVersionEntity modelVersionEntity =
+        ModelVersionEntity.builder()
+            .withModelIdentifier(modelIdent)
+            .withVersion(1)
+            .withAliases(ImmutableList.of("alias1"))
+            .withComment("this is test")
+            .withProperties(ImmutableMap.of("key", "value"))
+            .withUri("hdfs://localhost/test")
+            .withAuditInfo(auditInfo)
+            .build();
+    // The ids are preset on the builder before it is handed to the converter.
+    ModelVersionPO.Builder builder1 =
+        ModelVersionPO.builder()
+            .withModelId(1L)
+            .withMetalakeId(1L)
+            .withCatalogId(1L)
+            .withSchemaId(1L);
+    // Convert, then check the preset ids plus the version, comment, URI and deleted-at values.
+    ModelVersionPO modelVersionPO =
+        POConverters.initializeModelVersionPO(modelVersionEntity, builder1);
+    Assertions.assertEquals(1, modelVersionPO.getModelVersion());
+    Assertions.assertEquals(1L, modelVersionPO.getModelId());
+    Assertions.assertEquals(1L, modelVersionPO.getMetalakeId());
+    Assertions.assertEquals(1L, modelVersionPO.getCatalogId());
+    Assertions.assertEquals(1L, modelVersionPO.getSchemaId());
+    Assertions.assertEquals("this is test", 
modelVersionPO.getModelVersionComment());
+    Assertions.assertEquals("hdfs://localhost/test", 
modelVersionPO.getModelVersionUri());
+    Assertions.assertEquals(0L, modelVersionPO.getDeletedAt());
+    // The properties string on the PO is read back with JsonUtils.anyFieldMapper() and compared to the input map.
+    Map<String, String> resultProperties =
+        
JsonUtils.anyFieldMapper().readValue(modelVersionPO.getModelVersionProperties(),
 Map.class);
+    Assertions.assertEquals(ImmutableMap.of("key", "value"), resultProperties);
+    // The audit info string is read back the same way and must equal the input.
+    AuditInfo resultAuditInfo =
+        JsonUtils.anyFieldMapper().readValue(modelVersionPO.getAuditInfo(), 
AuditInfo.class);
+    Assertions.assertEquals(auditInfo, resultAuditInfo);
+    // One alias on the entity yields one alias relation PO carrying the same model id and version.
+    List<ModelVersionAliasRelPO> aliasPOs =
+        POConverters.initializeModelVersionAliasRelPO(
+            modelVersionEntity, modelVersionPO.getModelId());
+    Assertions.assertEquals(1, aliasPOs.size());
+    Assertions.assertEquals(1, aliasPOs.get(0).getModelVersion());
+    Assertions.assertEquals("alias1", aliasPOs.get(0).getModelVersionAlias());
+    Assertions.assertEquals(1L, aliasPOs.get(0).getModelId());
+    Assertions.assertEquals(0L, aliasPOs.get(0).getDeletedAt());
+
+    // Same conversion with null aliases, comment and properties.
+    ModelVersionEntity modelVersionEntityWithNull =
+        ModelVersionEntity.builder()
+            .withModelIdentifier(modelIdent)
+            .withVersion(1)
+            .withAliases(null)
+            .withComment(null)
+            .withProperties(null)
+            .withUri("hdfs://localhost/test")
+            .withAuditInfo(auditInfo)
+            .build();
+
+    ModelVersionPO.Builder builder2 =
+        ModelVersionPO.builder()
+            .withModelId(1L)
+            .withMetalakeId(1L)
+            .withCatalogId(1L)
+            .withSchemaId(1L);
+    // A null comment stays null on the PO.
+    ModelVersionPO modelVersionPOWithNull =
+        POConverters.initializeModelVersionPO(modelVersionEntityWithNull, 
builder2);
+    Assertions.assertNull(modelVersionPOWithNull.getModelVersionComment());
+    // The stored properties string reads back as null.
+    Map<String, String> resultPropertiesWithNull =
+        JsonUtils.anyFieldMapper()
+            .readValue(modelVersionPOWithNull.getModelVersionProperties(), 
Map.class);
+    Assertions.assertNull(resultPropertiesWithNull);
+    // Null aliases yield an empty alias relation list.
+    List<ModelVersionAliasRelPO> aliasPOsWithNull =
+        POConverters.initializeModelVersionAliasRelPO(
+            modelVersionEntityWithNull, modelVersionPOWithNull.getModelId());
+    Assertions.assertEquals(0, aliasPOsWithNull.size());
+  }
+
+  @Test
+  public void testFromModelVersionPO() throws JsonProcessingException { // Checks PO -> entity conversion for a model version and its alias relations.
+    NameIdentifier modelIdent = NameIdentifierUtil.ofModel("m", "c", "s", 
"model1");
+    AuditInfo auditInfo =
+        
AuditInfo.builder().withCreator("creator").withCreateTime(FIX_INSTANT).build();
+    Map<String, String> properties = ImmutableMap.of("key", "value");
+    List<String> aliases = ImmutableList.of("alias1", "alias2");
+    // PO whose properties and audit info are written as strings by JsonUtils.anyFieldMapper().
+    ModelVersionPO modelVersionPO =
+        ModelVersionPO.builder()
+            .withModelVersion(1)
+            .withModelId(1L)
+            .withMetalakeId(1L)
+            .withCatalogId(1L)
+            .withSchemaId(1L)
+            .withModelVersionComment("this is test")
+            
.withModelVersionProperties(JsonUtils.anyFieldMapper().writeValueAsString(properties))
+            
.withAuditInfo(JsonUtils.anyFieldMapper().writeValueAsString(auditInfo))
+            .withModelVersionUri("hdfs://localhost/test")
+            .withDeletedAt(0L)
+            .build();
+    List<ModelVersionAliasRelPO> aliasPOs =
+        aliases.stream()
+            .map(
+                a ->
+                    ModelVersionAliasRelPO.builder()
+                        .withModelVersionAlias(a)
+                        .withModelVersion(1)
+                        .withModelId(1L)
+                        .withDeletedAt(0L)
+                        .build())
+            .collect(Collectors.toList());
+    // Entity the conversion is expected to produce.
+    ModelVersionEntity expectedModelVersion =
+        ModelVersionEntity.builder()
+            .withModelIdentifier(modelIdent)
+            .withVersion(1)
+            .withAliases(aliases)
+            .withComment("this is test")
+            .withProperties(properties)
+            .withUri("hdfs://localhost/test")
+            .withAuditInfo(auditInfo)
+            .build();
+    // Convert the PO plus its alias relations and compare with the expected entity.
+    ModelVersionEntity convertedModelVersion =
+        POConverters.fromModelVersionPO(modelIdent, modelVersionPO, aliasPOs);
+    assertEquals(expectedModelVersion, convertedModelVersion);
+
+    // Same conversion with a null comment, properties serialized from null, and no alias relations.
+    ModelVersionPO modelVersionPOWithNull =
+        ModelVersionPO.builder()
+            .withModelVersion(1)
+            .withModelId(1L)
+            .withMetalakeId(1L)
+            .withCatalogId(1L)
+            .withSchemaId(1L)
+            .withModelVersionComment(null)
+            
.withModelVersionProperties(JsonUtils.anyFieldMapper().writeValueAsString(null))
+            
.withAuditInfo(JsonUtils.anyFieldMapper().writeValueAsString(auditInfo))
+            .withModelVersionUri("hdfs://localhost/test")
+            .withDeletedAt(0L)
+            .build();
+    List<ModelVersionAliasRelPO> aliasPOsWithNull = Collections.emptyList();
+    // Expected entity: empty alias list, null comment and null properties.
+    ModelVersionEntity expectedModelVersionWithNull =
+        ModelVersionEntity.builder()
+            .withModelIdentifier(modelIdent)
+            .withVersion(1)
+            .withAliases(Collections.emptyList())
+            .withComment(null)
+            .withProperties(null)
+            .withUri("hdfs://localhost/test")
+            .withAuditInfo(auditInfo)
+            .build();
+    // Convert and compare, as in the populated case.
+    ModelVersionEntity convertedModelVersionWithNull =
+        POConverters.fromModelVersionPO(modelIdent, modelVersionPOWithNull, 
aliasPOsWithNull);
+    assertEquals(expectedModelVersionWithNull, convertedModelVersionWithNull);
+  }
+
   private static BaseMetalake createMetalake(Long id, String name, String 
comment) {
     AuditInfo auditInfo =
         
AuditInfo.builder().withCreator("creator").withCreateTime(FIX_INSTANT).build();

Reply via email to