This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 178eb37f8 [#4018] feat(core): Add tag management logic for Tag System 
(Part 1) (#4019)
178eb37f8 is described below

commit 178eb37f8b5013abdd2464dda764ddd5b0787f38
Author: Jerry Shao <[email protected]>
AuthorDate: Tue Jul 9 09:35:33 2024 +0800

    [#4018] feat(core): Add tag management logic for Tag System (Part 1) (#4019)
    
    ### What changes were proposed in this pull request?
    
    This PR tracks the work of adding the core logic for tag management.
    
    ### Why are the changes needed?
    
    This is part of the work to add tag support in Gravitino.
    
    Fix: #4018
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    UTs added.
---
 .../com/datastrato/gravitino/meta/TagEntity.java   |  16 --
 .../gravitino/storage/relational/JDBCBackend.java  |  15 ++
 .../storage/relational/mapper/TagMetaMapper.java   | 175 ++++++++++++
 .../mapper/TagMetadataObjectRelMapper.java         |  59 ++++
 .../gravitino/storage/relational/po/TagPO.java     | 142 ++++++++++
 .../relational/service/MetalakeMetaService.java    |  22 +-
 .../storage/relational/service/TagMetaService.java | 180 +++++++++++++
 .../session/SqlSessionFactoryHelper.java           |   4 +
 .../storage/relational/utils/POConverters.java     |  57 ++++
 .../com/datastrato/gravitino/tag/TagManager.java   | 185 ++++++++++++-
 .../com/datastrato/gravitino/meta/TestEntity.java  |  20 --
 .../storage/relational/TestJDBCBackend.java        |  37 +++
 .../relational/service/TestTagMetaService.java     | 300 +++++++++++++++++++++
 .../storage/relational/utils/TestPOConverters.java |  91 +++++++
 .../datastrato/gravitino/tag/TestTagManager.java   | 248 +++++++++++++++++
 .../integration/test/util/AbstractIT.java          |   9 +-
 .../relational/service/FilesetMetaServiceIT.java   |   7 +-
 scripts/h2/schema-h2.sql                           |  31 ++-
 scripts/mysql/schema-0.5.0-mysql.sql               |   2 +-
 scripts/mysql/schema-0.6.0-mysql.sql               |  31 ++-
 scripts/mysql/upgrade-0.5.0-to-0.6.0-mysql.sql     |  29 ++
 21 files changed, 1609 insertions(+), 51 deletions(-)

diff --git a/core/src/main/java/com/datastrato/gravitino/meta/TagEntity.java 
b/core/src/main/java/com/datastrato/gravitino/meta/TagEntity.java
index b4acaf71f..c6e01ec7e 100644
--- a/core/src/main/java/com/datastrato/gravitino/meta/TagEntity.java
+++ b/core/src/main/java/com/datastrato/gravitino/meta/TagEntity.java
@@ -24,7 +24,6 @@ import com.datastrato.gravitino.Auditable;
 import com.datastrato.gravitino.Entity;
 import com.datastrato.gravitino.Field;
 import com.datastrato.gravitino.HasIdentifier;
-import com.datastrato.gravitino.MetadataObject;
 import com.datastrato.gravitino.Namespace;
 import com.datastrato.gravitino.tag.Tag;
 import com.google.common.collect.Maps;
@@ -47,10 +46,6 @@ public class TagEntity implements Tag, Entity, Auditable, 
HasIdentifier {
   public static final Field PROPERTIES =
       Field.optional("properties", Map.class, "The properties of the tag 
entity.");
 
-  public static final Field ASSOCIATED_OBJECTS =
-      Field.optional(
-          "objects", MetadataObject[].class, "The associated objects of the 
tag entity.");
-
   public static final Field AUDIT_INFO =
       Field.required("audit_info", Audit.class, "The audit details of the tag 
entity.");
 
@@ -59,7 +54,6 @@ public class TagEntity implements Tag, Entity, Auditable, 
HasIdentifier {
   private Namespace namespace;
   private String comment;
   private Map<String, String> properties;
-  private MetadataObject[] objects = null;
   private Audit auditInfo;
 
   private TagEntity() {}
@@ -72,7 +66,6 @@ public class TagEntity implements Tag, Entity, Auditable, 
HasIdentifier {
     fields.put(COMMENT, comment);
     fields.put(PROPERTIES, properties);
     fields.put(AUDIT_INFO, auditInfo);
-    fields.put(ASSOCIATED_OBJECTS, objects);
 
     return Collections.unmodifiableMap(fields);
   }
@@ -112,10 +105,6 @@ public class TagEntity implements Tag, Entity, Auditable, 
HasIdentifier {
     return Optional.empty();
   }
 
-  public MetadataObject[] objects() {
-    return objects;
-  }
-
   @Override
   public Audit auditInfo() {
     return auditInfo;
@@ -181,11 +170,6 @@ public class TagEntity implements Tag, Entity, Auditable, 
HasIdentifier {
       return this;
     }
 
-    public Builder withMetadataObjects(MetadataObject[] objects) {
-      tagEntity.objects = objects;
-      return this;
-    }
-
     public Builder withAuditInfo(Audit auditInfo) {
       tagEntity.auditInfo = auditInfo;
       return this;
diff --git 
a/core/src/main/java/com/datastrato/gravitino/storage/relational/JDBCBackend.java
 
b/core/src/main/java/com/datastrato/gravitino/storage/relational/JDBCBackend.java
index ea574339a..83af751d7 100644
--- 
a/core/src/main/java/com/datastrato/gravitino/storage/relational/JDBCBackend.java
+++ 
b/core/src/main/java/com/datastrato/gravitino/storage/relational/JDBCBackend.java
@@ -37,6 +37,7 @@ import com.datastrato.gravitino.meta.GroupEntity;
 import com.datastrato.gravitino.meta.RoleEntity;
 import com.datastrato.gravitino.meta.SchemaEntity;
 import com.datastrato.gravitino.meta.TableEntity;
+import com.datastrato.gravitino.meta.TagEntity;
 import com.datastrato.gravitino.meta.TopicEntity;
 import com.datastrato.gravitino.meta.UserEntity;
 import 
com.datastrato.gravitino.storage.relational.converters.SQLExceptionConverterFactory;
@@ -48,6 +49,7 @@ import 
com.datastrato.gravitino.storage.relational.service.MetalakeMetaService;
 import com.datastrato.gravitino.storage.relational.service.RoleMetaService;
 import com.datastrato.gravitino.storage.relational.service.SchemaMetaService;
 import com.datastrato.gravitino.storage.relational.service.TableMetaService;
+import com.datastrato.gravitino.storage.relational.service.TagMetaService;
 import com.datastrato.gravitino.storage.relational.service.TopicMetaService;
 import com.datastrato.gravitino.storage.relational.service.UserMetaService;
 import 
com.datastrato.gravitino.storage.relational.session.SqlSessionFactoryHelper;
@@ -96,6 +98,8 @@ public class JDBCBackend implements RelationalBackend {
         return (List<E>) 
FilesetMetaService.getInstance().listFilesetsByNamespace(namespace);
       case TOPIC:
         return (List<E>) 
TopicMetaService.getInstance().listTopicsByNamespace(namespace);
+      case TAG:
+        return (List<E>) 
TagMetaService.getInstance().listTagsByNamespace(namespace);
       default:
         throw new UnsupportedEntityTypeException(
             "Unsupported entity type: %s for list operation", entityType);
@@ -133,6 +137,8 @@ public class JDBCBackend implements RelationalBackend {
       RoleMetaService.getInstance().insertRole((RoleEntity) e, overwritten);
     } else if (e instanceof GroupEntity) {
       GroupMetaService.getInstance().insertGroup((GroupEntity) e, overwritten);
+    } else if (e instanceof TagEntity) {
+      TagMetaService.getInstance().insertTag((TagEntity) e, overwritten);
     } else {
       throw new UnsupportedEntityTypeException(
           "Unsupported entity type: %s for insert operation", e.getClass());
@@ -160,6 +166,8 @@ public class JDBCBackend implements RelationalBackend {
         return (E) UserMetaService.getInstance().updateUser(ident, updater);
       case GROUP:
         return (E) GroupMetaService.getInstance().updateGroup(ident, updater);
+      case TAG:
+        return (E) TagMetaService.getInstance().updateTag(ident, updater);
       default:
         throw new UnsupportedEntityTypeException(
             "Unsupported entity type: %s for update operation", entityType);
@@ -189,6 +197,8 @@ public class JDBCBackend implements RelationalBackend {
         return (E) GroupMetaService.getInstance().getGroupByIdentifier(ident);
       case ROLE:
         return (E) RoleMetaService.getInstance().getRoleByIdentifier(ident);
+      case TAG:
+        return (E) TagMetaService.getInstance().getTagByIdentifier(ident);
       default:
         throw new UnsupportedEntityTypeException(
             "Unsupported entity type: %s for get operation", entityType);
@@ -217,6 +227,8 @@ public class JDBCBackend implements RelationalBackend {
         return GroupMetaService.getInstance().deleteGroup(ident);
       case ROLE:
         return RoleMetaService.getInstance().deleteRole(ident);
+      case TAG:
+        return TagMetaService.getInstance().deleteTag(ident);
       default:
         throw new UnsupportedEntityTypeException(
             "Unsupported entity type: %s for delete operation", entityType);
@@ -264,6 +276,9 @@ public class JDBCBackend implements RelationalBackend {
             .deleteRoleMetasByLegacyTimeline(
                 legacyTimeline, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT);
       case TAG:
+        return TagMetaService.getInstance()
+            .deleteTagMetasByLegacyTimeline(
+                legacyTimeline, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT);
       case COLUMN:
       case AUDIT:
         return 0;
diff --git 
a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/TagMetaMapper.java
 
b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/TagMetaMapper.java
new file mode 100644
index 000000000..205bbbdf7
--- /dev/null
+++ 
b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/TagMetaMapper.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datastrato.gravitino.storage.relational.mapper;
+
+import com.datastrato.gravitino.storage.relational.po.TagPO;
+import java.util.List;
+import org.apache.ibatis.annotations.Delete;
+import org.apache.ibatis.annotations.Insert;
+import org.apache.ibatis.annotations.Param;
+import org.apache.ibatis.annotations.Select;
+import org.apache.ibatis.annotations.Update;
+
+/**
+ * MyBatis mapper for the {@code tag_meta} table.
+ *
+ * <p>The queries below treat a row as live while its {@code deleted_at} column is 0. Soft deletes
+ * stamp {@code deleted_at} with a millisecond timestamp computed by the database, and {@link
+ * #deleteTagMetasByLegacyTimeline(Long, int)} physically removes stamped rows afterwards.
+ */
+public interface TagMetaMapper {
+
+  String TAG_TABLE_NAME = "tag_meta";
+
+  /**
+   * Lists the live tags that belong to the live metalake with the given name.
+   *
+   * @param metalakeName the name of the metalake to look the tags up in
+   * @return the matching tag rows
+   */
+  @Select(
+      "SELECT tm.tag_id as tagId, tm.tag_name as tagName,"
+          + " tm.metalake_id as metalakeId,"
+          + " tm.tag_comment as comment,"
+          + " tm.properties as properties,"
+          + " tm.audit_info as auditInfo,"
+          + " tm.current_version as currentVersion,"
+          + " tm.last_version as lastVersion,"
+          + " tm.deleted_at as deletedAt"
+          + " FROM "
+          + TAG_TABLE_NAME
+          + " tm JOIN "
+          + MetalakeMetaMapper.TABLE_NAME
+          + " mm on tm.metalake_id = mm.metalake_id"
+          + " WHERE mm.metalake_name = #{metalakeName} AND tm.deleted_at = 0 AND mm.deleted_at = 0")
+  List<TagPO> listTagPOsByMetalake(@Param("metalakeName") String metalakeName);
+
+  /**
+   * Looks up the id of a live tag by metalake name and tag name.
+   *
+   * @param metalakeName the name of the metalake the tag belongs to
+   * @param tagName the name of the tag
+   * @return the tag id; presumably {@code null} when no row matches (MyBatis single-result
+   *     behaviour, to be confirmed against the callers)
+   */
+  @Select(
+      "SELECT tm.tag_id as tagId FROM "
+          + TAG_TABLE_NAME
+          + " tm JOIN "
+          + MetalakeMetaMapper.TABLE_NAME
+          + " mm on tm.metalake_id = mm.metalake_id"
+          + " WHERE mm.metalake_name = #{metalakeName} AND tm.tag_name = #{tagName}"
+          + " AND tm.deleted_at = 0 AND mm.deleted_at = 0")
+  Long selectTagIdByMetalakeAndName(
+      @Param("metalakeName") String metalakeName, @Param("tagName") String tagName);
+
+  /**
+   * Loads the full row of a live tag by metalake name and tag name.
+   *
+   * @param metalakeName the name of the metalake the tag belongs to
+   * @param tagName the name of the tag
+   * @return the tag row; presumably {@code null} when no row matches (MyBatis single-result
+   *     behaviour, to be confirmed against the callers)
+   */
+  @Select(
+      "SELECT tm.tag_id as tagId, tm.tag_name as tagName,"
+          + " tm.metalake_id as metalakeId,"
+          + " tm.tag_comment as comment,"
+          + " tm.properties as properties,"
+          + " tm.audit_info as auditInfo,"
+          + " tm.current_version as currentVersion,"
+          + " tm.last_version as lastVersion,"
+          + " tm.deleted_at as deletedAt"
+          + " FROM "
+          + TAG_TABLE_NAME
+          + " tm JOIN "
+          + MetalakeMetaMapper.TABLE_NAME
+          + " mm on tm.metalake_id = mm.metalake_id"
+          + " WHERE mm.metalake_name = #{metalakeName} AND tm.tag_name = #{tagName}"
+          + " AND tm.deleted_at = 0 AND mm.deleted_at = 0")
+  TagPO selectTagMetaByMetalakeAndName(
+      @Param("metalakeName") String metalakeName, @Param("tagName") String tagName);
+
+  /**
+   * Inserts a new tag row.
+   *
+   * @param tagPO the row to insert
+   */
+  @Insert(
+      "INSERT INTO "
+          + TAG_TABLE_NAME
+          + " (tag_id, tag_name,"
+          + " metalake_id, tag_comment, properties, audit_info,"
+          + " current_version, last_version, deleted_at)"
+          + " VALUES("
+          + " #{tagMeta.tagId},"
+          + " #{tagMeta.tagName},"
+          + " #{tagMeta.metalakeId},"
+          + " #{tagMeta.comment},"
+          + " #{tagMeta.properties},"
+          + " #{tagMeta.auditInfo},"
+          + " #{tagMeta.currentVersion},"
+          + " #{tagMeta.lastVersion},"
+          + " #{tagMeta.deletedAt}"
+          + " )")
+  void insertTagMeta(@Param("tagMeta") TagPO tagPO);
+
+  /**
+   * Inserts a tag row, or overwrites every non-key column of the existing row when the insert hits
+   * a duplicate key ({@code ON DUPLICATE KEY UPDATE}).
+   *
+   * @param tagPO the row to insert or to overwrite the existing row with
+   */
+  @Insert(
+      "INSERT INTO "
+          + TAG_TABLE_NAME
+          + " (tag_id, tag_name,"
+          + " metalake_id, tag_comment, properties, audit_info,"
+          + " current_version, last_version, deleted_at)"
+          + " VALUES("
+          + " #{tagMeta.tagId},"
+          + " #{tagMeta.tagName},"
+          + " #{tagMeta.metalakeId},"
+          + " #{tagMeta.comment},"
+          + " #{tagMeta.properties},"
+          + " #{tagMeta.auditInfo},"
+          + " #{tagMeta.currentVersion},"
+          + " #{tagMeta.lastVersion},"
+          + " #{tagMeta.deletedAt}"
+          + " )"
+          + " ON DUPLICATE KEY UPDATE"
+          + " tag_name = #{tagMeta.tagName},"
+          + " metalake_id = #{tagMeta.metalakeId},"
+          + " tag_comment = #{tagMeta.comment},"
+          + " properties = #{tagMeta.properties},"
+          + " audit_info = #{tagMeta.auditInfo},"
+          + " current_version = #{tagMeta.currentVersion},"
+          + " last_version = #{tagMeta.lastVersion},"
+          + " deleted_at = #{tagMeta.deletedAt}")
+  void insertTagMetaOnDuplicateKeyUpdate(@Param("tagMeta") TagPO tagPO);
+
+  /**
+   * Replaces a live tag row with new values, but only if the stored row still equals {@code
+   * oldTagPO} column by column, so a concurrent modification makes the statement match nothing.
+   *
+   * <p>NOTE(review): {@code tag_comment} and {@code properties} are compared with {@code =}, which
+   * never matches SQL NULL. If either column can be stored as NULL, such a row could never be
+   * updated through this statement -- confirm the columns are always non-null or switch to a
+   * null-safe comparison.
+   *
+   * @param newTagPO the values to write
+   * @param oldTagPO the values the row is expected to hold at the moment of the update
+   * @return the number of rows updated
+   */
+  @Update(
+      "UPDATE "
+          + TAG_TABLE_NAME
+          + " SET tag_name = #{newTagMeta.tagName},"
+          + " tag_comment = #{newTagMeta.comment},"
+          + " properties = #{newTagMeta.properties},"
+          + " audit_info = #{newTagMeta.auditInfo},"
+          + " current_version = #{newTagMeta.currentVersion},"
+          + " last_version = #{newTagMeta.lastVersion},"
+          + " deleted_at = #{newTagMeta.deletedAt}"
+          + " WHERE tag_id = #{oldTagMeta.tagId}"
+          + " AND metalake_id = #{oldTagMeta.metalakeId}"
+          + " AND tag_name = #{oldTagMeta.tagName}"
+          + " AND tag_comment = #{oldTagMeta.comment}"
+          + " AND properties = #{oldTagMeta.properties}"
+          + " AND audit_info = #{oldTagMeta.auditInfo}"
+          + " AND current_version = #{oldTagMeta.currentVersion}"
+          + " AND last_version = #{oldTagMeta.lastVersion}"
+          + " AND deleted_at = 0")
+  Integer updateTagMeta(@Param("newTagMeta") TagPO newTagPO, @Param("oldTagMeta") TagPO oldTagPO);
+
+  /**
+   * Soft-deletes the live tag with the given name inside the live metalake with the given name by
+   * stamping {@code deleted_at} with the database's current time in milliseconds.
+   *
+   * @param metalakeName the name of the metalake the tag belongs to
+   * @param tagName the name of the tag to soft-delete
+   * @return the number of rows updated
+   */
+  @Update(
+      "UPDATE "
+          + TAG_TABLE_NAME
+          + " tm SET tm.deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+          + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+          + " WHERE tm.metalake_id IN ("
+          + " SELECT mm.metalake_id FROM "
+          + MetalakeMetaMapper.TABLE_NAME
+          + " mm WHERE mm.metalake_name = #{metalakeName} AND mm.deleted_at = 0)"
+          + " AND tm.tag_name = #{tagName} AND tm.deleted_at = 0")
+  Integer softDeleteTagMetaByMetalakeAndTagName(
+      @Param("metalakeName") String metalakeName, @Param("tagName") String tagName);
+
+  /**
+   * Soft-deletes every live tag of the metalake with the given id.
+   *
+   * @param metalakeId the id of the metalake whose tags are soft-deleted
+   */
+  @Update(
+      "UPDATE "
+          + TAG_TABLE_NAME
+          + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+          + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+          + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0")
+  void softDeleteTagMetasByMetalakeId(@Param("metalakeId") Long metalakeId);
+
+  /**
+   * Physically removes soft-deleted tag rows whose {@code deleted_at} stamp is older than the given
+   * timeline, at most {@code limit} rows per call.
+   *
+   * @param legacyTimeline exclusive upper bound for {@code deleted_at}
+   * @param limit the maximum number of rows to delete in this call
+   * @return the number of rows deleted
+   */
+  @Delete(
+      "DELETE FROM "
+          + TAG_TABLE_NAME
+          + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit}")
+  Integer deleteTagMetasByLegacyTimeline(
+      @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit);
+}
diff --git 
a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/TagMetadataObjectRelMapper.java
 
b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/TagMetadataObjectRelMapper.java
new file mode 100644
index 000000000..4766dcfcd
--- /dev/null
+++ 
b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/TagMetadataObjectRelMapper.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datastrato.gravitino.storage.relational.mapper;
+
+import org.apache.ibatis.annotations.Delete;
+import org.apache.ibatis.annotations.Param;
+import org.apache.ibatis.annotations.Update;
+
+/**
+ * MyBatis mapper for the {@code tag_relation_meta} table, whose rows reference {@code tag_meta}
+ * through {@code tag_id}. A row counts as live while its {@code deleted_at} column is 0.
+ */
+public interface TagMetadataObjectRelMapper {
+  String TAG_METADATA_OBJECT_RELATION_TABLE_NAME = "tag_relation_meta";
+
+  /**
+   * Soft-deletes the live relation rows of one tag, identified by metalake name and tag name.
+   *
+   * <p>The tag is resolved through a sub-query that filters on both the metalake name and the tag
+   * name, so only the relations of that single tag are touched.
+   *
+   * <p>NOTE(review): the sub-query only sees tags with {@code deleted_at = 0}, so this statement
+   * presumably has to run before the tag row itself is soft-deleted -- confirm the caller order.
+   *
+   * @param metalakeName the name of the metalake the tag belongs to
+   * @param tagName the name of the tag whose relations are soft-deleted
+   * @return the number of rows updated
+   */
+  @Update(
+      "UPDATE "
+          + TAG_METADATA_OBJECT_RELATION_TABLE_NAME
+          + " tmo SET tmo.deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+          + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+          + " WHERE tmo.tag_id IN (SELECT tm.tag_id FROM "
+          + TagMetaMapper.TAG_TABLE_NAME
+          + " tm WHERE tm.metalake_id IN (SELECT mm.metalake_id FROM "
+          + MetalakeMetaMapper.TABLE_NAME
+          + " mm WHERE mm.metalake_name = #{metalakeName} AND mm.deleted_at = 0)"
+          + " AND tm.tag_name = #{tagName} AND tm.deleted_at = 0) AND tmo.deleted_at = 0")
+  Integer softDeleteTagMetadataObjectRelsByMetalakeAndTagName(
+      @Param("metalakeName") String metalakeName, @Param("tagName") String tagName);
+
+  /**
+   * Soft-deletes the live relation rows of every live tag in the metalake with the given id.
+   *
+   * <p>NOTE(review): the {@code EXISTS} sub-query requires {@code tm.deleted_at = 0}. If the tags
+   * of the metalake are soft-deleted earlier in the same transaction, this statement would match
+   * no rows -- confirm that callers invoke it before soft-deleting the tags.
+   *
+   * @param metalakeId the id of the metalake whose tag relations are soft-deleted
+   */
+  @Update(
+      "UPDATE "
+          + TAG_METADATA_OBJECT_RELATION_TABLE_NAME
+          + " tmo SET tmo.deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+          + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+          + " WHERE EXISTS (SELECT * FROM "
+          + TagMetaMapper.TAG_TABLE_NAME
+          + " tm WHERE tm.metalake_id = #{metalakeId} AND tm.tag_id = tmo.tag_id"
+          + " AND tm.deleted_at = 0) AND tmo.deleted_at = 0")
+  void softDeleteTagMetadataObjectRelsByMetalakeId(@Param("metalakeId") Long metalakeId);
+
+  /**
+   * Physically removes soft-deleted relation rows whose {@code deleted_at} stamp is older than the
+   * given timeline, at most {@code limit} rows per call.
+   *
+   * @param legacyTimeline exclusive upper bound for {@code deleted_at}
+   * @param limit the maximum number of rows to delete in this call
+   * @return the number of rows deleted
+   */
+  @Delete(
+      "DELETE FROM "
+          + TAG_METADATA_OBJECT_RELATION_TABLE_NAME
+          + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit}")
+  Integer deleteTagEntityRelsByLegacyTimeline(
+      @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit);
+}
diff --git 
a/core/src/main/java/com/datastrato/gravitino/storage/relational/po/TagPO.java 
b/core/src/main/java/com/datastrato/gravitino/storage/relational/po/TagPO.java
new file mode 100644
index 000000000..8924bdfd2
--- /dev/null
+++ 
b/core/src/main/java/com/datastrato/gravitino/storage/relational/po/TagPO.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datastrato.gravitino.storage.relational.po;
+
+import com.google.common.base.Preconditions;
+import lombok.Getter;
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * Persistent object holding the column values of one tag row. Instances are assembled through
+ * {@link Builder}; the accessors are generated by Lombok's {@code @Getter}.
+ */
+@Getter
+public class TagPO {
+  private Long tagId;
+  private String tagName;
+  private Long metalakeId;
+  private String comment;
+  private String properties;
+  private String auditInfo;
+  private Long currentVersion;
+  private Long lastVersion;
+  private Long deletedAt;
+
+  /** Creates an empty {@link Builder}. */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /** Two instances are equal when all nine fields are equal. */
+  @Override
+  public boolean equals(Object o) {
+    if (o == this) {
+      return true;
+    }
+    if (o instanceof TagPO) {
+      TagPO that = (TagPO) o;
+      return java.util.Objects.equals(tagId, that.tagId)
+          && java.util.Objects.equals(tagName, that.tagName)
+          && java.util.Objects.equals(metalakeId, that.metalakeId)
+          && java.util.Objects.equals(comment, that.comment)
+          && java.util.Objects.equals(properties, that.properties)
+          && java.util.Objects.equals(auditInfo, that.auditInfo)
+          && java.util.Objects.equals(currentVersion, that.currentVersion)
+          && java.util.Objects.equals(lastVersion, that.lastVersion)
+          && java.util.Objects.equals(deletedAt, that.deletedAt);
+    }
+    return false;
+  }
+
+  /** Hashes the same nine fields that {@link #equals(Object)} compares. */
+  @Override
+  public int hashCode() {
+    return java.util.Objects.hash(
+        tagId,
+        tagName,
+        metalakeId,
+        comment,
+        properties,
+        auditInfo,
+        currentVersion,
+        lastVersion,
+        deletedAt);
+  }
+
+  /** Fluent builder that fills a {@link TagPO} and checks the mandatory fields on build. */
+  public static class Builder {
+    private final TagPO po;
+
+    private Builder() {
+      this.po = new TagPO();
+    }
+
+    public Builder withTagId(Long tagId) {
+      po.tagId = tagId;
+      return this;
+    }
+
+    public Builder withTagName(String tagName) {
+      po.tagName = tagName;
+      return this;
+    }
+
+    public Builder withMetalakeId(Long metalakeId) {
+      po.metalakeId = metalakeId;
+      return this;
+    }
+
+    public Builder withComment(String comment) {
+      po.comment = comment;
+      return this;
+    }
+
+    public Builder withProperties(String properties) {
+      po.properties = properties;
+      return this;
+    }
+
+    public Builder withAuditInfo(String auditInfo) {
+      po.auditInfo = auditInfo;
+      return this;
+    }
+
+    public Builder withCurrentVersion(Long currentVersion) {
+      po.currentVersion = currentVersion;
+      return this;
+    }
+
+    public Builder withLastVersion(Long lastVersion) {
+      po.lastVersion = lastVersion;
+      return this;
+    }
+
+    public Builder withDeletedAt(Long deletedAt) {
+      po.deletedAt = deletedAt;
+      return this;
+    }
+
+    /**
+     * Returns the populated object after checking the mandatory fields; {@code comment} and
+     * {@code properties} are not checked.
+     *
+     * @return the populated {@link TagPO}
+     * @throws IllegalArgumentException if a mandatory field is missing
+     */
+    public TagPO build() {
+      Preconditions.checkArgument(po.tagId != null, "tagId cannot be null");
+      Preconditions.checkArgument(StringUtils.isNotBlank(po.tagName), "tagName cannot be empty");
+      Preconditions.checkArgument(po.metalakeId != null, "metalakeId cannot be null");
+      Preconditions.checkArgument(po.auditInfo != null, "auditInfo cannot be null");
+      Preconditions.checkArgument(po.currentVersion != null, "currentVersion cannot be null");
+      Preconditions.checkArgument(po.lastVersion != null, "lastVersion cannot be null");
+      Preconditions.checkArgument(po.deletedAt != null, "deletedAt cannot be null");
+      return po;
+    }
+  }
+}
diff --git 
a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/MetalakeMetaService.java
 
b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/MetalakeMetaService.java
index ef743dc40..4f145f427 100644
--- 
a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/MetalakeMetaService.java
+++ 
b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/MetalakeMetaService.java
@@ -36,6 +36,8 @@ import 
com.datastrato.gravitino.storage.relational.mapper.RoleMetaMapper;
 import com.datastrato.gravitino.storage.relational.mapper.SchemaMetaMapper;
 import 
com.datastrato.gravitino.storage.relational.mapper.SecurableObjectMapper;
 import com.datastrato.gravitino.storage.relational.mapper.TableMetaMapper;
+import com.datastrato.gravitino.storage.relational.mapper.TagMetaMapper;
+import 
com.datastrato.gravitino.storage.relational.mapper.TagMetadataObjectRelMapper;
 import com.datastrato.gravitino.storage.relational.mapper.TopicMetaMapper;
 import com.datastrato.gravitino.storage.relational.mapper.UserMetaMapper;
 import com.datastrato.gravitino.storage.relational.mapper.UserRoleRelMapper;
@@ -223,7 +225,15 @@ public class MetalakeMetaService {
             () ->
                 SessionUtils.doWithoutCommit(
                     SecurableObjectMapper.class,
-                    mapper -> 
mapper.softDeleteRoleMetasByMetalakeId(metalakeId)));
+                    mapper -> 
mapper.softDeleteRoleMetasByMetalakeId(metalakeId)),
+            () ->
+                SessionUtils.doWithoutCommit(
+                    TagMetaMapper.class,
+                    mapper -> 
mapper.softDeleteTagMetasByMetalakeId(metalakeId)),
+            () ->
+                SessionUtils.doWithoutCommit(
+                    TagMetadataObjectRelMapper.class,
+                    mapper -> 
mapper.softDeleteTagMetadataObjectRelsByMetalakeId(metalakeId)));
       } else {
         List<CatalogEntity> catalogEntities =
             CatalogMetaService.getInstance()
@@ -260,7 +270,15 @@ public class MetalakeMetaService {
             () ->
                 SessionUtils.doWithoutCommit(
                     SecurableObjectMapper.class,
-                    mapper -> 
mapper.softDeleteRoleMetasByMetalakeId(metalakeId)));
+                    mapper -> 
mapper.softDeleteRoleMetasByMetalakeId(metalakeId)),
+            () ->
+                SessionUtils.doWithoutCommit(
+                    TagMetaMapper.class,
+                    mapper -> 
mapper.softDeleteTagMetasByMetalakeId(metalakeId)),
+            () ->
+                SessionUtils.doWithoutCommit(
+                    TagMetadataObjectRelMapper.class,
+                    mapper -> 
mapper.softDeleteTagMetadataObjectRelsByMetalakeId(metalakeId)));
       }
     }
     return true;
diff --git 
a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/TagMetaService.java
 
b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/TagMetaService.java
new file mode 100644
index 000000000..19f0dd380
--- /dev/null
+++ 
b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/TagMetaService.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datastrato.gravitino.storage.relational.service;
+
+import com.datastrato.gravitino.Entity;
+import com.datastrato.gravitino.HasIdentifier;
+import com.datastrato.gravitino.NameIdentifier;
+import com.datastrato.gravitino.Namespace;
+import com.datastrato.gravitino.exceptions.NoSuchEntityException;
+import com.datastrato.gravitino.meta.TagEntity;
+import com.datastrato.gravitino.storage.relational.mapper.TagMetaMapper;
+import 
com.datastrato.gravitino.storage.relational.mapper.TagMetadataObjectRelMapper;
+import com.datastrato.gravitino.storage.relational.po.TagPO;
+import com.datastrato.gravitino.storage.relational.utils.ExceptionUtils;
+import com.datastrato.gravitino.storage.relational.utils.POConverters;
+import com.datastrato.gravitino.storage.relational.utils.SessionUtils;
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class TagMetaService {
+
+  private static final TagMetaService INSTANCE = new TagMetaService();
+
+  public static TagMetaService getInstance() {
+    return INSTANCE;
+  }
+
+  private TagMetaService() {}
+
+  /**
+   * Lists the tags of the metalake named by the first level of the given namespace.
+   *
+   * @param ns The tag namespace; its level 0 is used as the metalake name.
+   * @return The tag entities converted from the fetched POs, each built with {@code ns}.
+   */
+  public List<TagEntity> listTagsByNamespace(Namespace ns) {
+    String metalake = ns.level(0);
+    List<TagPO> fetched =
+        SessionUtils.getWithoutCommit(
+            TagMetaMapper.class, tagMapper -> tagMapper.listTagPOsByMetalake(metalake));
+
+    return fetched.stream()
+        .map(po -> POConverters.fromTagPO(po, ns))
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Fetches a single tag by its identifier.
+   *
+   * @param ident The tag identifier; level 0 of its namespace is used as the metalake name.
+   * @return The tag entity converted from the stored PO, carrying the identifier's namespace.
+   */
+  public TagEntity getTagByIdentifier(NameIdentifier ident) {
+    Namespace tagNamespace = ident.namespace();
+    TagPO found = getTagPOByMetalakeAndName(tagNamespace.level(0), ident.name());
+    return POConverters.fromTagPO(found, tagNamespace);
+  }
+
+  /**
+   * Stores a tag entity under the metalake named by level 0 of the entity's namespace.
+   *
+   * @param tagEntity The tag entity to store.
+   * @param overwritten When true the "insert on duplicate key update" mapper call is used,
+   *     otherwise the plain insert.
+   * @throws IOException Declared by the signature; presumably raised by the SQL exception check.
+   */
+  public void insertTag(TagEntity tagEntity, boolean overwritten) throws IOException {
+    String metalake = tagEntity.namespace().level(0);
+
+    try {
+      Long metalakeId = MetalakeMetaService.getInstance().getMetalakeIdByName(metalake);
+      TagPO po =
+          POConverters.initializeTagPOWithVersion(
+              tagEntity, TagPO.builder().withMetalakeId(metalakeId));
+
+      SessionUtils.doWithCommit(
+          TagMetaMapper.class,
+          tagMapper -> {
+            if (!overwritten) {
+              tagMapper.insertTagMeta(po);
+              return;
+            }
+            tagMapper.insertTagMetaOnDuplicateKeyUpdate(po);
+          });
+    } catch (RuntimeException re) {
+      // Give the SQL exception check a chance to translate the failure before rethrowing it.
+      ExceptionUtils.checkSQLException(re, Entity.EntityType.TAG, tagEntity.toString());
+      throw re;
+    }
+  }
+
+  /**
+   * Loads the tag named by {@code ident}, applies {@code updater} to it and stores the result.
+   *
+   * @param ident The identifier of the tag to update.
+   * @param updater Function producing the updated entity from the current one; the result must
+   *     keep the same id.
+   * @return The updated tag entity produced by {@code updater}.
+   * @throws IOException If the update mapper call reports no result or zero affected rows.
+   */
+  public <E extends Entity & HasIdentifier> TagEntity updateTag(
+      NameIdentifier ident, Function<E, E> updater) throws IOException {
+    String metalake = ident.namespace().level(0);
+
+    try {
+      TagPO currentPO = getTagPOByMetalakeAndName(metalake, ident.name());
+      TagEntity before = POConverters.fromTagPO(currentPO, ident.namespace());
+      TagEntity after = (TagEntity) updater.apply((E) before);
+      Preconditions.checkArgument(
+          Objects.equals(before.id(), after.id()),
+          "The updated tag entity id: %s must have the same id as the old entity id %s",
+          after.id(),
+          before.id());
+
+      Integer affectedRows =
+          SessionUtils.doWithCommitAndFetchResult(
+              TagMetaMapper.class,
+              tagMapper -> {
+                TagPO replacement = POConverters.updateTagPOWithVersion(currentPO, after);
+                return tagMapper.updateTagMeta(replacement, currentPO);
+              });
+
+      if (affectedRows == null || affectedRows == 0) {
+        throw new IOException("Failed to update the entity: " + ident);
+      }
+      return after;
+
+    } catch (RuntimeException re) {
+      // Give the SQL exception check a chance to translate the failure before rethrowing it.
+      ExceptionUtils.checkSQLException(re, Entity.EntityType.TAG, ident.toString());
+      throw re;
+    }
+  }
+
+  /**
+   * Runs the soft-delete mapper calls for a tag and for its tag-to-metadata-object relations.
+   *
+   * @param ident The identifier of the tag to delete.
+   * @return True if the two mapper calls together reported more than zero affected rows.
+   */
+  public boolean deleteTag(NameIdentifier ident) {
+    String metalake = ident.namespace().level(0);
+    String tagName = ident.name();
+    // Single-element arrays let the lambdas below hand their row counts back out.
+    int[] tagRows = new int[] {0};
+    int[] relRows = new int[] {0};
+
+    SessionUtils.doMultipleWithCommit(
+        () ->
+            tagRows[0] =
+                SessionUtils.doWithoutCommitAndFetchResult(
+                    TagMetaMapper.class,
+                    tagMapper ->
+                        tagMapper.softDeleteTagMetaByMetalakeAndTagName(metalake, tagName)),
+        () ->
+            relRows[0] =
+                SessionUtils.doWithoutCommitAndFetchResult(
+                    TagMetadataObjectRelMapper.class,
+                    relMapper ->
+                        relMapper.softDeleteTagMetadataObjectRelsByMetalakeAndTagName(
+                            metalake, tagName)));
+
+    int totalRows = tagRows[0] + relRows[0];
+    return totalRows > 0;
+  }
+
+  /**
+   * Runs the legacy-timeline delete mapper calls for tags and for tag relations.
+   *
+   * @param legacyTimeline The timeline value handed to both mapper calls.
+   * @param limit The limit handed to each mapper call separately.
+   * @return The sum of the row counts reported by the two mapper calls.
+   */
+  public int deleteTagMetasByLegacyTimeline(long legacyTimeline, int limit) {
+    // Single-element arrays let the lambdas below hand their row counts back out.
+    int[] tagRows = new int[] {0};
+    int[] relRows = new int[] {0};
+
+    SessionUtils.doMultipleWithCommit(
+        () ->
+            tagRows[0] =
+                SessionUtils.doWithoutCommitAndFetchResult(
+                    TagMetaMapper.class,
+                    tagMapper -> tagMapper.deleteTagMetasByLegacyTimeline(legacyTimeline, limit)),
+        () ->
+            relRows[0] =
+                SessionUtils.doWithoutCommitAndFetchResult(
+                    TagMetadataObjectRelMapper.class,
+                    relMapper ->
+                        relMapper.deleteTagEntityRelsByLegacyTimeline(legacyTimeline, limit)));
+
+    int totalRows = tagRows[0] + relRows[0];
+    return totalRows;
+  }
+
+  /**
+   * Looks up the tag PO for the given metalake name and tag name.
+   *
+   * @param metalakeName The name of the metalake.
+   * @param tagName The name of the tag.
+   * @return The matching tag PO, never null.
+   * @throws NoSuchEntityException If the mapper returns no PO for the given names.
+   */
+  private TagPO getTagPOByMetalakeAndName(String metalakeName, String tagName) {
+    TagPO found =
+        SessionUtils.getWithoutCommit(
+            TagMetaMapper.class,
+            tagMapper -> tagMapper.selectTagMetaByMetalakeAndName(metalakeName, tagName));
+
+    if (found != null) {
+      return found;
+    }
+
+    throw new NoSuchEntityException(
+        NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+        Entity.EntityType.TAG.name().toLowerCase(),
+        tagName);
+  }
+}
diff --git 
a/core/src/main/java/com/datastrato/gravitino/storage/relational/session/SqlSessionFactoryHelper.java
 
b/core/src/main/java/com/datastrato/gravitino/storage/relational/session/SqlSessionFactoryHelper.java
index bd23f673c..ebc28456a 100644
--- 
a/core/src/main/java/com/datastrato/gravitino/storage/relational/session/SqlSessionFactoryHelper.java
+++ 
b/core/src/main/java/com/datastrato/gravitino/storage/relational/session/SqlSessionFactoryHelper.java
@@ -31,6 +31,8 @@ import 
com.datastrato.gravitino.storage.relational.mapper.RoleMetaMapper;
 import com.datastrato.gravitino.storage.relational.mapper.SchemaMetaMapper;
 import 
com.datastrato.gravitino.storage.relational.mapper.SecurableObjectMapper;
 import com.datastrato.gravitino.storage.relational.mapper.TableMetaMapper;
+import com.datastrato.gravitino.storage.relational.mapper.TagMetaMapper;
+import 
com.datastrato.gravitino.storage.relational.mapper.TagMetadataObjectRelMapper;
 import com.datastrato.gravitino.storage.relational.mapper.TopicMetaMapper;
 import com.datastrato.gravitino.storage.relational.mapper.UserMetaMapper;
 import com.datastrato.gravitino.storage.relational.mapper.UserRoleRelMapper;
@@ -112,6 +114,8 @@ public class SqlSessionFactoryHelper {
     configuration.addMapper(GroupMetaMapper.class);
     configuration.addMapper(GroupRoleRelMapper.class);
     configuration.addMapper(SecurableObjectMapper.class);
+    configuration.addMapper(TagMetaMapper.class);
+    configuration.addMapper(TagMetadataObjectRelMapper.class);
 
     // Create the SqlSessionFactory object, it is a singleton object
     if (sqlSessionFactory == null) {
diff --git 
a/core/src/main/java/com/datastrato/gravitino/storage/relational/utils/POConverters.java
 
b/core/src/main/java/com/datastrato/gravitino/storage/relational/utils/POConverters.java
index 9eef4f514..4f1431d2d 100644
--- 
a/core/src/main/java/com/datastrato/gravitino/storage/relational/utils/POConverters.java
+++ 
b/core/src/main/java/com/datastrato/gravitino/storage/relational/utils/POConverters.java
@@ -37,6 +37,7 @@ import com.datastrato.gravitino.meta.RoleEntity;
 import com.datastrato.gravitino.meta.SchemaEntity;
 import com.datastrato.gravitino.meta.SchemaVersion;
 import com.datastrato.gravitino.meta.TableEntity;
+import com.datastrato.gravitino.meta.TagEntity;
 import com.datastrato.gravitino.meta.TopicEntity;
 import com.datastrato.gravitino.meta.UserEntity;
 import com.datastrato.gravitino.storage.relational.po.CatalogPO;
@@ -49,6 +50,7 @@ import com.datastrato.gravitino.storage.relational.po.RolePO;
 import com.datastrato.gravitino.storage.relational.po.SchemaPO;
 import com.datastrato.gravitino.storage.relational.po.SecurableObjectPO;
 import com.datastrato.gravitino.storage.relational.po.TablePO;
+import com.datastrato.gravitino.storage.relational.po.TagPO;
 import com.datastrato.gravitino.storage.relational.po.TopicPO;
 import com.datastrato.gravitino.storage.relational.po.UserPO;
 import com.datastrato.gravitino.storage.relational.po.UserRoleRelPO;
@@ -958,4 +960,59 @@ public class POConverters {
       throw new RuntimeException("Failed to serialize json object:", e);
     }
   }
+
+  /**
+   * Converts a tag PO into a tag entity.
+   *
+   * @param tagPO The PO to convert; its properties and audit info are read as JSON.
+   * @param namespace The namespace to set on the resulting entity.
+   * @return The tag entity built from the PO fields and the given namespace.
+   * @throws RuntimeException If the JSON of the properties or audit info cannot be processed.
+   */
+  public static TagEntity fromTagPO(TagPO tagPO, Namespace namespace) {
+    try {
+      Map<String, String> parsedProperties =
+          JsonUtils.anyFieldMapper().readValue(tagPO.getProperties(), Map.class);
+      AuditInfo parsedAuditInfo =
+          JsonUtils.anyFieldMapper().readValue(tagPO.getAuditInfo(), AuditInfo.class);
+
+      return TagEntity.builder()
+          .withId(tagPO.getTagId())
+          .withName(tagPO.getTagName())
+          .withNamespace(namespace)
+          .withComment(tagPO.getComment())
+          .withProperties(parsedProperties)
+          .withAuditInfo(parsedAuditInfo)
+          .build();
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException("Failed to deserialize json object:", e);
+    }
+  }
+
+  /**
+   * Fills the given builder from a tag entity and builds the PO with initial version fields.
+   *
+   * <p>Both the current and the last version are set to {@code INIT_VERSION} and the deleted-at
+   * marker to {@code DEFAULT_DELETED_AT}. Properties and audit info are stored as JSON strings.
+   *
+   * @param tagEntity The entity supplying id, name, comment, properties and audit info.
+   * @param builder The PO builder to complete; the visible caller pre-sets the metalake id on it.
+   * @return The built tag PO.
+   * @throws RuntimeException If the properties or audit info cannot be serialized to JSON.
+   */
+  public static TagPO initializeTagPOWithVersion(TagEntity tagEntity, 
TagPO.Builder builder) {
+    try {
+      return builder
+          .withTagId(tagEntity.id())
+          .withTagName(tagEntity.name())
+          .withComment(tagEntity.comment())
+          
.withProperties(JsonUtils.anyFieldMapper().writeValueAsString(tagEntity.properties()))
+          
.withAuditInfo(JsonUtils.anyFieldMapper().writeValueAsString(tagEntity.auditInfo()))
+          .withCurrentVersion(INIT_VERSION)
+          .withLastVersion(INIT_VERSION)
+          .withDeletedAt(DEFAULT_DELETED_AT)
+          .build();
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException("Failed to serialize json object:", e);
+    }
+  }
+
+  /**
+   * Builds the PO that replaces {@code oldTagPO} after an update.
+   *
+   * <p>Tag id and metalake id are taken from the old PO; name, comment, properties and audit info
+   * come from the new entity. The version is carried over unchanged from the old PO.
+   *
+   * @param oldTagPO The currently stored PO.
+   * @param newEntity The updated tag entity.
+   * @return The new tag PO.
+   * @throws RuntimeException If the properties or audit info cannot be serialized to JSON.
+   */
+  public static TagPO updateTagPOWithVersion(TagPO oldTagPO, TagEntity newEntity) {
+    // TODO: bump the version to last version + 1 once some fields need to be multi-versioned.
+    Long carriedVersion = oldTagPO.getLastVersion();
+    try {
+      String propertiesJson =
+          JsonUtils.anyFieldMapper().writeValueAsString(newEntity.properties());
+      String auditInfoJson = JsonUtils.anyFieldMapper().writeValueAsString(newEntity.auditInfo());
+
+      return TagPO.builder()
+          .withTagId(oldTagPO.getTagId())
+          .withTagName(newEntity.name())
+          .withMetalakeId(oldTagPO.getMetalakeId())
+          .withComment(newEntity.comment())
+          .withProperties(propertiesJson)
+          .withAuditInfo(auditInfoJson)
+          .withCurrentVersion(carriedVersion)
+          .withLastVersion(carriedVersion)
+          .withDeletedAt(DEFAULT_DELETED_AT)
+          .build();
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException("Failed to serialize json object:", e);
+    }
+  }
 }
diff --git a/core/src/main/java/com/datastrato/gravitino/tag/TagManager.java 
b/core/src/main/java/com/datastrato/gravitino/tag/TagManager.java
index 9dbfcbea7..720da6332 100644
--- a/core/src/main/java/com/datastrato/gravitino/tag/TagManager.java
+++ b/core/src/main/java/com/datastrato/gravitino/tag/TagManager.java
@@ -19,15 +19,27 @@
 package com.datastrato.gravitino.tag;
 
 import com.datastrato.gravitino.Entity;
+import com.datastrato.gravitino.EntityAlreadyExistsException;
 import com.datastrato.gravitino.EntityStore;
 import com.datastrato.gravitino.MetadataObject;
 import com.datastrato.gravitino.NameIdentifier;
 import com.datastrato.gravitino.Namespace;
+import com.datastrato.gravitino.exceptions.NoSuchEntityException;
 import com.datastrato.gravitino.exceptions.NoSuchMetalakeException;
 import com.datastrato.gravitino.exceptions.NoSuchTagException;
 import com.datastrato.gravitino.exceptions.TagAlreadyExistsException;
+import com.datastrato.gravitino.lock.LockType;
+import com.datastrato.gravitino.lock.TreeLockUtils;
+import com.datastrato.gravitino.meta.AuditInfo;
+import com.datastrato.gravitino.meta.TagEntity;
 import com.datastrato.gravitino.storage.IdGenerator;
+import com.datastrato.gravitino.storage.kv.KvEntityStore;
+import com.datastrato.gravitino.utils.PrincipalUtils;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
 import java.io.IOException;
+import java.time.Instant;
+import java.util.Collections;
 import java.util.Map;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,32 +48,144 @@ public class TagManager {
 
   private static final Logger LOG = LoggerFactory.getLogger(TagManager.class);
 
-  public TagManager(IdGenerator idGenerator, EntityStore entityStore) {}
+  private final IdGenerator idGenerator;
+
+  private final EntityStore entityStore;
+
+  /**
+   * Creates a tag manager on top of the given id generator and entity store.
+   *
+   * @param idGenerator The id generator kept for later use.
+   * @param entityStore The entity store kept for later use; must not be a {@link KvEntityStore}.
+   * @throws RuntimeException If {@code entityStore} is a {@link KvEntityStore}.
+   */
+  public TagManager(IdGenerator idGenerator, EntityStore entityStore) {
+    boolean kvBacked = entityStore instanceof KvEntityStore;
+    if (kvBacked) {
+      String message =
+          "TagManager cannot run with kv entity store, please configure the entity "
+              + "store to use relational entity store and restart the Gravitino server";
+      LOG.error(message);
+      throw new RuntimeException(message);
+    }
+
+    this.entityStore = entityStore;
+    this.idGenerator = idGenerator;
+  }
 
   public String[] listTags(String metalake) {
-    throw new UnsupportedOperationException("Not implemented yet");
+    // Lists tag names while holding the READ tree lock on the tag namespace identifier.
+    Namespace tagNamespace = ofTagNamespace(metalake);
+    return TreeLockUtils.doWithTreeLock(
+        NameIdentifier.of(tagNamespace.levels()),
+        LockType.READ,
+        () -> {
+          checkMetalakeExists(metalake, entityStore);
+
+          try {
+            return entityStore.list(tagNamespace, TagEntity.class, Entity.EntityType.TAG).stream()
+                .map(tag -> tag.name())
+                .toArray(String[]::new);
+          } catch (IOException ioe) {
+            LOG.error("Failed to list tags under metalake {}", metalake, ioe);
+            throw new RuntimeException(ioe);
+          }
+        });
   }
 
-  public Tag[] listTagsInfo(String metalake, boolean extended) {
+  public MetadataObject[] listAssociatedMetadataObjectsForTag(String metalake, 
String name) {
     throw new UnsupportedOperationException("Not implemented yet");
   }
 
   public Tag createTag(String metalake, String name, String comment, Map<String, String> properties)
       throws TagAlreadyExistsException {
-    throw new UnsupportedOperationException("Not implemented yet");
+    // A null property map is stored as an empty map.
+    final Map<String, String> tagProperties;
+    if (properties == null) {
+      tagProperties = Collections.emptyMap();
+    } else {
+      tagProperties = properties;
+    }
+    Namespace tagNamespace = ofTagNamespace(metalake);
+
+    // The tag is created while holding the WRITE tree lock on the tag namespace identifier.
+    return TreeLockUtils.doWithTreeLock(
+        NameIdentifier.of(tagNamespace.levels()),
+        LockType.WRITE,
+        () -> {
+          checkMetalakeExists(metalake, entityStore);
+
+          TagEntity created =
+              TagEntity.builder()
+                  .withId(idGenerator.nextId())
+                  .withName(name)
+                  .withNamespace(tagNamespace)
+                  .withComment(comment)
+                  .withProperties(tagProperties)
+                  .withAuditInfo(
+                      AuditInfo.builder()
+                          .withCreator(PrincipalUtils.getCurrentPrincipal().getName())
+                          .withCreateTime(Instant.now())
+                          .build())
+                  .build();
+
+          try {
+            entityStore.put(created, false /* overwritten */);
+          } catch (EntityAlreadyExistsException e) {
+            throw new TagAlreadyExistsException(
+                "Tag with name %s under metalake %s already exists", name, metalake);
+          } catch (IOException ioe) {
+            LOG.error("Failed to create tag {} under metalake {}", name, metalake, ioe);
+            throw new RuntimeException(ioe);
+          }
+          return created;
+        });
   }
 
   public Tag getTag(String metalake, String name) throws NoSuchTagException {
-    throw new UnsupportedOperationException("Not implemented yet");
+    // Unlike the other operations in this hunk, the READ tree lock is taken on the tag
+    // identifier itself rather than on the tag namespace identifier.
+    NameIdentifier tagIdent = ofTagIdent(metalake, name);
+    return TreeLockUtils.doWithTreeLock(
+        tagIdent,
+        LockType.READ,
+        () -> {
+          checkMetalakeExists(metalake, entityStore);
+
+          try {
+            return entityStore.get(tagIdent, Entity.EntityType.TAG, TagEntity.class);
+          } catch (NoSuchEntityException e) {
+            throw new NoSuchTagException(
+                "Tag with name %s under metalake %s does not exist", name, metalake);
+          } catch (IOException ioe) {
+            LOG.error("Failed to get tag {} under metalake {}", name, metalake, ioe);
+            throw new RuntimeException(ioe);
+          }
+        });
   }
 
   public Tag alterTag(String metalake, String name, TagChange... changes)
       throws NoSuchTagException, IllegalArgumentException {
-    throw new UnsupportedOperationException("Not implemented yet");
+    // Applies the given changes to the tag while holding the WRITE tree lock on the tag
+    // namespace identifier of the metalake. Unsupported change types surface as
+    // IllegalArgumentException from updateTagEntity.
+    return TreeLockUtils.doWithTreeLock(
+        NameIdentifier.of(ofTagNamespace(metalake).levels()),
+        LockType.WRITE,
+        () -> {
+          checkMetalakeExists(metalake, entityStore);
+
+          try {
+            return entityStore.update(
+                ofTagIdent(metalake, name),
+                TagEntity.class,
+                Entity.EntityType.TAG,
+                tagEntity -> updateTagEntity(tagEntity, changes));
+          } catch (NoSuchEntityException e) {
+            throw new NoSuchTagException(
+                "Tag with name %s under metalake %s does not exist", name, metalake);
+          } catch (EntityAlreadyExistsException e) {
+            // Chain the store exception as the cause so the conflict stays visible in stack
+            // traces instead of being reduced to the message alone.
+            throw new RuntimeException(
+                "Tag with name " + name + " under metalake " + metalake + " already exists", e);
+          } catch (IOException ioe) {
+            LOG.error("Failed to alter tag {} under metalake {}", name, metalake, ioe);
+            throw new RuntimeException(ioe);
+          }
+        });
   }
 
   public boolean deleteTag(String metalake, String name) {
-    throw new UnsupportedOperationException("Not implemented yet");
+    // Deletes the tag while holding the WRITE tree lock on the tag namespace identifier.
+    NameIdentifier lockIdent = NameIdentifier.of(ofTagNamespace(metalake).levels());
+    return TreeLockUtils.doWithTreeLock(
+        lockIdent,
+        LockType.WRITE,
+        () -> {
+          checkMetalakeExists(metalake, entityStore);
+
+          try {
+            NameIdentifier tagIdent = ofTagIdent(metalake, name);
+            return entityStore.delete(tagIdent, Entity.EntityType.TAG);
+          } catch (IOException ioe) {
+            LOG.error("Failed to delete tag {} under metalake {}", name, metalake, ioe);
+            throw new RuntimeException(ioe);
+          }
+        });
   }
 
   public String[] listTagsForMetadataObject(String metalake, MetadataObject 
metadataObject) {
@@ -95,7 +219,52 @@ public class TagManager {
     }
   }
 
-  private static Namespace ofTagNamespace(String metalake) {
+  /**
+   * Builds the namespace that tags of a metalake live in: the metalake name followed by the
+   * reserved system catalog name and the tag schema name.
+   *
+   * @param metalake The name of the metalake.
+   * @return The three-level tag namespace for the metalake.
+   */
+  @VisibleForTesting
+  public static Namespace ofTagNamespace(String metalake) {
     return Namespace.of(metalake, Entity.SYSTEM_CATALOG_RESERVED_NAME, 
Entity.TAG_SCHEMA_NAME);
   }
+
+  /**
+   * Builds the identifier of a tag inside the tag namespace of a metalake.
+   *
+   * @param metalake The name of the metalake.
+   * @param tagName The name of the tag.
+   * @return The identifier made of the metalake's tag namespace and the tag name.
+   */
+  public static NameIdentifier ofTagIdent(String metalake, String tagName) {
+    Namespace tagNamespace = ofTagNamespace(metalake);
+    return NameIdentifier.of(tagNamespace, tagName);
+  }
+
+  /**
+   * Applies the given changes, in order, to a tag entity and returns the resulting entity.
+   *
+   * <p>The id, namespace, creator and create time are carried over. The last modifier is set to
+   * the current principal's name and the last modified time to now.
+   *
+   * @param tagEntity The entity to start from.
+   * @param changes The rename, comment, set-property and remove-property changes to apply.
+   * @return A new tag entity reflecting all changes.
+   * @throws IllegalArgumentException If a change is of an unsupported type.
+   */
+  private TagEntity updateTagEntity(TagEntity tagEntity, TagChange... changes) {
+    // Work on a copy so the properties map of the source entity is never modified.
+    Map<String, String> updatedProps = Maps.newHashMap();
+    if (tagEntity.properties() != null) {
+      updatedProps.putAll(tagEntity.properties());
+    }
+    String updatedName = tagEntity.name();
+    String updatedComment = tagEntity.comment();
+
+    for (TagChange change : changes) {
+      if (change instanceof TagChange.RenameTag) {
+        updatedName = ((TagChange.RenameTag) change).getNewName();
+        continue;
+      }
+      if (change instanceof TagChange.UpdateTagComment) {
+        updatedComment = ((TagChange.UpdateTagComment) change).getNewComment();
+        continue;
+      }
+      if (change instanceof TagChange.SetProperty) {
+        TagChange.SetProperty set = (TagChange.SetProperty) change;
+        updatedProps.put(set.getProperty(), set.getValue());
+        continue;
+      }
+      if (change instanceof TagChange.RemoveProperty) {
+        TagChange.RemoveProperty remove = (TagChange.RemoveProperty) change;
+        updatedProps.remove(remove.getProperty());
+        continue;
+      }
+      throw new IllegalArgumentException("Unsupported tag change: " + change);
+    }
+
+    AuditInfo previousAudit = tagEntity.auditInfo();
+    AuditInfo refreshedAudit =
+        AuditInfo.builder()
+            .withCreator(previousAudit.creator())
+            .withCreateTime(previousAudit.createTime())
+            .withLastModifier(PrincipalUtils.getCurrentPrincipal().getName())
+            .withLastModifiedTime(Instant.now())
+            .build();
+
+    return TagEntity.builder()
+        .withId(tagEntity.id())
+        .withName(updatedName)
+        .withNamespace(tagEntity.namespace())
+        .withComment(updatedComment)
+        .withProperties(updatedProps)
+        .withAuditInfo(refreshedAudit)
+        .build();
+  }
 }
diff --git a/core/src/test/java/com/datastrato/gravitino/meta/TestEntity.java 
b/core/src/test/java/com/datastrato/gravitino/meta/TestEntity.java
index c590fb425..321ad6b49 100644
--- a/core/src/test/java/com/datastrato/gravitino/meta/TestEntity.java
+++ b/core/src/test/java/com/datastrato/gravitino/meta/TestEntity.java
@@ -20,8 +20,6 @@ package com.datastrato.gravitino.meta;
 
 import com.datastrato.gravitino.Catalog;
 import com.datastrato.gravitino.Field;
-import com.datastrato.gravitino.MetadataObject;
-import com.datastrato.gravitino.MetadataObjects;
 import com.datastrato.gravitino.authorization.Privileges;
 import com.datastrato.gravitino.authorization.SecurableObjects;
 import com.datastrato.gravitino.file.Fileset;
@@ -335,23 +333,5 @@ public class TestEntity {
         
TagEntity.builder().withId(1L).withName("tag2").withAuditInfo(auditInfo).build();
     Assertions.assertNull(tag2.comment());
     Assertions.assertNull(tag2.properties());
-
-    MetadataObject[] metadataObjects =
-        new MetadataObject[] {
-          MetadataObjects.parse("test1", MetadataObject.Type.METALAKE),
-          MetadataObjects.parse("test2", MetadataObject.Type.CATALOG),
-          MetadataObjects.parse("a.b", MetadataObject.Type.SCHEMA),
-          MetadataObjects.parse("a.b.c", MetadataObject.Type.TABLE),
-          MetadataObjects.parse("a.b.c.d", MetadataObject.Type.COLUMN)
-        };
-
-    TagEntity tag3 =
-        TagEntity.builder()
-            .withId(1L)
-            .withName("tag3")
-            .withAuditInfo(auditInfo)
-            .withMetadataObjects(metadataObjects)
-            .build();
-    Assertions.assertEquals(metadataObjects, tag3.objects());
   }
 }
diff --git 
a/core/src/test/java/com/datastrato/gravitino/storage/relational/TestJDBCBackend.java
 
b/core/src/test/java/com/datastrato/gravitino/storage/relational/TestJDBCBackend.java
index 7a62eacab..233bcca86 100644
--- 
a/core/src/test/java/com/datastrato/gravitino/storage/relational/TestJDBCBackend.java
+++ 
b/core/src/test/java/com/datastrato/gravitino/storage/relational/TestJDBCBackend.java
@@ -51,6 +51,7 @@ import com.datastrato.gravitino.meta.RoleEntity;
 import com.datastrato.gravitino.meta.SchemaEntity;
 import com.datastrato.gravitino.meta.SchemaVersion;
 import com.datastrato.gravitino.meta.TableEntity;
+import com.datastrato.gravitino.meta.TagEntity;
 import com.datastrato.gravitino.meta.TopicEntity;
 import com.datastrato.gravitino.meta.UserEntity;
 import com.datastrato.gravitino.storage.RandomIdGenerator;
@@ -59,6 +60,7 @@ import 
com.datastrato.gravitino.storage.relational.mapper.UserMetaMapper;
 import com.datastrato.gravitino.storage.relational.service.RoleMetaService;
 import 
com.datastrato.gravitino.storage.relational.session.SqlSessionFactoryHelper;
 import com.datastrato.gravitino.storage.relational.utils.SessionUtils;
+import com.datastrato.gravitino.tag.TagManager;
 import com.datastrato.gravitino.utils.NamespaceUtil;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
@@ -520,6 +522,16 @@ public class TestJDBCBackend {
             Lists.newArrayList(role.id()));
     backend.insert(group, false);
 
+    TagEntity tag =
+        TagEntity.builder()
+            .withId(RandomIdGenerator.INSTANCE.nextId())
+            .withName("tag")
+            .withNamespace(TagManager.ofTagNamespace("metalake"))
+            .withComment("tag comment")
+            .withAuditInfo(auditInfo)
+            .build();
+    backend.insert(tag, false);
+
     // another meta data creation
     BaseMetalake anotherMetaLake =
         createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), 
"another-metalake", auditInfo);
@@ -598,6 +610,16 @@ public class TestJDBCBackend {
             Lists.newArrayList(anotherRole.id()));
     backend.insert(anotherGroup, false);
 
+    TagEntity anotherTagEntity =
+        TagEntity.builder()
+            .withId(RandomIdGenerator.INSTANCE.nextId())
+            .withName("another-tag")
+            .withNamespace(TagManager.ofTagNamespace("another-metalake"))
+            .withComment("another-tag comment")
+            .withAuditInfo(auditInfo)
+            .build();
+    backend.insert(anotherTagEntity, false);
+
     // meta data list
     List<BaseMetalake> metaLakes = backend.list(metalake.namespace(), 
Entity.EntityType.METALAKE);
     assertTrue(metaLakes.contains(metalake));
@@ -640,6 +662,12 @@ public class TestJDBCBackend {
                 GroupMetaMapper.class, mapper -> 
mapper.listGroupsByRoleId(role.id()))
             .size());
 
+    TagEntity tagEntity = backend.get(tag.nameIdentifier(), 
Entity.EntityType.TAG);
+    assertEquals(tag, tagEntity);
+    List<TagEntity> tags = backend.list(tag.namespace(), 
Entity.EntityType.TAG);
+    assertTrue(tags.contains(tag));
+    assertEquals(1, tags.size());
+
     // meta data soft delete
     backend.delete(metalake.nameIdentifier(), Entity.EntityType.METALAKE, 
true);
 
@@ -680,6 +708,9 @@ public class TestJDBCBackend {
             .size());
     assertTrue(backend.exists(anotherGroup.nameIdentifier(), 
Entity.EntityType.GROUP));
 
+    assertFalse(backend.exists(tag.nameIdentifier(), Entity.EntityType.TAG));
+    assertTrue(backend.exists(anotherTagEntity.nameIdentifier(), 
Entity.EntityType.TAG));
+
     // check legacy record after soft delete
     assertTrue(legacyRecordExistsInDB(metalake.id(), 
Entity.EntityType.METALAKE));
     assertTrue(legacyRecordExistsInDB(catalog.id(), 
Entity.EntityType.CATALOG));
@@ -694,6 +725,7 @@ public class TestJDBCBackend {
     assertEquals(2, countRoleRels(anotherRole.id()));
     assertEquals(2, listFilesetVersions(fileset.id()).size());
     assertEquals(3, listFilesetVersions(anotherFileset.id()).size());
+    assertTrue(legacyRecordExistsInDB(tag.id(), Entity.EntityType.TAG));
 
     // meta data hard delete
     for (Entity.EntityType entityType : Entity.EntityType.values()) {
@@ -711,6 +743,7 @@ public class TestJDBCBackend {
     assertEquals(0, countRoleRels(role.id()));
     assertEquals(2, countRoleRels(anotherRole.id()));
     assertEquals(0, listFilesetVersions(fileset.id()).size());
+    assertFalse(legacyRecordExistsInDB(tag.id(), Entity.EntityType.TAG));
 
     // soft delete for old version fileset
     assertEquals(3, listFilesetVersions(anotherFileset.id()).size());
@@ -768,6 +801,10 @@ public class TestJDBCBackend {
         tableName = "group_meta";
         idColumnName = "group_id";
         break;
+      case TAG:
+        tableName = "tag_meta";
+        idColumnName = "tag_id";
+        break;
       default:
         throw new IllegalArgumentException("Unsupported entity type: " + 
entityType);
     }
diff --git 
a/core/src/test/java/com/datastrato/gravitino/storage/relational/service/TestTagMetaService.java
 
b/core/src/test/java/com/datastrato/gravitino/storage/relational/service/TestTagMetaService.java
new file mode 100644
index 000000000..312156ba0
--- /dev/null
+++ 
b/core/src/test/java/com/datastrato/gravitino/storage/relational/service/TestTagMetaService.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datastrato.gravitino.storage.relational.service;
+
+import com.datastrato.gravitino.exceptions.NoSuchEntityException;
+import com.datastrato.gravitino.meta.AuditInfo;
+import com.datastrato.gravitino.meta.BaseMetalake;
+import com.datastrato.gravitino.meta.TagEntity;
+import com.datastrato.gravitino.storage.RandomIdGenerator;
+import com.datastrato.gravitino.storage.relational.TestJDBCBackend;
+import com.datastrato.gravitino.tag.TagManager;
+import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestTagMetaService extends TestJDBCBackend {
+
+  private final String metalakeName = "metalake_for_tag_test";
+
+  private final AuditInfo auditInfo =
+      
AuditInfo.builder().withCreator("creator").withCreateTime(Instant.now()).build();
+
+  private final Map<String, String> props = ImmutableMap.of("k1", "v1");
+  /**
+   * Covers {@code insertTag} and {@code getTagByIdentifier}: a lookup before anything is stored,
+   * a round trip of a fully populated tag, a round trip of a tag built without comment or
+   * properties, and an insert that reuses an existing id with the second argument set to
+   * {@code false} and then {@code true}.
+   */
+  @Test
+  public void testInsertAndGetTagByIdentifier() throws IOException {
+    BaseMetalake metalake =
+        createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), metalakeName, 
auditInfo);
+    backend.insert(metalake, false);
+
+    // No tag has been inserted yet, so the lookup must fail with NoSuchEntityException.
+    TagMetaService tagMetaService = TagMetaService.getInstance();
+    Exception excep =
+        Assertions.assertThrows(
+            NoSuchEntityException.class,
+            () -> 
tagMetaService.getTagByIdentifier(TagManager.ofTagIdent(metalakeName, "tag1")));
+    Assertions.assertEquals("No such tag entity: tag1", excep.getMessage());
+
+    // Insert a fully populated tag and read it back by identifier.
+    TagEntity tagEntity =
+        TagEntity.builder()
+            .withId(RandomIdGenerator.INSTANCE.nextId())
+            .withName("tag1")
+            .withNamespace(TagManager.ofTagNamespace(metalakeName))
+            .withComment("comment")
+            .withProperties(props)
+            .withAuditInfo(auditInfo)
+            .build();
+    tagMetaService.insertTag(tagEntity, false);
+
+    TagEntity resultTagEntity =
+        tagMetaService.getTagByIdentifier(TagManager.ofTagIdent(metalakeName, 
"tag1"));
+    Assertions.assertEquals(tagEntity, resultTagEntity);
+
+    // A tag built without comment and properties must come back with both still null.
+    TagEntity tagEntity1 =
+        TagEntity.builder()
+            .withId(RandomIdGenerator.INSTANCE.nextId())
+            .withName("tag2")
+            .withNamespace(TagManager.ofTagNamespace(metalakeName))
+            .withAuditInfo(auditInfo)
+            .build();
+
+    tagMetaService.insertTag(tagEntity1, false);
+    TagEntity resultTagEntity1 =
+        tagMetaService.getTagByIdentifier(TagManager.ofTagIdent(metalakeName, 
"tag2"));
+    Assertions.assertEquals(tagEntity1, resultTagEntity1);
+    Assertions.assertNull(resultTagEntity1.comment());
+    Assertions.assertNull(resultTagEntity1.properties());
+
+    // Reuse tagEntity1's id under a new name: rejected with overwrite false, accepted with true.
+    TagEntity tagEntity2 =
+        TagEntity.builder()
+            .withId(tagEntity1.id())
+            .withName("tag3")
+            .withNamespace(TagManager.ofTagNamespace(metalakeName))
+            .withComment("comment")
+            .withProperties(props)
+            .withAuditInfo(auditInfo)
+            .build();
+
+    Assertions.assertThrows(Exception.class, () -> 
tagMetaService.insertTag(tagEntity2, false));
+
+    tagMetaService.insertTag(tagEntity2, true);
+
+    TagEntity resultTagEntity2 =
+        tagMetaService.getTagByIdentifier(TagManager.ofTagIdent(metalakeName, 
"tag3"));
+    Assertions.assertEquals(tagEntity2, resultTagEntity2);
+  }
+  /**
+   * Inserts two tags into the same metalake and checks that {@code listTagsByNamespace} returns
+   * exactly those two entities.
+   */
+  @Test
+  public void testCreateAndListTags() throws IOException {
+    BaseMetalake metalake =
+        createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), metalakeName, 
auditInfo);
+    backend.insert(metalake, false);
+
+    TagMetaService tagMetaService = TagMetaService.getInstance();
+    TagEntity tagEntity1 =
+        TagEntity.builder()
+            .withId(RandomIdGenerator.INSTANCE.nextId())
+            .withName("tag1")
+            .withNamespace(TagManager.ofTagNamespace(metalakeName))
+            .withComment("comment")
+            .withProperties(props)
+            .withAuditInfo(auditInfo)
+            .build();
+    tagMetaService.insertTag(tagEntity1, false);
+
+    TagEntity tagEntity2 =
+        TagEntity.builder()
+            .withId(RandomIdGenerator.INSTANCE.nextId())
+            .withName("tag2")
+            .withNamespace(TagManager.ofTagNamespace(metalakeName))
+            .withComment("comment")
+            .withProperties(props)
+            .withAuditInfo(auditInfo)
+            .build();
+    tagMetaService.insertTag(tagEntity2, false);
+
+    List<TagEntity> tagEntities =
+        
tagMetaService.listTagsByNamespace(TagManager.ofTagNamespace(metalakeName));
+    Assertions.assertEquals(2, tagEntities.size());
+    Assertions.assertTrue(tagEntities.contains(tagEntity1));
+    Assertions.assertTrue(tagEntities.contains(tagEntity2));
+  }
+  /**
+   * Covers {@code updateTag}: updating an identifier that was never inserted, a regular update
+   * that keeps the id but replaces comment and properties, and an updater that returns an entity
+   * with a different id. After the rejected update the stored tag is reloaded and compared with
+   * the last successful state.
+   */
+  @Test
+  public void testUpdateTag() throws IOException {
+    BaseMetalake metalake =
+        createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), metalakeName, 
auditInfo);
+    backend.insert(metalake, false);
+
+    TagMetaService tagMetaService = TagMetaService.getInstance();
+    TagEntity tagEntity1 =
+        TagEntity.builder()
+            .withId(RandomIdGenerator.INSTANCE.nextId())
+            .withName("tag1")
+            .withNamespace(TagManager.ofTagNamespace(metalakeName))
+            .withComment("comment")
+            .withProperties(props)
+            .withAuditInfo(auditInfo)
+            .build();
+    tagMetaService.insertTag(tagEntity1, false);
+
+    // Updating "tag2", which was never inserted, must fail with NoSuchEntityException.
+    Exception excep =
+        Assertions.assertThrows(
+            NoSuchEntityException.class,
+            () ->
+                tagMetaService.updateTag(
+                    TagManager.ofTagIdent(metalakeName, "tag2"), tagEntity -> 
tagEntity));
+    Assertions.assertEquals("No such tag entity: tag2", excep.getMessage());
+
+    // Same id, new comment and properties: the new state must be returned and reloadable.
+    TagEntity tagEntity2 =
+        TagEntity.builder()
+            .withId(tagEntity1.id())
+            .withName("tag1")
+            .withNamespace(TagManager.ofTagNamespace(metalakeName))
+            .withComment("comment1")
+            .withProperties(ImmutableMap.of("k2", "v2"))
+            .withAuditInfo(auditInfo)
+            .build();
+    TagEntity updatedTagEntity =
+        tagMetaService.updateTag(
+            TagManager.ofTagIdent(metalakeName, "tag1"), tagEntity -> 
tagEntity2);
+    Assertions.assertEquals(tagEntity2, updatedTagEntity);
+
+    TagEntity loadedTagEntity =
+        tagMetaService.getTagByIdentifier(TagManager.ofTagIdent(metalakeName, 
"tag1"));
+    Assertions.assertEquals(tagEntity2, loadedTagEntity);
+
+    // An updater returning a different id is rejected; the stored tag must stay unchanged.
+    TagEntity tagEntity3 =
+        TagEntity.builder()
+            .withId(RandomIdGenerator.INSTANCE.nextId())
+            .withName("tag1")
+            .withNamespace(TagManager.ofTagNamespace(metalakeName))
+            .withComment("comment1")
+            .withProperties(ImmutableMap.of("k2", "v2"))
+            .withAuditInfo(auditInfo)
+            .build();
+
+    Exception excep1 =
+        Assertions.assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                tagMetaService.updateTag(
+                    TagManager.ofTagIdent(metalakeName, "tag1"), tagEntity -> 
tagEntity3));
+    Assertions.assertEquals(
+        "The updated tag entity id: "
+            + tagEntity3.id()
+            + " must have the same id as the old "
+            + "entity id "
+            + tagEntity2.id(),
+        excep1.getMessage());
+
+    TagEntity loadedTagEntity1 =
+        tagMetaService.getTagByIdentifier(TagManager.ofTagIdent(metalakeName, 
"tag1"));
+    Assertions.assertEquals(tagEntity2, loadedTagEntity1);
+  }
+  /**
+   * Deletes an inserted tag and checks that the first {@code deleteTag} call returns
+   * {@code true}, a repeated call returns {@code false}, and loading the tag afterwards fails
+   * with {@link NoSuchEntityException}.
+   */
+  @Test
+  public void testDeleteTag() throws IOException {
+    BaseMetalake metalake =
+        createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), metalakeName, 
auditInfo);
+    backend.insert(metalake, false);
+
+    TagMetaService tagMetaService = TagMetaService.getInstance();
+    TagEntity tagEntity1 =
+        TagEntity.builder()
+            .withId(RandomIdGenerator.INSTANCE.nextId())
+            .withName("tag1")
+            .withNamespace(TagManager.ofTagNamespace(metalakeName))
+            .withComment("comment")
+            .withProperties(props)
+            .withAuditInfo(auditInfo)
+            .build();
+    tagMetaService.insertTag(tagEntity1, false);
+
+    boolean deleted = 
tagMetaService.deleteTag(TagManager.ofTagIdent(metalakeName, "tag1"));
+    Assertions.assertTrue(deleted);
+
+    deleted = tagMetaService.deleteTag(TagManager.ofTagIdent(metalakeName, 
"tag1"));
+    Assertions.assertFalse(deleted);
+
+    Exception excep =
+        Assertions.assertThrows(
+            NoSuchEntityException.class,
+            () -> 
tagMetaService.getTagByIdentifier(TagManager.ofTagIdent(metalakeName, "tag1")));
+    Assertions.assertEquals("No such tag entity: tag1", excep.getMessage());
+  }
+  /**
+   * Checks that a tag can no longer be loaded once its metalake has been deleted through
+   * {@code MetalakeMetaService.deleteMetalake}. The scenario runs twice on two different
+   * metalakes: first with the second argument {@code false}, then with it set to {@code true}.
+   */
+  @Test
+  public void testDeleteMetalake() throws IOException {
+    BaseMetalake metalake =
+        createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), metalakeName, 
auditInfo);
+    backend.insert(metalake, false);
+
+    TagMetaService tagMetaService = TagMetaService.getInstance();
+    TagEntity tagEntity1 =
+        TagEntity.builder()
+            .withId(RandomIdGenerator.INSTANCE.nextId())
+            .withName("tag1")
+            .withNamespace(TagManager.ofTagNamespace(metalakeName))
+            .withComment("comment")
+            .withProperties(props)
+            .withAuditInfo(auditInfo)
+            .build();
+    tagMetaService.insertTag(tagEntity1, false);
+
+    Assertions.assertTrue(
+        
MetalakeMetaService.getInstance().deleteMetalake(metalake.nameIdentifier(), 
false));
+    Assertions.assertThrows(
+        NoSuchEntityException.class,
+        () -> 
tagMetaService.getTagByIdentifier(TagManager.ofTagIdent(metalakeName, "tag1")));
+
+    // Same check on a second metalake, this time deleting with cascade set to true.
+    BaseMetalake metalake1 =
+        createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), metalakeName + 
"1", auditInfo);
+    backend.insert(metalake1, false);
+
+    TagEntity tagEntity2 =
+        TagEntity.builder()
+            .withId(RandomIdGenerator.INSTANCE.nextId())
+            .withName("tag2")
+            .withNamespace(TagManager.ofTagNamespace(metalakeName + "1"))
+            .withComment("comment")
+            .withProperties(props)
+            .withAuditInfo(auditInfo)
+            .build();
+
+    tagMetaService.insertTag(tagEntity2, false);
+    Assertions.assertTrue(
+        
MetalakeMetaService.getInstance().deleteMetalake(metalake1.nameIdentifier(), 
true));
+    Assertions.assertThrows(
+        NoSuchEntityException.class,
+        () -> 
tagMetaService.getTagByIdentifier(TagManager.ofTagIdent(metalakeName + "1", 
"tag2")));
+  }
+}
diff --git 
a/core/src/test/java/com/datastrato/gravitino/storage/relational/utils/TestPOConverters.java
 
b/core/src/test/java/com/datastrato/gravitino/storage/relational/utils/TestPOConverters.java
index 515d5acb6..1f36b242c 100644
--- 
a/core/src/test/java/com/datastrato/gravitino/storage/relational/utils/TestPOConverters.java
+++ 
b/core/src/test/java/com/datastrato/gravitino/storage/relational/utils/TestPOConverters.java
@@ -20,8 +20,10 @@
 package com.datastrato.gravitino.storage.relational.utils;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
 
 import com.datastrato.gravitino.Catalog;
+import com.datastrato.gravitino.Entity;
 import com.datastrato.gravitino.Namespace;
 import com.datastrato.gravitino.json.JsonUtils;
 import com.datastrato.gravitino.meta.AuditInfo;
@@ -31,6 +33,7 @@ import com.datastrato.gravitino.meta.FilesetEntity;
 import com.datastrato.gravitino.meta.SchemaEntity;
 import com.datastrato.gravitino.meta.SchemaVersion;
 import com.datastrato.gravitino.meta.TableEntity;
+import com.datastrato.gravitino.meta.TagEntity;
 import com.datastrato.gravitino.meta.TopicEntity;
 import com.datastrato.gravitino.storage.relational.po.CatalogPO;
 import com.datastrato.gravitino.storage.relational.po.FilesetPO;
@@ -38,6 +41,7 @@ import 
com.datastrato.gravitino.storage.relational.po.FilesetVersionPO;
 import com.datastrato.gravitino.storage.relational.po.MetalakePO;
 import com.datastrato.gravitino.storage.relational.po.SchemaPO;
 import com.datastrato.gravitino.storage.relational.po.TablePO;
+import com.datastrato.gravitino.storage.relational.po.TagPO;
 import com.datastrato.gravitino.storage.relational.po.TopicPO;
 import com.datastrato.gravitino.utils.NamespaceUtil;
 import com.fasterxml.jackson.core.JsonProcessingException;
@@ -594,6 +598,61 @@ public class TestPOConverters {
     assertEquals("test1", updatePO2.getFilesetName());
   }
 
+  /**
+   * Converts a {@link TagPO} into a {@link TagEntity} via {@code POConverters.fromTagPO} and
+   * compares id, name, namespace, creator and comment with a hand-built entity. The same
+   * comparison runs once with a comment on the PO and once with a null comment.
+   */
+  @Test
+  public void testFromTagPO() throws JsonProcessingException {
+    Namespace tagNamespace =
+        Namespace.of("test_metalake", Entity.SYSTEM_CATALOG_RESERVED_NAME, Entity.TAG_SCHEMA_NAME);
+
+    for (String comment : new String[] {"this is test", null}) {
+      TagPO sourcePO = createTagPO(1L, "test", 1L, comment);
+      TagEntity expected = createTag(1L, "test", tagNamespace, comment);
+      TagEntity converted = POConverters.fromTagPO(sourcePO, tagNamespace);
+
+      assertEquals(expected.id(), converted.id());
+      assertEquals(expected.name(), converted.name());
+      assertEquals(expected.namespace(), converted.namespace());
+      assertEquals(expected.auditInfo().creator(), converted.auditInfo().creator());
+
+      // The null-comment case is checked with assertNull, the other one against the fixture.
+      if (comment == null) {
+        assertNull(converted.comment());
+      } else {
+        assertEquals(expected.comment(), converted.comment());
+      }
+    }
+  }
+  /**
+   * Checks the values {@code initializeTagPOWithVersion} puts on a freshly built {@link TagPO}:
+   * current version 1, last version 1 and {@code deletedAt} 0.
+   */
+  @Test
+  public void testInitTagPOVersion() {
+    Namespace tagNS =
+        Namespace.of("test_metalake", Entity.SYSTEM_CATALOG_RESERVED_NAME, 
Entity.TAG_SCHEMA_NAME);
+    TagEntity tag = createTag(1L, "test", tagNS, "this is test");
+    TagPO.Builder builder = TagPO.builder().withMetalakeId(1L);
+    TagPO initPO = POConverters.initializeTagPOWithVersion(tag, builder);
+    assertEquals(1, initPO.getCurrentVersion());
+    assertEquals(1, initPO.getLastVersion());
+    assertEquals(0, initPO.getDeletedAt());
+  }
+  /**
+   * Checks that {@code updateTagPOWithVersion} carries the updated entity's comment into the
+   * resulting {@link TagPO}.
+   *
+   * <p>NOTE(review): the three version/deletedAt assertions read {@code initPO}, not
+   * {@code updatePO}, so the version fields of the updated PO are not verified here. Confirm
+   * whether that is intended.
+   */
+  @Test
+  public void testUpdateTagPOVersion() {
+    Namespace tagNS =
+        Namespace.of("test_metalake", Entity.SYSTEM_CATALOG_RESERVED_NAME, 
Entity.TAG_SCHEMA_NAME);
+    TagEntity tag = createTag(1L, "test", tagNS, "this is test");
+    TagEntity updatedTag = createTag(1L, "test", tagNS, "this is test2");
+    TagPO.Builder builder = TagPO.builder().withMetalakeId(1L);
+    TagPO initPO = POConverters.initializeTagPOWithVersion(tag, builder);
+    TagPO updatePO = POConverters.updateTagPOWithVersion(initPO, updatedTag);
+    assertEquals(1, initPO.getCurrentVersion());
+    assertEquals(1, initPO.getLastVersion());
+    assertEquals(0, initPO.getDeletedAt());
+    assertEquals("this is test2", updatePO.getComment());
+  }
+
   private static BaseMetalake createMetalake(Long id, String name, String 
comment) {
     AuditInfo auditInfo =
         
AuditInfo.builder().withCreator("creator").withCreateTime(FIX_INSTANT).build();
@@ -844,4 +903,36 @@ public class TestPOConverters {
         .withDeletedAt(0L)
         .build();
   }
+
+  /**
+   * Builds a {@link TagPO} fixture with the given id, name, metalake id and comment. Properties
+   * are the JSON form of {@code {"key": "value"}}, audit info is the JSON form of an
+   * {@link AuditInfo} created by "creator" at {@code FIX_INSTANT}; both versions are 1 and
+   * {@code deletedAt} is 0.
+   *
+   * @throws JsonProcessingException if serializing the properties or the audit info fails
+   */
+  private static TagPO createTagPO(Long id, String name, Long metalakeId, String comment)
+      throws JsonProcessingException {
+    // Serialize both JSON columns up front so the builder chain below stays flat.
+    String propertiesJson =
+        JsonUtils.anyFieldMapper().writeValueAsString(ImmutableMap.of("key", "value"));
+    String auditInfoJson =
+        JsonUtils.anyFieldMapper()
+            .writeValueAsString(
+                AuditInfo.builder().withCreator("creator").withCreateTime(FIX_INSTANT).build());
+
+    return TagPO.builder()
+        .withTagId(id)
+        .withTagName(name)
+        .withMetalakeId(metalakeId)
+        .withComment(comment)
+        .withProperties(propertiesJson)
+        .withAuditInfo(auditInfoJson)
+        .withCurrentVersion(1L)
+        .withLastVersion(1L)
+        .withDeletedAt(0L)
+        .build();
+  }
+
+  /**
+   * Builds a {@link TagEntity} fixture with the given id, name, namespace and comment, the
+   * single property {@code key=value}, and an {@link AuditInfo} created by "creator" at
+   * {@code FIX_INSTANT}.
+   */
+  private static TagEntity createTag(Long id, String name, Namespace namespace, String comment) {
+    return TagEntity.builder()
+        .withId(id)
+        .withName(name)
+        .withNamespace(namespace)
+        .withComment(comment)
+        .withProperties(ImmutableMap.of("key", "value"))
+        .withAuditInfo(
+            AuditInfo.builder().withCreator("creator").withCreateTime(FIX_INSTANT).build())
+        .build();
+  }
 }
diff --git 
a/core/src/test/java/com/datastrato/gravitino/tag/TestTagManager.java 
b/core/src/test/java/com/datastrato/gravitino/tag/TestTagManager.java
new file mode 100644
index 000000000..9845df8f9
--- /dev/null
+++ b/core/src/test/java/com/datastrato/gravitino/tag/TestTagManager.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datastrato.gravitino.tag;
+
+import static com.datastrato.gravitino.Configs.DEFAULT_ENTITY_RELATIONAL_STORE;
+import static 
com.datastrato.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_DRIVER;
+import static 
com.datastrato.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_URL;
+import static com.datastrato.gravitino.Configs.ENTITY_RELATIONAL_STORE;
+import static com.datastrato.gravitino.Configs.ENTITY_STORE;
+import static com.datastrato.gravitino.Configs.RELATIONAL_ENTITY_STORE;
+import static com.datastrato.gravitino.Configs.STORE_DELETE_AFTER_TIME;
+import static com.datastrato.gravitino.Configs.STORE_TRANSACTION_MAX_SKEW_TIME;
+import static com.datastrato.gravitino.Configs.TREE_LOCK_CLEAN_INTERVAL;
+import static com.datastrato.gravitino.Configs.TREE_LOCK_MAX_NODE_IN_MEMORY;
+import static com.datastrato.gravitino.Configs.TREE_LOCK_MIN_NODE_IN_MEMORY;
+import static com.datastrato.gravitino.Configs.VERSION_RETENTION_COUNT;
+
+import com.datastrato.gravitino.Config;
+import com.datastrato.gravitino.EntityStore;
+import com.datastrato.gravitino.EntityStoreFactory;
+import com.datastrato.gravitino.GravitinoEnv;
+import com.datastrato.gravitino.exceptions.NoSuchMetalakeException;
+import com.datastrato.gravitino.exceptions.NoSuchTagException;
+import com.datastrato.gravitino.exceptions.TagAlreadyExistsException;
+import com.datastrato.gravitino.lock.LockManager;
+import com.datastrato.gravitino.meta.AuditInfo;
+import com.datastrato.gravitino.meta.BaseMetalake;
+import com.datastrato.gravitino.meta.SchemaVersion;
+import com.datastrato.gravitino.storage.IdGenerator;
+import com.datastrato.gravitino.storage.RandomIdGenerator;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import java.io.File;
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+public class TestTagManager {
+
+  private static final String JDBC_STORE_PATH =
+      "/tmp/gravitino_jdbc_entityStore_" + 
UUID.randomUUID().toString().replace("-", "");
+
+  private static final String DB_DIR = JDBC_STORE_PATH + "/testdb";
+  private static final Config config = Mockito.mock(Config.class);
+
+  private static final String METALAKE = "metalake_for_tag_test";
+
+  private static EntityStore entityStore;
+
+  private static IdGenerator idGenerator;
+
+  private static TagManager tagManager;
+  /**
+   * Builds the fixture shared by all tests in this class. It creates the DB directory, stubs
+   * the mocked {@link Config} so the relational entity store is selected with an H2 file
+   * database located at {@code DB_DIR}, writes a {@link LockManager} into the
+   * {@code lockManager} field of {@link GravitinoEnv} via reflection, creates and initializes
+   * the entity store, stores the test metalake, and finally creates the {@link TagManager}
+   * under test.
+   */
+  @BeforeAll
+  public static void setUp() throws IOException, IllegalAccessException {
+    idGenerator = new RandomIdGenerator();
+
+    File dbDir = new File(DB_DIR);
+    dbDir.mkdirs();
+
+    Mockito.when(config.get(ENTITY_STORE)).thenReturn(RELATIONAL_ENTITY_STORE);
+    
Mockito.when(config.get(ENTITY_RELATIONAL_STORE)).thenReturn(DEFAULT_ENTITY_RELATIONAL_STORE);
+    Mockito.when(config.get(ENTITY_RELATIONAL_JDBC_BACKEND_URL))
+        
.thenReturn(String.format("jdbc:h2:file:%s;DB_CLOSE_DELAY=-1;MODE=MYSQL", 
DB_DIR));
+    
Mockito.when(config.get(ENTITY_RELATIONAL_JDBC_BACKEND_DRIVER)).thenReturn("org.h2.Driver");
+    
Mockito.when(config.get(STORE_TRANSACTION_MAX_SKEW_TIME)).thenReturn(1000L);
+    Mockito.when(config.get(STORE_DELETE_AFTER_TIME)).thenReturn(20 * 60 * 
1000L);
+    Mockito.when(config.get(VERSION_RETENTION_COUNT)).thenReturn(1L);
+
+    Mockito.doReturn(100000L).when(config).get(TREE_LOCK_MAX_NODE_IN_MEMORY);
+    Mockito.doReturn(1000L).when(config).get(TREE_LOCK_MIN_NODE_IN_MEMORY);
+    Mockito.doReturn(36000L).when(config).get(TREE_LOCK_CLEAN_INTERVAL);
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "lockManager", new 
LockManager(config), true);
+
+    entityStore = EntityStoreFactory.createEntityStore(config);
+    entityStore.initialize(config);
+
+    AuditInfo audit = 
AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build();
+
+    BaseMetalake metalake =
+        BaseMetalake.builder()
+            .withId(idGenerator.nextId())
+            .withName(METALAKE)
+            .withVersion(SchemaVersion.V_0_1)
+            .withComment("Test metalake")
+            .withAuditInfo(audit)
+            .build();
+    entityStore.put(metalake, false /* overwritten */);
+
+    tagManager = new TagManager(idGenerator, entityStore);
+  }
+
+  /**
+   * Closes the shared entity store and removes the on-disk database directory created for this
+   * test class.
+   *
+   * <p>The directory is deleted in a {@code finally} block so that a failure while closing the
+   * store does not leave the files under {@code JDBC_STORE_PATH} behind.
+   *
+   * @throws IOException if closing the store or deleting the directory fails
+   */
+  @AfterAll
+  public static void tearDown() throws IOException {
+    try {
+      if (entityStore != null) {
+        entityStore.close();
+        entityStore = null;
+      }
+    } finally {
+      FileUtils.deleteDirectory(new File(JDBC_STORE_PATH));
+    }
+  }
+
+  /** Deletes every tag still listed under the test metalake so each test starts without tags. */
+  @AfterEach
+  public void cleanUp() {
+    for (String tagName : tagManager.listTags(METALAKE)) {
+      tagManager.deleteTag(METALAKE, tagName);
+    }
+  }
+  /**
+   * Creates a tag with null comment and null properties, reads it back with {@code getTag}, and
+   * then checks the exception type and message for three failure cases: creating a tag in a
+   * metalake that does not exist, creating a tag whose name is already taken, and getting a tag
+   * that does not exist.
+   */
+  @Test
+  public void testCreateAndGetTag() {
+    Tag tag = tagManager.createTag(METALAKE, "tag1", null, null);
+    Assertions.assertEquals("tag1", tag.name());
+    Assertions.assertNull(tag.comment());
+    Assertions.assertTrue(tag.properties().isEmpty());
+
+    Tag tag1 = tagManager.getTag(METALAKE, "tag1");
+    Assertions.assertEquals(tag, tag1);
+
+    // Creating a tag in a non-existent metalake must fail.
+    Exception e =
+        Assertions.assertThrows(
+            NoSuchMetalakeException.class,
+            () -> tagManager.createTag("non_existent_metalake", "tag1", null, 
null));
+    Assertions.assertEquals("Metalake non_existent_metalake does not exist", 
e.getMessage());
+
+    // Creating a tag whose name already exists in the metalake must fail.
+    e =
+        Assertions.assertThrows(
+            TagAlreadyExistsException.class,
+            () -> tagManager.createTag(METALAKE, "tag1", null, null));
+    Assertions.assertEquals(
+        "Tag with name tag1 under metalake metalake_for_tag_test already 
exists", e.getMessage());
+
+    // Getting a non-existent tag must fail.
+    e =
+        Assertions.assertThrows(
+            NoSuchTagException.class, () -> tagManager.getTag(METALAKE, 
"non_existent_tag"));
+    Assertions.assertEquals(
+        "Tag with name non_existent_tag under metalake metalake_for_tag_test 
does not exist",
+        e.getMessage());
+  }
+  /**
+   * Creates three tags and checks that {@code listTags} returns exactly their names, then checks
+   * that listing tags of a metalake that does not exist fails with
+   * {@link NoSuchMetalakeException}.
+   */
+  @Test
+  public void testCreateAndListTags() {
+    tagManager.createTag(METALAKE, "tag1", null, null);
+    tagManager.createTag(METALAKE, "tag2", null, null);
+    tagManager.createTag(METALAKE, "tag3", null, null);
+
+    Set<String> tagNames = 
Arrays.stream(tagManager.listTags(METALAKE)).collect(Collectors.toSet());
+    Assertions.assertEquals(3, tagNames.size());
+    Set<String> expectedNames = ImmutableSet.of("tag1", "tag2", "tag3");
+    Assertions.assertEquals(expectedNames, tagNames);
+
+    // Listing tags in a non-existent metalake must fail.
+    Exception e =
+        Assertions.assertThrows(
+            NoSuchMetalakeException.class, () -> 
tagManager.listTags("non_existent_metalake"));
+    Assertions.assertEquals("Metalake non_existent_metalake does not exist", 
e.getMessage());
+  }
+  /**
+   * Applies a rename, a comment update, a property set on a new key, a property set on an
+   * existing key and a property removal, one {@code alterTag} call at a time. After every call
+   * the returned tag's name, comment and properties are compared with the expected state, so
+   * each step also confirms that the untouched attributes were carried over.
+   */
+  @Test
+  public void testAlterTag() {
+    String tagComment = "tag comment";
+    Map<String, String> tagProp = ImmutableMap.of("k1", "k2");
+    tagManager.createTag(METALAKE, "tag1", tagComment, tagProp);
+
+    // Rename: only the name changes; later steps address the tag by its new name.
+    TagChange rename = TagChange.rename("new_tag1");
+    Tag renamedTag = tagManager.alterTag(METALAKE, "tag1", rename);
+    Assertions.assertEquals("new_tag1", renamedTag.name());
+    Assertions.assertEquals(tagComment, renamedTag.comment());
+    Assertions.assertEquals(tagProp, renamedTag.properties());
+
+    // Comment update: only the comment changes.
+    TagChange changeComment = TagChange.updateComment("new comment");
+    Tag changedCommentTag = tagManager.alterTag(METALAKE, "new_tag1", 
changeComment);
+    Assertions.assertEquals("new_tag1", changedCommentTag.name());
+    Assertions.assertEquals("new comment", changedCommentTag.comment());
+    Assertions.assertEquals(tagProp, changedCommentTag.properties());
+
+    // setProperty on a key that is not present yet adds it next to the existing one.
+    TagChange addProp = TagChange.setProperty("k2", "v2");
+    Tag addedPropTag = tagManager.alterTag(METALAKE, "new_tag1", addProp);
+    Assertions.assertEquals("new_tag1", addedPropTag.name());
+    Assertions.assertEquals("new comment", addedPropTag.comment());
+    Map<String, String> expectedProp = ImmutableMap.of("k1", "k2", "k2", "v2");
+    Assertions.assertEquals(expectedProp, addedPropTag.properties());
+
+    // setProperty on an existing key replaces its value.
+    TagChange updateProp = TagChange.setProperty("k1", "v1");
+    Tag updatedPropTag = tagManager.alterTag(METALAKE, "new_tag1", updateProp);
+    Assertions.assertEquals("new_tag1", updatedPropTag.name());
+    Assertions.assertEquals("new comment", updatedPropTag.comment());
+    Map<String, String> expectedProp1 = ImmutableMap.of("k1", "v1", "k2", 
"v2");
+    Assertions.assertEquals(expectedProp1, updatedPropTag.properties());
+
+    // removeProperty drops the key and leaves the remaining property in place.
+    TagChange removeProp = TagChange.removeProperty("k1");
+    Tag removedPropTag = tagManager.alterTag(METALAKE, "new_tag1", removeProp);
+    Assertions.assertEquals("new_tag1", removedPropTag.name());
+    Assertions.assertEquals("new comment", removedPropTag.comment());
+    Map<String, String> expectedProp2 = ImmutableMap.of("k2", "v2");
+    Assertions.assertEquals(expectedProp2, removedPropTag.properties());
+  }
+  /**
+   * Checks the three outcomes of {@code deleteTag}: {@code true} for an existing tag,
+   * {@code false} for a tag that does not exist, and {@link NoSuchMetalakeException} when the
+   * metalake does not exist.
+   */
+  @Test
+  public void testDeleteTag() {
+    tagManager.createTag(METALAKE, "tag1", null, null);
+    Assertions.assertTrue(tagManager.deleteTag(METALAKE, "tag1"));
+
+    // Deleting a non-existent tag returns false instead of throwing.
+    Assertions.assertFalse(tagManager.deleteTag(METALAKE, "non_existent_tag"));
+
+    // Deleting a tag in a non-existent metalake must fail.
+    Exception e =
+        Assertions.assertThrows(
+            NoSuchMetalakeException.class,
+            () -> tagManager.deleteTag("non_existent_metalake", "tag1"));
+    Assertions.assertEquals("Metalake non_existent_metalake does not exist", 
e.getMessage());
+  }
+}
diff --git 
a/integration-test-common/src/test/java/com/datastrato/gravitino/integration/test/util/AbstractIT.java
 
b/integration-test-common/src/test/java/com/datastrato/gravitino/integration/test/util/AbstractIT.java
index a763737e8..8600a9111 100644
--- 
a/integration-test-common/src/test/java/com/datastrato/gravitino/integration/test/util/AbstractIT.java
+++ 
b/integration-test-common/src/test/java/com/datastrato/gravitino/integration/test/util/AbstractIT.java
@@ -42,6 +42,7 @@ import java.nio.file.Paths;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.Statement;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -179,7 +180,13 @@ public class AbstractIT {
                       + String.format(
                           "/scripts/mysql/schema-%s-mysql.sql", 
ConfigConstants.VERSION_0_6_0)),
               "UTF-8");
-      String[] initMySQLBackendSqls = mysqlContent.split(";");
+
+      String[] initMySQLBackendSqls =
+          Arrays.stream(mysqlContent.split(";"))
+              .map(String::trim)
+              .filter(s -> !s.isEmpty())
+              .toArray(String[]::new);
+
       initMySQLBackendSqls = ArrayUtils.addFirst(initMySQLBackendSqls, "use " 
+ META_DATA + ";");
       for (String sql : initMySQLBackendSqls) {
         statement.execute(sql);
diff --git 
a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/store/relational/service/FilesetMetaServiceIT.java
 
b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/store/relational/service/FilesetMetaServiceIT.java
index 1222cbd19..3d5925a86 100644
--- 
a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/store/relational/service/FilesetMetaServiceIT.java
+++ 
b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/store/relational/service/FilesetMetaServiceIT.java
@@ -54,6 +54,7 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.time.Instant;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.commons.io.FileUtils;
@@ -98,7 +99,11 @@ public class FilesetMetaServiceIT {
                       + String.format(
                           "/scripts/mysql/schema-%s-mysql.sql", 
ConfigConstants.VERSION_0_5_0)),
               "UTF-8");
-      String[] initMySQLBackendSqls = mysqlContent.split(";");
+      String[] initMySQLBackendSqls =
+          Arrays.stream(mysqlContent.split(";"))
+              .map(String::trim)
+              .filter(s -> !s.isEmpty())
+              .toArray(String[]::new);
       initMySQLBackendSqls = ArrayUtils.addFirst(initMySQLBackendSqls, "use " 
+ META_DATA + ";");
       for (String sql : initMySQLBackendSqls) {
         statement.execute(sql);
diff --git a/scripts/h2/schema-h2.sql b/scripts/h2/schema-h2.sql
index 8d29af599..f4c961cb6 100644
--- a/scripts/h2/schema-h2.sql
+++ b/scripts/h2/schema-h2.sql
@@ -215,4 +215,33 @@ CREATE TABLE IF NOT EXISTS `group_role_rel` (
     PRIMARY KEY (`id`),
     CONSTRAINT `uk_gi_ri_del` UNIQUE (`group_id`, `role_id`, `deleted_at`),
     KEY `idx_gid` (`group_id`)
-    ) ENGINE=InnoDB;
\ No newline at end of file
+    ) ENGINE=InnoDB;
+
+CREATE TABLE IF NOT EXISTS `tag_meta` (
+    `tag_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'tag id',
+    `tag_name` VARCHAR(128) NOT NULL COMMENT 'tag name',
+    `metalake_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metalake id',
+    `tag_comment` VARCHAR(256) DEFAULT '' COMMENT 'tag comment',
+    `properties` MEDIUMTEXT DEFAULT NULL COMMENT 'tag properties',
+    `audit_info` MEDIUMTEXT NOT NULL COMMENT 'tag audit info',
+    `current_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'tag current 
version',
+    `last_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'tag last version',
+    `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'tag deleted 
at',
+    PRIMARY KEY (`tag_id`),
+    UNIQUE KEY `uk_mi_tn_del` (`metalake_id`, `tag_name`, `deleted_at`)
+    ) ENGINE=InnoDB;
+
+CREATE TABLE IF NOT EXISTS `tag_relation_meta` (
+    `id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT 'auto increment 
id',
+    `tag_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'tag id',
+    `metadata_object_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metadata object 
id',
+    `metadata_object_type` VARCHAR(64) NOT NULL COMMENT 'metadata object type',
+    `audit_info` MEDIUMTEXT NOT NULL COMMENT 'tag relation audit info',
+    `current_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'tag relation 
current version',
+    `last_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'tag relation last 
version',
+    `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'tag relation 
deleted at',
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `uk_ti_mi_del` (`tag_id`, `metadata_object_id`, `deleted_at`),
+    KEY `idx_tid` (`tag_id`),
+    KEY `idx_mid` (`metadata_object_id`)
+    ) ENGINE=InnoDB;
diff --git a/scripts/mysql/schema-0.5.0-mysql.sql 
b/scripts/mysql/schema-0.5.0-mysql.sql
index 79fe9eb7f..f70a442e1 100644
--- a/scripts/mysql/schema-0.5.0-mysql.sql
+++ b/scripts/mysql/schema-0.5.0-mysql.sql
@@ -197,4 +197,4 @@ CREATE TABLE IF NOT EXISTS `group_role_rel` (
     PRIMARY KEY (`id`),
     UNIQUE KEY `uk_gi_ri_del` (`group_id`, `role_id`, `deleted_at`),
     KEY `idx_rid` (`group_id`)
-    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'group 
role relation';
\ No newline at end of file
+    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'group 
role relation';
diff --git a/scripts/mysql/schema-0.6.0-mysql.sql 
b/scripts/mysql/schema-0.6.0-mysql.sql
index 198eb51e1..8418a5602 100644
--- a/scripts/mysql/schema-0.6.0-mysql.sql
+++ b/scripts/mysql/schema-0.6.0-mysql.sql
@@ -208,4 +208,33 @@ CREATE TABLE IF NOT EXISTS `group_role_rel` (
     PRIMARY KEY (`id`),
     UNIQUE KEY `uk_gi_ri_del` (`group_id`, `role_id`, `deleted_at`),
     KEY `idx_rid` (`group_id`)
-    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'group 
role relation';
\ No newline at end of file
+    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'group 
role relation';
+
+CREATE TABLE IF NOT EXISTS `tag_meta` (
+    `tag_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'tag id',
+    `tag_name` VARCHAR(128) NOT NULL COMMENT 'tag name',
+    `metalake_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metalake id',
+    `tag_comment` VARCHAR(256) DEFAULT '' COMMENT 'tag comment',
+    `properties` MEDIUMTEXT DEFAULT NULL COMMENT 'tag properties',
+    `audit_info` MEDIUMTEXT NOT NULL COMMENT 'tag audit info',
+    `current_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'tag current 
version',
+    `last_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'tag last version',
+    `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'tag deleted 
at',
+    PRIMARY KEY (`tag_id`),
+    UNIQUE KEY `uk_mi_tn_del` (`metalake_id`, `tag_name`, `deleted_at`)
+    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'tag 
metadata';
+
+CREATE TABLE IF NOT EXISTS `tag_relation_meta` (
+    `id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT 'auto increment 
id',
+    `tag_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'tag id',
+    `metadata_object_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metadata object 
id',
+    `metadata_object_type` VARCHAR(64) NOT NULL COMMENT 'metadata object type',
+    `audit_info` MEDIUMTEXT NOT NULL COMMENT 'tag relation audit info',
+    `current_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'tag relation 
current version',
+    `last_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'tag relation last 
version',
+    `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'tag relation 
deleted at',
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `uk_ti_mi_del` (`tag_id`, `metadata_object_id`, `deleted_at`),
+    KEY `idx_tid` (`tag_id`),
+    KEY `idx_mid` (`metadata_object_id`)
+    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'tag 
metadata object relation';
diff --git a/scripts/mysql/upgrade-0.5.0-to-0.6.0-mysql.sql 
b/scripts/mysql/upgrade-0.5.0-to-0.6.0-mysql.sql
index 7089e79de..8fb71f730 100644
--- a/scripts/mysql/upgrade-0.5.0-to-0.6.0-mysql.sql
+++ b/scripts/mysql/upgrade-0.5.0-to-0.6.0-mysql.sql
@@ -35,3 +35,32 @@ CREATE TABLE IF NOT EXISTS `role_meta_securable_object` (
     KEY `idx_obj_rid` (`role_id`),
     KEY `idx_obj_eid` (`entity_id`)
     ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 
'securable object meta';
+
+CREATE TABLE IF NOT EXISTS `tag_meta` (
+    `tag_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'tag id',
+    `tag_name` VARCHAR(128) NOT NULL COMMENT 'tag name',
+    `metalake_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metalake id',
+    `tag_comment` VARCHAR(256) DEFAULT '' COMMENT 'tag comment',
+    `properties` MEDIUMTEXT DEFAULT NULL COMMENT 'tag properties',
+    `audit_info` MEDIUMTEXT NOT NULL COMMENT 'tag audit info',
+    `current_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'tag current 
version',
+    `last_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'tag last version',
+    `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'tag deleted 
at',
+    PRIMARY KEY (`tag_id`),
+    UNIQUE KEY `uk_mi_tn_del` (`metalake_id`, `tag_name`, `deleted_at`)
+    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'tag 
metadata';
+
+CREATE TABLE IF NOT EXISTS `tag_relation_meta` (
+    `id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT 'auto increment 
id',
+    `tag_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'tag id',
+    `metadata_object_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metadata object 
id',
+    `metadata_object_type` VARCHAR(64) NOT NULL COMMENT 'metadata object type',
+    `audit_info` MEDIUMTEXT NOT NULL COMMENT 'tag relation audit info',
+    `current_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'tag relation 
current version',
+    `last_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'tag relation last 
version',
+    `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'tag relation 
deleted at',
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `uk_ti_mi_del` (`tag_id`, `metadata_object_id`, `deleted_at`),
+    KEY `idx_tid` (`tag_id`),
+    KEY `idx_mid` (`metadata_object_id`)
+    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'tag 
metadata object relation';


Reply via email to