This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new a89a1ca2d [#4599][#4809] feat(core): Add tag support for column (#5186)
a89a1ca2d is described below

commit a89a1ca2de011994e59d5cd59f37dd6b2d76f049
Author: Jerry Shao <[email protected]>
AuthorDate: Fri Oct 25 08:24:33 2024 +0800

    [#4599][#4809] feat(core): Add tag support for column (#5186)
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to add tag support for columns.
    
    ### Why are the changes needed?
    
    With this PR, users can add and remove tags at the column level.
    
    Fix: #4599
    Fix: #4809
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Added new unit tests (UT) and integration tests (IT) to cover the change.
    
    ---------
    
    Co-authored-by: Qi Yu <[email protected]>
---
 .../main/java/org/apache/gravitino/rel/Column.java |   9 +
 .../org/apache/gravitino/TestMetadataObjects.java  |  87 +++++++++
 .../hudi/integration/test/HudiCatalogHMSIT.java    |  53 +++---
 .../integration/test/CatalogIcebergBaseIT.java     |  14 +-
 .../integration/test/CatalogPaimonBaseIT.java      |  22 ++-
 .../lakehouse/paimon/utils/TestCatalogUtils.java   |   2 +
 .../org/apache/gravitino/client/GenericColumn.java | 128 +++++++++++++
 .../apache/gravitino/client/RelationalTable.java   |  13 +-
 .../apache/gravitino/client/TestGenericTag.java    |   5 +
 .../gravitino/client/TestRelationalCatalog.java    |  20 +-
 .../apache/gravitino/client/TestSupportTags.java   |  37 ++++
 .../gravitino/client/integration/test/TagIT.java   |  98 ++++++++++
 .../relational/mapper/TableColumnMapper.java       |   9 +
 .../mapper/TableColumnSQLProviderFactory.java      |   9 +
 .../provider/base/TableColumnBaseSQLProvider.java  |  28 +++
 .../relational/service/MetadataObjectService.java  | 202 ++++++++++++---------
 .../relational/service/TableColumnMetaService.java |  34 ++++
 .../java/org/apache/gravitino/tag/TagManager.java  |   1 -
 .../apache/gravitino/utils/MetadataObjectUtil.java |  11 +-
 .../apache/gravitino/utils/NameIdentifierUtil.java |  33 ++++
 .../org/apache/gravitino/utils/NamespaceUtil.java  |  26 +++
 .../service/TestTableColumnMetaService.java        | 110 +++++++++++
 .../org/apache/gravitino/tag/TestTagManager.java   |  91 +++++++++-
 .../gravitino/utils/TestMetadataObjectUtil.java    |  12 +-
 .../gravitino/utils/TestNameIdentifierUtil.java    |  12 +-
 docs/manage-tags-in-gravitino.md                   |   6 +-
 docs/open-api/openapi.yaml                         |   1 +
 docs/open-api/tags.yaml                            |   1 +
 28 files changed, 929 insertions(+), 145 deletions(-)

diff --git a/api/src/main/java/org/apache/gravitino/rel/Column.java 
b/api/src/main/java/org/apache/gravitino/rel/Column.java
index 650f5748f..e508970fa 100644
--- a/api/src/main/java/org/apache/gravitino/rel/Column.java
+++ b/api/src/main/java/org/apache/gravitino/rel/Column.java
@@ -27,6 +27,7 @@ import org.apache.gravitino.annotation.Evolving;
 import org.apache.gravitino.rel.expressions.Expression;
 import org.apache.gravitino.rel.expressions.FunctionExpression;
 import org.apache.gravitino.rel.types.Type;
+import org.apache.gravitino.tag.SupportsTags;
 
 /**
  * An interface representing a column of a {@link Table}. It defines basic 
properties of a column,
@@ -71,6 +72,14 @@ public interface Column {
    */
   Expression defaultValue();
 
+  /**
+   * @return the {@link SupportsTags} if the column supports tag operations.
+   * @throws UnsupportedOperationException if the column does not support tag 
operations.
+   */
+  default SupportsTags supportsTags() {
+    throw new UnsupportedOperationException("Column does not support tag 
operations.");
+  }
+
   /**
    * Create a {@link Column} instance.
    *
diff --git a/api/src/test/java/org/apache/gravitino/TestMetadataObjects.java 
b/api/src/test/java/org/apache/gravitino/TestMetadataObjects.java
new file mode 100644
index 000000000..bab5c5833
--- /dev/null
+++ b/api/src/test/java/org/apache/gravitino/TestMetadataObjects.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino;
+
+import com.google.common.collect.Lists;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestMetadataObjects {
+
+  @Test
+  public void testColumnObject() {
+    MetadataObject columnObject =
+        MetadataObjects.of("catalog.schema.table", "c1", 
MetadataObject.Type.COLUMN);
+    Assertions.assertEquals("catalog.schema.table", columnObject.parent());
+    Assertions.assertEquals("c1", columnObject.name());
+    Assertions.assertEquals(MetadataObject.Type.COLUMN, columnObject.type());
+    Assertions.assertEquals("catalog.schema.table.c1", 
columnObject.fullName());
+
+    MetadataObject columnObject2 =
+        MetadataObjects.of(
+            Lists.newArrayList("catalog", "schema", "table", "c2"), 
MetadataObject.Type.COLUMN);
+    Assertions.assertEquals("catalog.schema.table", columnObject2.parent());
+    Assertions.assertEquals("c2", columnObject2.name());
+    Assertions.assertEquals(MetadataObject.Type.COLUMN, columnObject2.type());
+    Assertions.assertEquals("catalog.schema.table.c2", 
columnObject2.fullName());
+
+    MetadataObject columnObject3 =
+        MetadataObjects.parse("catalog.schema.table.c3", 
MetadataObject.Type.COLUMN);
+    Assertions.assertEquals("catalog.schema.table", columnObject3.parent());
+    Assertions.assertEquals("c3", columnObject3.name());
+    Assertions.assertEquals(MetadataObject.Type.COLUMN, columnObject3.type());
+    Assertions.assertEquals("catalog.schema.table.c3", 
columnObject3.fullName());
+
+    // Test parent
+    MetadataObject parent = MetadataObjects.parent(columnObject);
+    Assertions.assertEquals("catalog.schema.table", parent.fullName());
+    Assertions.assertEquals("catalog.schema", parent.parent());
+    Assertions.assertEquals("table", parent.name());
+    Assertions.assertEquals(MetadataObject.Type.TABLE, parent.type());
+
+    // Test incomplete name
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () -> MetadataObjects.parse("c1", MetadataObject.Type.COLUMN));
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () -> MetadataObjects.parse("catalog", MetadataObject.Type.COLUMN));
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () -> MetadataObjects.parse("catalog.schema", 
MetadataObject.Type.COLUMN));
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () -> MetadataObjects.parse("catalog.schema.table", 
MetadataObject.Type.COLUMN));
+
+    // Test incomplete name list
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () -> MetadataObjects.of(Lists.newArrayList("catalog"), 
MetadataObject.Type.COLUMN));
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            MetadataObjects.of(
+                Lists.newArrayList("catalog", "schema"), 
MetadataObject.Type.COLUMN));
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            MetadataObjects.of(
+                Lists.newArrayList("catalog", "schema", "table"), 
MetadataObject.Type.COLUMN));
+  }
+}
diff --git 
a/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/integration/test/HudiCatalogHMSIT.java
 
b/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/integration/test/HudiCatalogHMSIT.java
index dcc7e1ad9..9fc1c81b5 100644
--- 
a/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/integration/test/HudiCatalogHMSIT.java
+++ 
b/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/integration/test/HudiCatalogHMSIT.java
@@ -291,77 +291,77 @@ public class HudiCatalogHMSIT extends BaseIT {
     Column[] columns = table.columns();
     Assertions.assertEquals(11, columns.length);
     if (table.name().endsWith("_rt") || table.name().endsWith("_ro")) {
-      Assertions.assertEquals(
+      assertColumn(
           ColumnDTO.builder()
               .withName("_hoodie_commit_time")
               .withDataType(Types.StringType.get())
               .withComment("")
               .build(),
           columns[0]);
-      Assertions.assertEquals(
+      assertColumn(
           ColumnDTO.builder()
               .withName("_hoodie_commit_seqno")
               .withDataType(Types.StringType.get())
               .withComment("")
               .build(),
           columns[1]);
-      Assertions.assertEquals(
+      assertColumn(
           ColumnDTO.builder()
               .withName("_hoodie_record_key")
               .withDataType(Types.StringType.get())
               .withComment("")
               .build(),
           columns[2]);
-      Assertions.assertEquals(
+      assertColumn(
           ColumnDTO.builder()
               .withName("_hoodie_partition_path")
               .withDataType(Types.StringType.get())
               .withComment("")
               .build(),
           columns[3]);
-      Assertions.assertEquals(
+      assertColumn(
           ColumnDTO.builder()
               .withName("_hoodie_file_name")
               .withDataType(Types.StringType.get())
               .withComment("")
               .build(),
           columns[4]);
-      Assertions.assertEquals(
+      assertColumn(
           ColumnDTO.builder()
               .withName("ts")
               .withDataType(Types.LongType.get())
               .withComment("")
               .build(),
           columns[5]);
-      Assertions.assertEquals(
+      assertColumn(
           ColumnDTO.builder()
               .withName("uuid")
               .withDataType(Types.StringType.get())
               .withComment("")
               .build(),
           columns[6]);
-      Assertions.assertEquals(
+      assertColumn(
           ColumnDTO.builder()
               .withName("rider")
               .withDataType(Types.StringType.get())
               .withComment("")
               .build(),
           columns[7]);
-      Assertions.assertEquals(
+      assertColumn(
           ColumnDTO.builder()
               .withName("driver")
               .withDataType(Types.StringType.get())
               .withComment("")
               .build(),
           columns[8]);
-      Assertions.assertEquals(
+      assertColumn(
           ColumnDTO.builder()
               .withName("fare")
               .withDataType(Types.DoubleType.get())
               .withComment("")
               .build(),
           columns[9]);
-      Assertions.assertEquals(
+      assertColumn(
           ColumnDTO.builder()
               .withName("city")
               .withDataType(Types.StringType.get())
@@ -369,57 +369,66 @@ public class HudiCatalogHMSIT extends BaseIT {
               .build(),
           columns[10]);
     } else {
-      Assertions.assertEquals(
+      assertColumn(
           ColumnDTO.builder()
               .withName("_hoodie_commit_time")
               .withDataType(Types.StringType.get())
               .build(),
           columns[0]);
-      Assertions.assertEquals(
+      assertColumn(
           ColumnDTO.builder()
               .withName("_hoodie_commit_seqno")
               .withDataType(Types.StringType.get())
               .build(),
           columns[1]);
-      Assertions.assertEquals(
+      assertColumn(
           ColumnDTO.builder()
               .withName("_hoodie_record_key")
               .withDataType(Types.StringType.get())
               .build(),
           columns[2]);
-      Assertions.assertEquals(
+      assertColumn(
           ColumnDTO.builder()
               .withName("_hoodie_partition_path")
               .withDataType(Types.StringType.get())
               .build(),
           columns[3]);
-      Assertions.assertEquals(
+      assertColumn(
           ColumnDTO.builder()
               .withName("_hoodie_file_name")
               .withDataType(Types.StringType.get())
               .build(),
           columns[4]);
-      Assertions.assertEquals(
+      assertColumn(
           
ColumnDTO.builder().withName("ts").withDataType(Types.LongType.get()).build(),
           columns[5]);
-      Assertions.assertEquals(
+      assertColumn(
           
ColumnDTO.builder().withName("uuid").withDataType(Types.StringType.get()).build(),
           columns[6]);
-      Assertions.assertEquals(
+      assertColumn(
           
ColumnDTO.builder().withName("rider").withDataType(Types.StringType.get()).build(),
           columns[7]);
-      Assertions.assertEquals(
+      assertColumn(
           
ColumnDTO.builder().withName("driver").withDataType(Types.StringType.get()).build(),
           columns[8]);
-      Assertions.assertEquals(
+      assertColumn(
           
ColumnDTO.builder().withName("fare").withDataType(Types.DoubleType.get()).build(),
           columns[9]);
-      Assertions.assertEquals(
+      assertColumn(
           
ColumnDTO.builder().withName("city").withDataType(Types.StringType.get()).build(),
           columns[10]);
     }
   }
 
+  private void assertColumn(ColumnDTO columnDTO, Column column) {
+    Assertions.assertEquals(columnDTO.name(), column.name());
+    Assertions.assertEquals(columnDTO.dataType(), column.dataType());
+    Assertions.assertEquals(columnDTO.comment(), column.comment());
+    Assertions.assertEquals(columnDTO.nullable(), column.nullable());
+    Assertions.assertEquals(columnDTO.autoIncrement(), column.autoIncrement());
+    Assertions.assertEquals(columnDTO.defaultValue(), column.defaultValue());
+  }
+
   private static void createHudiTables() {
     sparkSession =
         SparkSession.builder()
diff --git 
a/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java
 
b/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java
index 7c5d93362..57598dd24 100644
--- 
a/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java
+++ 
b/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java
@@ -51,7 +51,6 @@ import 
org.apache.gravitino.catalog.lakehouse.iceberg.IcebergSchemaPropertiesMet
 import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergTable;
 import 
org.apache.gravitino.catalog.lakehouse.iceberg.ops.IcebergCatalogWrapperHelper;
 import org.apache.gravitino.client.GravitinoMetalake;
-import org.apache.gravitino.dto.util.DTOConverters;
 import org.apache.gravitino.exceptions.NoSuchSchemaException;
 import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
 import org.apache.gravitino.exceptions.TableAlreadyExistsException;
@@ -417,7 +416,7 @@ public abstract class CatalogIcebergBaseIT extends BaseIT {
     Assertions.assertEquals(createdTable.columns().length, columns.length);
 
     for (int i = 0; i < columns.length; i++) {
-      Assertions.assertEquals(DTOConverters.toDTO(columns[i]), 
createdTable.columns()[i]);
+      assertColumn(columns[i], createdTable.columns()[i]);
     }
 
     // TODO add partitioning and sort order check
@@ -434,7 +433,7 @@ public abstract class CatalogIcebergBaseIT extends BaseIT {
     }
     Assertions.assertEquals(loadTable.columns().length, columns.length);
     for (int i = 0; i < columns.length; i++) {
-      Assertions.assertEquals(DTOConverters.toDTO(columns[i]), 
loadTable.columns()[i]);
+      assertColumn(columns[i], loadTable.columns()[i]);
     }
 
     Assertions.assertEquals(partitioning.length, 
loadTable.partitioning().length);
@@ -1257,4 +1256,13 @@ public abstract class CatalogIcebergBaseIT extends 
BaseIT {
       Assertions.assertEquals(entry.getValue(), 
table.properties().get(entry.getKey()));
     }
   }
+
+  protected void assertColumn(Column expectedColumn, Column actualColumn) {
+    Assertions.assertEquals(expectedColumn.name(), actualColumn.name());
+    Assertions.assertEquals(expectedColumn.dataType(), 
actualColumn.dataType());
+    Assertions.assertEquals(expectedColumn.comment(), actualColumn.comment());
+    Assertions.assertEquals(expectedColumn.nullable(), 
actualColumn.nullable());
+    Assertions.assertEquals(expectedColumn.autoIncrement(), 
actualColumn.autoIncrement());
+    Assertions.assertEquals(expectedColumn.defaultValue(), 
actualColumn.defaultValue());
+  }
 }
diff --git 
a/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonBaseIT.java
 
b/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonBaseIT.java
index 668cd404e..ea1e8debc 100644
--- 
a/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonBaseIT.java
+++ 
b/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonBaseIT.java
@@ -46,7 +46,6 @@ import 
org.apache.gravitino.catalog.lakehouse.paimon.PaimonConfig;
 import 
org.apache.gravitino.catalog.lakehouse.paimon.ops.PaimonBackendCatalogWrapper;
 import org.apache.gravitino.catalog.lakehouse.paimon.utils.CatalogUtils;
 import org.apache.gravitino.client.GravitinoMetalake;
-import org.apache.gravitino.dto.util.DTOConverters;
 import org.apache.gravitino.exceptions.NoSuchSchemaException;
 import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
 import org.apache.gravitino.exceptions.TableAlreadyExistsException;
@@ -256,7 +255,7 @@ public abstract class CatalogPaimonBaseIT extends BaseIT {
     Assertions.assertEquals(createdTable.columns().length, columns.length);
 
     for (int i = 0; i < columns.length; i++) {
-      Assertions.assertEquals(DTOConverters.toDTO(columns[i]), 
createdTable.columns()[i]);
+      assertColumn(columns[i], createdTable.columns()[i]);
     }
 
     Table loadTable = tableCatalog.loadTable(tableIdentifier);
@@ -269,7 +268,7 @@ public abstract class CatalogPaimonBaseIT extends BaseIT {
     }
     Assertions.assertEquals(loadTable.columns().length, columns.length);
     for (int i = 0; i < columns.length; i++) {
-      Assertions.assertEquals(DTOConverters.toDTO(columns[i]), 
loadTable.columns()[i]);
+      assertColumn(columns[i], loadTable.columns()[i]);
     }
 
     // catalog load check
@@ -346,7 +345,7 @@ public abstract class CatalogPaimonBaseIT extends BaseIT {
     Assertions.assertEquals(createdTable.columns().length, columns.length);
 
     for (int i = 0; i < columns.length; i++) {
-      Assertions.assertEquals(DTOConverters.toDTO(columns[i]), 
createdTable.columns()[i]);
+      assertColumn(columns[i], createdTable.columns()[i]);
     }
 
     Table loadTable = tableCatalog.loadTable(tableIdentifier);
@@ -374,7 +373,7 @@ public abstract class CatalogPaimonBaseIT extends BaseIT {
     Assertions.assertArrayEquals(partitionKeys, loadedPartitionKeys);
     Assertions.assertEquals(loadTable.columns().length, columns.length);
     for (int i = 0; i < columns.length; i++) {
-      Assertions.assertEquals(DTOConverters.toDTO(columns[i]), 
loadTable.columns()[i]);
+      assertColumn(columns[i], loadTable.columns()[i]);
     }
 
     // catalog load check
@@ -459,7 +458,7 @@ public abstract class CatalogPaimonBaseIT extends BaseIT {
     }
     Assertions.assertEquals(createdTable.columns().length, columns.length);
     for (int i = 0; i < columns.length; i++) {
-      Assertions.assertEquals(DTOConverters.toDTO(columns[i]), 
createdTable.columns()[i]);
+      assertColumn(columns[i], createdTable.columns()[i]);
     }
 
     Table loadTable = tableCatalog.loadTable(tableIdentifier);
@@ -488,7 +487,7 @@ public abstract class CatalogPaimonBaseIT extends BaseIT {
     }
     Assertions.assertEquals(loadTable.columns().length, columns.length);
     for (int i = 0; i < columns.length; i++) {
-      Assertions.assertEquals(DTOConverters.toDTO(columns[i]), 
loadTable.columns()[i]);
+      assertColumn(columns[i], loadTable.columns()[i]);
     }
 
     // catalog load check
@@ -969,4 +968,13 @@ public abstract class CatalogPaimonBaseIT extends BaseIT {
             .enableHiveSupport()
             .getOrCreate();
   }
+
+  protected void assertColumn(Column expectedColumn, Column actualColumn) {
+    Assertions.assertEquals(expectedColumn.name(), actualColumn.name());
+    Assertions.assertEquals(expectedColumn.dataType(), 
actualColumn.dataType());
+    Assertions.assertEquals(expectedColumn.comment(), actualColumn.comment());
+    Assertions.assertEquals(expectedColumn.nullable(), 
actualColumn.nullable());
+    Assertions.assertEquals(expectedColumn.autoIncrement(), 
actualColumn.autoIncrement());
+    Assertions.assertEquals(expectedColumn.defaultValue(), 
actualColumn.defaultValue());
+  }
 }
diff --git 
a/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/utils/TestCatalogUtils.java
 
b/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/utils/TestCatalogUtils.java
index d1b50d520..c81ae830e 100644
--- 
a/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/utils/TestCatalogUtils.java
+++ 
b/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/utils/TestCatalogUtils.java
@@ -36,9 +36,11 @@ import org.apache.paimon.catalog.FileSystemCatalog;
 import org.apache.paimon.factories.FactoryException;
 import org.apache.paimon.hive.HiveCatalog;
 import org.apache.paimon.jdbc.JdbcCatalog;
+import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 
 /** Tests for {@link 
org.apache.gravitino.catalog.lakehouse.paimon.utils.CatalogUtils}. */
+@Tag("gravitino-docker-test")
 public class TestCatalogUtils {
 
   @Test
diff --git 
a/clients/client-java/src/main/java/org/apache/gravitino/client/GenericColumn.java
 
b/clients/client-java/src/main/java/org/apache/gravitino/client/GenericColumn.java
new file mode 100644
index 000000000..aacf022e9
--- /dev/null
+++ 
b/clients/client-java/src/main/java/org/apache/gravitino/client/GenericColumn.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.client;
+
+import com.google.common.collect.Lists;
+import java.util.Objects;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.MetadataObjects;
+import org.apache.gravitino.exceptions.NoSuchTagException;
+import org.apache.gravitino.exceptions.TagAlreadyAssociatedException;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.expressions.Expression;
+import org.apache.gravitino.rel.types.Type;
+import org.apache.gravitino.tag.SupportsTags;
+import org.apache.gravitino.tag.Tag;
+
+/** Represents a generic column. */
+public class GenericColumn implements Column, SupportsTags {
+
+  private final Column internalColumn;
+
+  private final MetadataObjectTagOperations objectTagOperations;
+
+  GenericColumn(
+      Column column,
+      RESTClient restClient,
+      String metalake,
+      String catalog,
+      String schema,
+      String table) {
+    this.internalColumn = column;
+    MetadataObject columnObject =
+        MetadataObjects.of(
+            Lists.newArrayList(catalog, schema, table, internalColumn.name()),
+            MetadataObject.Type.COLUMN);
+    this.objectTagOperations = new MetadataObjectTagOperations(metalake, 
columnObject, restClient);
+  }
+
+  @Override
+  public SupportsTags supportsTags() {
+    return this;
+  }
+
+  @Override
+  public String[] listTags() {
+    return objectTagOperations.listTags();
+  }
+
+  @Override
+  public Tag[] listTagsInfo() {
+    return objectTagOperations.listTagsInfo();
+  }
+
+  @Override
+  public Tag getTag(String name) throws NoSuchTagException {
+    return objectTagOperations.getTag(name);
+  }
+
+  @Override
+  public String[] associateTags(String[] tagsToAdd, String[] tagsToRemove)
+      throws TagAlreadyAssociatedException {
+    return objectTagOperations.associateTags(tagsToAdd, tagsToRemove);
+  }
+
+  @Override
+  public String name() {
+    return internalColumn.name();
+  }
+
+  @Override
+  public Type dataType() {
+    return internalColumn.dataType();
+  }
+
+  @Override
+  public String comment() {
+    return internalColumn.comment();
+  }
+
+  @Override
+  public boolean nullable() {
+    return internalColumn.nullable();
+  }
+
+  @Override
+  public boolean autoIncrement() {
+    return internalColumn.autoIncrement();
+  }
+
+  @Override
+  public Expression defaultValue() {
+    return internalColumn.defaultValue();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (!(obj instanceof GenericColumn)) {
+      return false;
+    }
+
+    GenericColumn column = (GenericColumn) obj;
+    return Objects.equals(internalColumn, column.internalColumn);
+  }
+
+  @Override
+  public int hashCode() {
+    return internalColumn.hashCode();
+  }
+}
diff --git 
a/clients/client-java/src/main/java/org/apache/gravitino/client/RelationalTable.java
 
b/clients/client-java/src/main/java/org/apache/gravitino/client/RelationalTable.java
index 83634295f..e2ace7de2 100644
--- 
a/clients/client-java/src/main/java/org/apache/gravitino/client/RelationalTable.java
+++ 
b/clients/client-java/src/main/java/org/apache/gravitino/client/RelationalTable.java
@@ -23,6 +23,7 @@ import static 
org.apache.gravitino.dto.util.DTOConverters.toDTO;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -113,7 +114,17 @@ class RelationalTable implements Table, 
SupportsPartitions, SupportsTags, Suppor
   /** @return the columns of the table. */
   @Override
   public Column[] columns() {
-    return table.columns();
+    return Arrays.stream(table.columns())
+        .map(
+            c ->
+                new GenericColumn(
+                    c,
+                    restClient,
+                    namespace.level(0),
+                    namespace.level(1),
+                    namespace.level(2),
+                    name()))
+        .toArray(Column[]::new);
   }
 
   /** @return the partitioning of the table. */
diff --git 
a/clients/client-java/src/test/java/org/apache/gravitino/client/TestGenericTag.java
 
b/clients/client-java/src/test/java/org/apache/gravitino/client/TestGenericTag.java
index 0e86e9bf6..20463ddb8 100644
--- 
a/clients/client-java/src/test/java/org/apache/gravitino/client/TestGenericTag.java
+++ 
b/clients/client-java/src/test/java/org/apache/gravitino/client/TestGenericTag.java
@@ -100,6 +100,11 @@ public class TestGenericTag extends TestBase {
               .withParent("catalog1.schema1")
               .withName("table1")
               .withType(MetadataObject.Type.TABLE)
+              .build(),
+          MetadataObjectDTO.builder()
+              .withParent("catalog1.schema1.table1")
+              .withName("column1")
+              .withType(MetadataObject.Type.COLUMN)
               .build()
         };
 
diff --git 
a/clients/client-java/src/test/java/org/apache/gravitino/client/TestRelationalCatalog.java
 
b/clients/client-java/src/test/java/org/apache/gravitino/client/TestRelationalCatalog.java
index 01d17271e..c5e36247b 100644
--- 
a/clients/client-java/src/test/java/org/apache/gravitino/client/TestRelationalCatalog.java
+++ 
b/clients/client-java/src/test/java/org/apache/gravitino/client/TestRelationalCatalog.java
@@ -36,6 +36,7 @@ import java.time.Instant;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
+import java.util.stream.IntStream;
 import org.apache.gravitino.Catalog;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
@@ -727,9 +728,22 @@ public class TestRelationalCatalog extends TestBase {
     Assertions.assertEquals(expected.name(), actual.name());
     Assertions.assertEquals(expected.comment(), actual.comment());
     Assertions.assertEquals(expected.properties(), actual.properties());
-
-    Assertions.assertArrayEquals(expected.columns(), actual.columns());
-
+    Assertions.assertEquals(expected.columns().length, 
actual.columns().length);
+    IntStream.range(0, expected.columns().length)
+        .forEach(
+            i -> {
+              Assertions.assertEquals(expected.columns()[i].name(), 
actual.columns()[i].name());
+              Assertions.assertEquals(
+                  expected.columns()[i].dataType(), 
actual.columns()[i].dataType());
+              Assertions.assertEquals(
+                  expected.columns()[i].comment(), 
actual.columns()[i].comment());
+              Assertions.assertEquals(
+                  expected.columns()[i].nullable(), 
actual.columns()[i].nullable());
+              Assertions.assertEquals(
+                  expected.columns()[i].autoIncrement(), 
actual.columns()[i].autoIncrement());
+              Assertions.assertEquals(
+                  expected.columns()[i].defaultValue(), 
actual.columns()[i].defaultValue());
+            });
     Assertions.assertArrayEquals(expected.partitioning(), 
actual.partitioning());
   }
 
diff --git 
a/clients/client-java/src/test/java/org/apache/gravitino/client/TestSupportTags.java
 
b/clients/client-java/src/test/java/org/apache/gravitino/client/TestSupportTags.java
index a80fb3246..3d903a972 100644
--- 
a/clients/client-java/src/test/java/org/apache/gravitino/client/TestSupportTags.java
+++ 
b/clients/client-java/src/test/java/org/apache/gravitino/client/TestSupportTags.java
@@ -46,6 +46,7 @@ import org.apache.gravitino.exceptions.NoSuchTagException;
 import org.apache.gravitino.exceptions.NotFoundException;
 import org.apache.gravitino.file.Fileset;
 import org.apache.gravitino.messaging.Topic;
+import org.apache.gravitino.rel.Column;
 import org.apache.gravitino.rel.Table;
 import org.apache.gravitino.rel.types.Types;
 import org.apache.gravitino.tag.SupportsTags;
@@ -69,6 +70,8 @@ public class TestSupportTags extends TestBase {
 
   private static Table relationalTable;
 
+  private static Column genericColumn;
+
   private static Fileset genericFileset;
 
   private static Topic genericTopic;
@@ -141,6 +144,8 @@ public class TestSupportTags extends TestBase {
                 .build(),
             client.restClient());
 
+    genericColumn = relationalTable.columns()[0];
+
     genericFileset =
         new GenericFileset(
             FilesetDTO.builder()
@@ -195,6 +200,14 @@ public class TestSupportTags extends TestBase {
         MetadataObjects.of("catalog1.schema1", relationalTable.name(), 
MetadataObject.Type.TABLE));
   }
 
+  @Test
+  public void testListTagsForColumn() throws JsonProcessingException {
+    testListTags(
+        genericColumn.supportsTags(),
+        MetadataObjects.of(
+            "catalog1.schema1.table1", genericColumn.name(), 
MetadataObject.Type.COLUMN));
+  }
+
   @Test
   public void testListTagsForFileset() throws JsonProcessingException {
     testListTags(
@@ -238,6 +251,14 @@ public class TestSupportTags extends TestBase {
         MetadataObjects.of("catalog1.schema1", relationalTable.name(), 
MetadataObject.Type.TABLE));
   }
 
+  @Test
+  public void testListTagsInfoForColumn() throws JsonProcessingException {
+    testListTagsInfo(
+        genericColumn.supportsTags(),
+        MetadataObjects.of(
+            "catalog1.schema1.table1", genericColumn.name(), 
MetadataObject.Type.COLUMN));
+  }
+
   @Test
   public void testListTagsInfoForFileset() throws JsonProcessingException {
     testListTagsInfo(
@@ -281,6 +302,14 @@ public class TestSupportTags extends TestBase {
         MetadataObjects.of("catalog1.schema1", relationalTable.name(), 
MetadataObject.Type.TABLE));
   }
 
+  @Test
+  public void testGetTagForColumn() throws JsonProcessingException {
+    testGetTag(
+        genericColumn.supportsTags(),
+        MetadataObjects.of(
+            "catalog1.schema1.table1", genericColumn.name(), 
MetadataObject.Type.COLUMN));
+  }
+
   @Test
   public void testGetTagForFileset() throws JsonProcessingException {
     testGetTag(
@@ -324,6 +353,14 @@ public class TestSupportTags extends TestBase {
         MetadataObjects.of("catalog1.schema1", relationalTable.name(), 
MetadataObject.Type.TABLE));
   }
 
+  @Test
+  public void testAssociateTagsForColumn() throws JsonProcessingException {
+    testAssociateTags(
+        genericColumn.supportsTags(),
+        MetadataObjects.of(
+            "catalog1.schema1.table1", genericColumn.name(), 
MetadataObject.Type.COLUMN));
+  }
+
   @Test
   public void testAssociateTagsForFileset() throws JsonProcessingException {
     testAssociateTags(
diff --git a/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/TagIT.java b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/TagIT.java
index dc82dfd67..847b6253f 100644
--- a/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/TagIT.java
+++ b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/TagIT.java
@@ -60,6 +60,8 @@ public class TagIT extends BaseIT {
   private static Schema schema;
   private static Table table;
 
+  private static Column column;
+
   @BeforeAll
   public void setUp() {
     containerSuite.startHiveContainer();
@@ -105,6 +107,7 @@ public class TagIT extends BaseIT {
                 },
                 "comment",
                 Collections.emptyMap());
+    column = Arrays.stream(table.columns()).filter(c -> 
c.name().equals("col1")).findFirst().get();
   }
 
   @AfterAll
@@ -508,4 +511,99 @@ public class TagIT extends BaseIT {
     Assertions.assertEquals(
         MetadataObject.Type.TABLE, 
tag3.associatedObjects().objects()[0].type());
   }
+
+  @Test
+  public void testAssociateTagsToColumn() {
+    Tag tag1 =
+        metalake.createTag(
+            GravitinoITUtils.genRandomName("tag_it_column_tag1"),
+            "comment1",
+            Collections.emptyMap());
+    Tag tag2 =
+        metalake.createTag(
+            GravitinoITUtils.genRandomName("tag_it_column_tag2"),
+            "comment2",
+            Collections.emptyMap());
+    Tag tag3 =
+        metalake.createTag(
+            GravitinoITUtils.genRandomName("tag_it_column_tag3"),
+            "comment3",
+            Collections.emptyMap());
+    Tag tag4 =
+        metalake.createTag(
+            GravitinoITUtils.genRandomName("tag_it_column_tag4"),
+            "comment4",
+            Collections.emptyMap());
+
+    // Associate tags to catalog
+    relationalCatalog.supportsTags().associateTags(new String[] {tag1.name()}, 
null);
+
+    // Associate tags to schema
+    schema.supportsTags().associateTags(new String[] {tag2.name()}, null);
+
+    // Associate tags to table
+    table.supportsTags().associateTags(new String[] {tag3.name()}, null);
+
+    // Associate tags to column
+    String[] tags = column.supportsTags().associateTags(new String[] 
{tag4.name()}, null);
+
+    Assertions.assertEquals(1, tags.length);
+    Set<String> tagNames = Sets.newHashSet(tags);
+    Assertions.assertTrue(tagNames.contains(tag4.name()));
+
+    // Test list associated tags for column
+    String[] tags1 = column.supportsTags().listTags();
+    Assertions.assertEquals(4, tags1.length);
+    Set<String> tagNames1 = Sets.newHashSet(tags1);
+    Assertions.assertTrue(tagNames1.contains(tag1.name()));
+    Assertions.assertTrue(tagNames1.contains(tag2.name()));
+    Assertions.assertTrue(tagNames1.contains(tag3.name()));
+    Assertions.assertTrue(tagNames1.contains(tag4.name()));
+
+    // Test list associated tags with details for column
+    Tag[] tags2 = column.supportsTags().listTagsInfo();
+    Assertions.assertEquals(4, tags2.length);
+
+    Set<Tag> nonInheritedTags =
+        Arrays.stream(tags2).filter(tag -> 
!tag.inherited().get()).collect(Collectors.toSet());
+    Set<Tag> inheritedTags =
+        Arrays.stream(tags2).filter(tag -> 
tag.inherited().get()).collect(Collectors.toSet());
+
+    Assertions.assertEquals(1, nonInheritedTags.size());
+    Assertions.assertEquals(3, inheritedTags.size());
+    Assertions.assertTrue(nonInheritedTags.contains(tag4));
+    Assertions.assertTrue(inheritedTags.contains(tag1));
+    Assertions.assertTrue(inheritedTags.contains(tag2));
+    Assertions.assertTrue(inheritedTags.contains(tag3));
+
+    // Test get associated tag for column
+    Tag resultTag1 = column.supportsTags().getTag(tag1.name());
+    Assertions.assertEquals(tag1, resultTag1);
+    Assertions.assertTrue(resultTag1.inherited().get());
+
+    Tag resultTag2 = column.supportsTags().getTag(tag2.name());
+    Assertions.assertEquals(tag2, resultTag2);
+    Assertions.assertTrue(resultTag2.inherited().get());
+
+    Tag resultTag3 = column.supportsTags().getTag(tag3.name());
+    Assertions.assertEquals(tag3, resultTag3);
+    Assertions.assertTrue(resultTag3.inherited().get());
+
+    Tag resultTag4 = column.supportsTags().getTag(tag4.name());
+    Assertions.assertEquals(tag4, resultTag4);
+    Assertions.assertFalse(resultTag4.inherited().get());
+
+    // Test get objects associated with tag
+    Assertions.assertEquals(1, tag1.associatedObjects().count());
+    Assertions.assertEquals(relationalCatalog.name(), 
tag1.associatedObjects().objects()[0].name());
+
+    Assertions.assertEquals(1, tag2.associatedObjects().count());
+    Assertions.assertEquals(schema.name(), 
tag2.associatedObjects().objects()[0].name());
+
+    Assertions.assertEquals(1, tag3.associatedObjects().count());
+    Assertions.assertEquals(table.name(), 
tag3.associatedObjects().objects()[0].name());
+
+    Assertions.assertEquals(1, tag4.associatedObjects().count());
+    Assertions.assertEquals(column.name(), 
tag4.associatedObjects().objects()[0].name());
+  }
 }
diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableColumnMapper.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableColumnMapper.java
index 2214d8fd3..87b38ea48 100644
--- a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableColumnMapper.java
+++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableColumnMapper.java
@@ -62,4 +62,13 @@ public interface TableColumnMapper {
       method = "deleteColumnPOsByLegacyTimeline")
   Integer deleteColumnPOsByLegacyTimeline(
       @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit);
+
+  @SelectProvider(
+      type = TableColumnSQLProviderFactory.class,
+      method = "selectColumnIdByTableIdAndName")
+  Long selectColumnIdByTableIdAndName(
+      @Param("tableId") Long tableId, @Param("columnName") String name);
+
+  @SelectProvider(type = TableColumnSQLProviderFactory.class, method = 
"selectColumnPOById")
+  ColumnPO selectColumnPOById(@Param("columnId") Long columnId);
 }
diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableColumnSQLProviderFactory.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableColumnSQLProviderFactory.java
index f85cf72d8..11f0d5419 100644
--- a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableColumnSQLProviderFactory.java
+++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableColumnSQLProviderFactory.java
@@ -81,4 +81,13 @@ public class TableColumnSQLProviderFactory {
   public static String softDeleteColumnsBySchemaId(@Param("schemaId") Long 
schemaId) {
     return getProvider().softDeleteColumnsBySchemaId(schemaId);
   }
+
+  public static String selectColumnIdByTableIdAndName(
+      @Param("tableId") Long tableId, @Param("columnName") String name) {
+    return getProvider().selectColumnIdByTableIdAndName(tableId, name);
+  }
+
+  public static String selectColumnPOById(@Param("columnId") Long columnId) {
+    return getProvider().selectColumnPOById(columnId);
+  }
 }
diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableColumnBaseSQLProvider.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableColumnBaseSQLProvider.java
index cdc32425b..d6154c907 100644
--- a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableColumnBaseSQLProvider.java
+++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableColumnBaseSQLProvider.java
@@ -103,4 +103,32 @@ public class TableColumnBaseSQLProvider {
         + TableColumnMapper.COLUMN_TABLE_NAME
         + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT 
#{limit}";
   }
+
+  public String selectColumnIdByTableIdAndName(
+      @Param("tableId") Long tableId, @Param("columnName") String name) {
+    return "SELECT"
+        + "   CASE"
+        + "     WHEN column_op_type = 3 THEN NULL"
+        + "     ELSE column_id"
+        + "   END"
+        + " FROM "
+        + TableColumnMapper.COLUMN_TABLE_NAME
+        + " WHERE table_id = #{tableId} AND column_name = #{columnName} AND 
deleted_at = 0"
+        + " ORDER BY table_version DESC LIMIT 1";
+  }
+
+  public String selectColumnPOById(@Param("columnId") Long columnId) {
+    return "SELECT column_id AS columnId, column_name AS columnName,"
+        + " column_position AS columnPosition, metalake_id AS metalakeId, 
catalog_id AS catalogId,"
+        + " schema_id AS schemaId, table_id AS tableId,"
+        + " table_version AS tableVersion, column_type AS columnType,"
+        + " column_comment AS columnComment, column_nullable AS nullable,"
+        + " column_auto_increment AS autoIncrement,"
+        + " column_default_value AS defaultValue, column_op_type AS 
columnOpType,"
+        + " deleted_at AS deletedAt, audit_info AS auditInfo"
+        + " FROM "
+        + TableColumnMapper.COLUMN_TABLE_NAME
+        + " WHERE column_id = #{columnId} AND deleted_at = 0"
+        + " ORDER BY table_version DESC LIMIT 1";
+  }
 }
diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/service/MetadataObjectService.java b/core/src/main/java/org/apache/gravitino/storage/relational/service/MetadataObjectService.java
index 0ee28d029..c32759af5 100644
--- a/core/src/main/java/org/apache/gravitino/storage/relational/service/MetadataObjectService.java
+++ b/core/src/main/java/org/apache/gravitino/storage/relational/service/MetadataObjectService.java
@@ -24,6 +24,7 @@ import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.storage.relational.po.CatalogPO;
+import org.apache.gravitino.storage.relational.po.ColumnPO;
 import org.apache.gravitino.storage.relational.po.FilesetPO;
 import org.apache.gravitino.storage.relational.po.MetalakePO;
 import org.apache.gravitino.storage.relational.po.SchemaPO;
@@ -69,101 +70,130 @@ public class MetadataObjectService {
       return 
FilesetMetaService.getInstance().getFilesetIdBySchemaIdAndName(schemaId, 
names.get(2));
     } else if (type == MetadataObject.Type.TOPIC) {
       return 
TopicMetaService.getInstance().getTopicIdBySchemaIdAndName(schemaId, 
names.get(2));
-    } else if (type == MetadataObject.Type.TABLE) {
-      return 
TableMetaService.getInstance().getTableIdBySchemaIdAndName(schemaId, 
names.get(2));
     }
 
-    throw new IllegalArgumentException(String.format("Doesn't support the type 
%s", type));
-  }
-
-  // Metadata object may be null because the metadata object can be deleted asynchronously.
-  @Nullable
-  public static String getMetadataObjectFullName(String type, long 
metadataObjectId) {
-    MetadataObject.Type metadatatype = MetadataObject.Type.valueOf(type);
-    if (metadatatype == MetadataObject.Type.METALAKE) {
-      MetalakePO metalakePO = 
MetalakeMetaService.getInstance().getMetalakePOById(metadataObjectId);
-      if (metalakePO == null) {
-        return null;
-      }
-
-      return metalakePO.getMetalakeName();
-    }
-
-    if (metadatatype == MetadataObject.Type.CATALOG) {
-      return getCatalogFullName(metadataObjectId);
-    }
-
-    if (metadatatype == MetadataObject.Type.SCHEMA) {
-      return getSchemaFullName(metadataObjectId);
-    }
-
-    if (metadatatype == MetadataObject.Type.TABLE) {
-      TablePO tablePO = 
TableMetaService.getInstance().getTablePOById(metadataObjectId);
-      if (tablePO == null) {
-        return null;
-      }
-
-      String schemaName = getSchemaFullName(tablePO.getSchemaId());
-      if (schemaName == null) {
-        return null;
-      }
-
-      return DOT_JOINER.join(schemaName, tablePO.getTableName());
-    }
-
-    if (metadatatype == MetadataObject.Type.TOPIC) {
-      TopicPO topicPO = 
TopicMetaService.getInstance().getTopicPOById(metadataObjectId);
-      if (topicPO == null) {
-        return null;
-      }
-
-      String schemaName = getSchemaFullName(topicPO.getSchemaId());
-      if (schemaName == null) {
-        return null;
-      }
-
-      return DOT_JOINER.join(schemaName, topicPO.getTopicName());
+    long tableId =
+        TableMetaService.getInstance().getTableIdBySchemaIdAndName(schemaId, 
names.get(2));
+    if (type == MetadataObject.Type.TABLE) {
+      return tableId;
     }
 
-    if (metadatatype == MetadataObject.Type.FILESET) {
-      FilesetPO filesetPO = 
FilesetMetaService.getInstance().getFilesetPOById(metadataObjectId);
-      if (filesetPO == null) {
-        return null;
-      }
-
-      String schemaName = getSchemaFullName(filesetPO.getSchemaId());
-      if (schemaName == null) {
-        return null;
-      }
-
-      return DOT_JOINER.join(schemaName, filesetPO.getFilesetName());
+    if (type == MetadataObject.Type.COLUMN) {
+      return TableColumnMetaService.getInstance()
+          .getColumnIdByTableIdAndName(tableId, names.get(3));
     }
 
-    throw new IllegalArgumentException(String.format("Doesn't support the type 
%s", metadatatype));
-  }
-
-  @Nullable
-  private static String getCatalogFullName(Long entityId) {
-    CatalogPO catalogPO = 
CatalogMetaService.getInstance().getCatalogPOById(entityId);
-    if (catalogPO == null) {
-      return null;
-    }
-    return catalogPO.getCatalogName();
+    throw new IllegalArgumentException(String.format("Doesn't support the type 
%s", type));
   }
 
+  // Metadata object may be null because the metadata object can be deleted asynchronously.
   @Nullable
-  private static String getSchemaFullName(Long entityId) {
-    SchemaPO schemaPO = 
SchemaMetaService.getInstance().getSchemaPOById(entityId);
-
-    if (schemaPO == null) {
-      return null;
-    }
-
-    String catalogName = getCatalogFullName(schemaPO.getCatalogId());
-    if (catalogName == null) {
-      return null;
-    }
+  public static String getMetadataObjectFullName(String type, long 
metadataObjectId) {
+    MetadataObject.Type metadataType = MetadataObject.Type.valueOf(type);
+    String fullName = null;
+    long objectId = metadataObjectId;
+
+    do {
+      switch (metadataType) {
+        case METALAKE:
+          MetalakePO metalakePO = 
MetalakeMetaService.getInstance().getMetalakePOById(objectId);
+          if (metalakePO != null) {
+            fullName = metalakePO.getMetalakeName();
+            metadataType = null;
+          } else {
+            return null;
+          }
+          break;
+
+        case CATALOG:
+          CatalogPO catalogPO = 
CatalogMetaService.getInstance().getCatalogPOById(objectId);
+          if (catalogPO != null) {
+            fullName =
+                fullName != null
+                    ? DOT_JOINER.join(catalogPO.getCatalogName(), fullName)
+                    : catalogPO.getCatalogName();
+            metadataType = null;
+          } else {
+            return null;
+          }
+          break;
+
+        case SCHEMA:
+          SchemaPO schemaPO = 
SchemaMetaService.getInstance().getSchemaPOById(objectId);
+          if (schemaPO != null) {
+            fullName =
+                fullName != null
+                    ? DOT_JOINER.join(schemaPO.getSchemaName(), fullName)
+                    : schemaPO.getSchemaName();
+            objectId = schemaPO.getCatalogId();
+            metadataType = MetadataObject.Type.CATALOG;
+          } else {
+            return null;
+          }
+          break;
+
+        case TABLE:
+          TablePO tablePO = 
TableMetaService.getInstance().getTablePOById(objectId);
+          if (tablePO != null) {
+            fullName =
+                fullName != null
+                    ? DOT_JOINER.join(tablePO.getTableName(), fullName)
+                    : tablePO.getTableName();
+            objectId = tablePO.getSchemaId();
+            metadataType = MetadataObject.Type.SCHEMA;
+          } else {
+            return null;
+          }
+          break;
+
+        case TOPIC:
+          TopicPO topicPO = 
TopicMetaService.getInstance().getTopicPOById(objectId);
+          if (topicPO != null) {
+            fullName =
+                fullName != null
+                    ? DOT_JOINER.join(topicPO.getTopicName(), fullName)
+                    : topicPO.getTopicName();
+            objectId = topicPO.getSchemaId();
+            metadataType = MetadataObject.Type.SCHEMA;
+          } else {
+            return null;
+          }
+          break;
+
+        case FILESET:
+          FilesetPO filesetPO = 
FilesetMetaService.getInstance().getFilesetPOById(objectId);
+          if (filesetPO != null) {
+            fullName =
+                fullName != null
+                    ? DOT_JOINER.join(filesetPO.getFilesetName(), fullName)
+                    : filesetPO.getFilesetName();
+            objectId = filesetPO.getSchemaId();
+            metadataType = MetadataObject.Type.SCHEMA;
+          } else {
+            return null;
+          }
+          break;
+
+        case COLUMN:
+          ColumnPO columnPO = 
TableColumnMetaService.getInstance().getColumnPOById(objectId);
+          if (columnPO != null) {
+            fullName =
+                fullName != null
+                    ? DOT_JOINER.join(columnPO.getColumnName(), fullName)
+                    : columnPO.getColumnName();
+            objectId = columnPO.getTableId();
+            metadataType = MetadataObject.Type.TABLE;
+          } else {
+            return null;
+          }
+          break;
+
+        default:
+          throw new IllegalArgumentException(
+              String.format("Doesn't support the type %s", metadataType));
+      }
+    } while (metadataType != null);
 
-    return DOT_JOINER.join(catalogName, schemaPO.getSchemaName());
+    return fullName;
   }
 }
diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/service/TableColumnMetaService.java b/core/src/main/java/org/apache/gravitino/storage/relational/service/TableColumnMetaService.java
index f881602bc..9e2b3530d 100644
--- a/core/src/main/java/org/apache/gravitino/storage/relational/service/TableColumnMetaService.java
+++ b/core/src/main/java/org/apache/gravitino/storage/relational/service/TableColumnMetaService.java
@@ -21,9 +21,12 @@ package org.apache.gravitino.storage.relational.service;
 import com.google.common.collect.Lists;
 import java.util.Collections;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.exceptions.NoSuchEntityException;
 import org.apache.gravitino.meta.ColumnEntity;
 import org.apache.gravitino.meta.TableEntity;
 import org.apache.gravitino.storage.relational.mapper.TableColumnMapper;
@@ -54,6 +57,37 @@ public class TableColumnMetaService {
         .collect(Collectors.toList());
   }
 
+  Long getColumnIdByTableIdAndName(Long tableId, String columnName) {
+    Long columnId =
+        SessionUtils.getWithoutCommit(
+            TableColumnMapper.class,
+            mapper -> mapper.selectColumnIdByTableIdAndName(tableId, 
columnName));
+
+    if (columnId == null) {
+      throw new NoSuchEntityException(
+          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+          Entity.EntityType.COLUMN.name().toLowerCase(Locale.ROOT),
+          columnName);
+    }
+
+    return columnId;
+  }
+
+  ColumnPO getColumnPOById(Long columnId) {
+    ColumnPO columnPO =
+        SessionUtils.getWithoutCommit(
+            TableColumnMapper.class, mapper -> 
mapper.selectColumnPOById(columnId));
+
+    if (columnPO == null || columnPO.getColumnOpType() == 
ColumnPO.ColumnOpType.DELETE.value()) {
+      throw new NoSuchEntityException(
+          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+          Entity.EntityType.COLUMN.name().toLowerCase(Locale.ROOT),
+          columnId.toString());
+    }
+
+    return columnPO;
+  }
+
   void insertColumnPOs(TablePO tablePO, List<ColumnEntity> columnEntities) {
     List<ColumnPO> columnPOs =
         POConverters.initializeColumnPOs(tablePO, columnEntities, 
ColumnPO.ColumnOpType.CREATE);
diff --git a/core/src/main/java/org/apache/gravitino/tag/TagManager.java b/core/src/main/java/org/apache/gravitino/tag/TagManager.java
index 1b1626de0..c5d41adb0 100644
--- a/core/src/main/java/org/apache/gravitino/tag/TagManager.java
+++ b/core/src/main/java/org/apache/gravitino/tag/TagManager.java
@@ -298,7 +298,6 @@ public class TagManager {
       throws NoSuchMetadataObjectException, TagAlreadyAssociatedException {
     Preconditions.checkArgument(
         !metadataObject.type().equals(MetadataObject.Type.METALAKE)
-            && !metadataObject.type().equals(MetadataObject.Type.COLUMN)
             && !metadataObject.type().equals(MetadataObject.Type.ROLE),
         "Cannot associate tags for unsupported metadata object type %s",
         metadataObject.type());
diff --git a/core/src/main/java/org/apache/gravitino/utils/MetadataObjectUtil.java b/core/src/main/java/org/apache/gravitino/utils/MetadataObjectUtil.java
index 014ae3a18..da9f4129a 100644
--- a/core/src/main/java/org/apache/gravitino/utils/MetadataObjectUtil.java
+++ b/core/src/main/java/org/apache/gravitino/utils/MetadataObjectUtil.java
@@ -93,12 +93,9 @@ public class MetadataObjectUtil {
       case TABLE:
       case TOPIC:
       case FILESET:
+      case COLUMN:
         String fullName = DOT.join(metalakeName, metadataObject.fullName());
         return NameIdentifier.parse(fullName);
-      case COLUMN:
-        throw new IllegalArgumentException(
-            "Cannot convert column metadata object to entity identifier: "
-                + metadataObject.fullName());
       default:
         throw new IllegalArgumentException(
             "Unknown metadata object type: " + metadataObject.type());
@@ -150,6 +147,12 @@ public class MetadataObjectUtil {
         check(env.tableDispatcher().tableExists(identifier), 
exceptionToThrowSupplier);
         break;
 
+      case COLUMN:
+        NameIdentifierUtil.checkColumn(identifier);
+        NameIdentifier tableIdent = 
NameIdentifier.of(identifier.namespace().levels());
+        check(env.tableDispatcher().tableExists(tableIdent), 
exceptionToThrowSupplier);
+        break;
+
       case TOPIC:
         NameIdentifierUtil.checkTopic(identifier);
         check(env.topicDispatcher().topicExists(identifier), 
exceptionToThrowSupplier);
diff --git a/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java b/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java
index 30f560102..550fef967 100644
--- a/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java
+++ b/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java
@@ -93,6 +93,22 @@ public class NameIdentifierUtil {
     return NameIdentifier.of(metalake, catalog, schema, table);
   }
 
+  /**
+   * Create the column {@link NameIdentifier} with the given metalake, catalog, schema, table and
+   * column name.
+   *
+   * @param metalake The metalake name
+   * @param catalog The catalog name
+   * @param schema The schema name
+   * @param table The table name
+   * @param column The column name
+   * @return The created column {@link NameIdentifier}
+   */
+  public static NameIdentifier ofColumn(
+      String metalake, String catalog, String schema, String table, String 
column) {
+    return NameIdentifier.of(metalake, catalog, schema, table, column);
+  }
+
   /**
    * Create the fileset {@link NameIdentifier} with the given metalake, catalog, schema and fileset
    * name.
@@ -196,6 +212,17 @@ public class NameIdentifierUtil {
     NamespaceUtil.checkTable(ident.namespace());
   }
 
+  /**
+   * Check the given {@link NameIdentifier} is a column identifier. Throw an {@link
+   * IllegalNameIdentifierException} if it's not.
+   *
+   * @param ident The column {@link NameIdentifier} to check.
+   */
+  public static void checkColumn(NameIdentifier ident) {
+    NameIdentifier.check(ident != null, "Column identifier must not be null");
+    NamespaceUtil.checkColumn(ident.namespace());
+  }
+
   /**
    * Check the given {@link NameIdentifier} is a fileset identifier. Throw an {@link
    * IllegalNameIdentifierException} if it's not.
@@ -266,6 +293,12 @@ public class NameIdentifierUtil {
         String tableParent = dot.join(ident.namespace().level(1), 
ident.namespace().level(2));
         return MetadataObjects.of(tableParent, ident.name(), 
MetadataObject.Type.TABLE);
 
+      case COLUMN:
+        checkColumn(ident);
+        Namespace columnNs = ident.namespace();
+        String columnParent = dot.join(columnNs.level(1), columnNs.level(2), 
columnNs.level(3));
+        return MetadataObjects.of(columnParent, ident.name(), 
MetadataObject.Type.COLUMN);
+
       case FILESET:
         checkFileset(ident);
         String filesetParent = dot.join(ident.namespace().level(1), 
ident.namespace().level(2));
diff --git a/core/src/main/java/org/apache/gravitino/utils/NamespaceUtil.java b/core/src/main/java/org/apache/gravitino/utils/NamespaceUtil.java
index 2c353b07b..c24015bb3 100644
--- a/core/src/main/java/org/apache/gravitino/utils/NamespaceUtil.java
+++ b/core/src/main/java/org/apache/gravitino/utils/NamespaceUtil.java
@@ -70,6 +70,19 @@ public class NamespaceUtil {
     return Namespace.of(metalake, catalog, schema);
   }
 
+  /**
+   * Create a namespace for column.
+   *
+   * @param metalake The metalake name
+   * @param catalog The catalog name
+   * @param schema The schema name
+   * @param table The table name
+   * @return A namespace for column
+   */
+  public static Namespace ofColumn(String metalake, String catalog, String 
schema, String table) {
+    return Namespace.of(metalake, catalog, schema, table);
+  }
+
   /**
    * Create a namespace for fileset.
    *
@@ -146,6 +159,19 @@ public class NamespaceUtil {
         namespace);
   }
 
+  /**
+   * Check if the given column namespace is legal, throw an {@link IllegalNamespaceException} if
+   * it's illegal.
+   *
+   * @param namespace The column namespace
+   */
+  public static void checkColumn(Namespace namespace) {
+    check(
+        namespace != null && namespace.length() == 4,
+        "Column namespace must be non-null and have 4 levels, the input 
namespace is %s",
+        namespace);
+  }
+
   /**
    * Check if the given fileset namespace is legal, throw an {@link IllegalNamespaceException} if
    * it's illegal.
diff --git a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestTableColumnMetaService.java b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestTableColumnMetaService.java
index 8d61d357c..30eb6bda6 100644
--- a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestTableColumnMetaService.java
+++ b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestTableColumnMetaService.java
@@ -37,6 +37,7 @@ import org.apache.gravitino.rel.expressions.literals.Literals;
 import org.apache.gravitino.rel.types.Types;
 import org.apache.gravitino.storage.RandomIdGenerator;
 import org.apache.gravitino.storage.relational.TestJDBCBackend;
+import org.apache.gravitino.storage.relational.po.ColumnPO;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.testcontainers.shaded.com.google.common.collect.Lists;
@@ -420,6 +421,115 @@ public class TestTableColumnMetaService extends 
TestJDBCBackend {
         () -> 
TableMetaService.getInstance().getTableByIdentifier(retrievedTable.nameIdentifier()));
   }
 
+  @Test
+  public void testGetColumnIdAndPO() throws IOException {
+    String catalogName = "catalog1";
+    String schemaName = "schema1";
+    createParentEntities(METALAKE_NAME, catalogName, schemaName);
+
+    // Build a single column and a table entity that owns it, then persist the table
+    ColumnEntity column =
+        ColumnEntity.builder()
+            .withId(RandomIdGenerator.INSTANCE.nextId())
+            .withName("column1")
+            .withPosition(0)
+            .withComment("comment1")
+            .withDataType(Types.IntegerType.get())
+            .withNullable(true)
+            .withAutoIncrement(false)
+            .withDefaultValue(Literals.integerLiteral(1))
+            .withAuditInfo(auditInfo)
+            .build();
+
+    TableEntity createdTable =
+        TableEntity.builder()
+            .withId(RandomIdGenerator.INSTANCE.nextId())
+            .withName("table1")
+            .withNamespace(Namespace.of(METALAKE_NAME, catalogName, 
schemaName))
+            .withColumns(Lists.newArrayList(column))
+            .withAuditInfo(auditInfo)
+            .build();
+
+    TableMetaService.getInstance().insertTable(createdTable, false);
+
+    TableEntity retrievedTable =
+        
TableMetaService.getInstance().getTableByIdentifier(createdTable.nameIdentifier());
+    Assertions.assertEquals(1, retrievedTable.columns().size());
+    Assertions.assertEquals(column.id(), retrievedTable.columns().get(0).id());
+
+    Long columnId =
+        TableColumnMetaService.getInstance()
+            .getColumnIdByTableIdAndName(retrievedTable.id(), column.name());
+    Assertions.assertEquals(column.id(), columnId);
+
+    ColumnPO retrievedColumn = 
TableColumnMetaService.getInstance().getColumnPOById(column.id());
+    Assertions.assertEquals(column.id(), retrievedColumn.getColumnId());
+    Assertions.assertEquals(column.name(), retrievedColumn.getColumnName());
+    Assertions.assertEquals(column.position(), 
retrievedColumn.getColumnPosition());
+    Assertions.assertEquals(column.comment(), 
retrievedColumn.getColumnComment());
+    Assertions.assertEquals(
+        ColumnPO.ColumnOpType.CREATE.value(), 
retrievedColumn.getColumnOpType());
+
+    // Rename the column through a table update, reusing the same column id
+    ColumnEntity updatedColumn =
+        ColumnEntity.builder()
+            .withId(column.id())
+            .withName("column1_updated")
+            .withPosition(column.position())
+            .withComment(column.comment())
+            .withDataType(column.dataType())
+            .withNullable(column.nullable())
+            .withAutoIncrement(column.autoIncrement())
+            .withDefaultValue(column.defaultValue())
+            .withAuditInfo(auditInfo)
+            .build();
+
+    TableEntity updatedTable =
+        TableEntity.builder()
+            .withId(retrievedTable.id())
+            .withName(retrievedTable.name())
+            .withNamespace(retrievedTable.namespace())
+            .withColumns(Lists.newArrayList(updatedColumn))
+            .withAuditInfo(retrievedTable.auditInfo())
+            .build();
+
+    Function<TableEntity, TableEntity> updater = oldTable -> updatedTable;
+    
TableMetaService.getInstance().updateTable(retrievedTable.nameIdentifier(), 
updater);
+
+    Long updatedColumnId =
+        TableColumnMetaService.getInstance()
+            .getColumnIdByTableIdAndName(retrievedTable.id(), 
updatedColumn.name());
+    Assertions.assertEquals(updatedColumn.id(), updatedColumnId);
+
+    ColumnPO updatedColumnPO =
+        
TableColumnMetaService.getInstance().getColumnPOById(updatedColumn.id());
+    Assertions.assertEquals(updatedColumn.id(), updatedColumnPO.getColumnId());
+    Assertions.assertEquals(updatedColumn.name(), 
updatedColumnPO.getColumnName());
+
+    // Drop the column by updating the table with an empty column list; lookups must then fail
+    TableEntity updatedTable2 =
+        TableEntity.builder()
+            .withId(retrievedTable.id())
+            .withName(retrievedTable.name())
+            .withNamespace(retrievedTable.namespace())
+            .withColumns(Lists.newArrayList())
+            .withAuditInfo(retrievedTable.auditInfo())
+            .build();
+
+    Function<TableEntity, TableEntity> updater2 = oldTable -> updatedTable2;
+    
TableMetaService.getInstance().updateTable(retrievedTable.nameIdentifier(), 
updater2);
+
+    Assertions.assertThrows(
+        NoSuchEntityException.class,
+        () ->
+            TableColumnMetaService.getInstance()
+                .getColumnIdByTableIdAndName(retrievedTable.id(), 
updatedColumn.name()));
+
+    Assertions.assertThrows(
+        NoSuchEntityException.class,
+        () -> 
TableColumnMetaService.getInstance().getColumnPOById(updatedColumn.id()));
+  }
+
   private void compareTwoColumns(
       List<ColumnEntity> expectedColumns, List<ColumnEntity> actualColumns) {
     Assertions.assertEquals(expectedColumns.size(), actualColumns.size());
diff --git a/core/src/test/java/org/apache/gravitino/tag/TestTagManager.java 
b/core/src/test/java/org/apache/gravitino/tag/TestTagManager.java
index 82ed55eed..27b4fa84b 100644
--- a/core/src/test/java/org/apache/gravitino/tag/TestTagManager.java
+++ b/core/src/test/java/org/apache/gravitino/tag/TestTagManager.java
@@ -37,6 +37,7 @@ import static org.mockito.Mockito.when;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
 import java.io.File;
 import java.io.IOException;
 import java.time.Instant;
@@ -67,10 +68,12 @@ import org.apache.gravitino.lock.LockManager;
 import org.apache.gravitino.meta.AuditInfo;
 import org.apache.gravitino.meta.BaseMetalake;
 import org.apache.gravitino.meta.CatalogEntity;
+import org.apache.gravitino.meta.ColumnEntity;
 import org.apache.gravitino.meta.SchemaEntity;
 import org.apache.gravitino.meta.SchemaVersion;
 import org.apache.gravitino.meta.TableEntity;
 import org.apache.gravitino.metalake.MetalakeDispatcher;
+import org.apache.gravitino.rel.types.Types;
 import org.apache.gravitino.storage.IdGenerator;
 import org.apache.gravitino.storage.RandomIdGenerator;
 import org.apache.gravitino.utils.NameIdentifierUtil;
@@ -96,6 +99,9 @@ public class TestTagManager {
   private static final String SCHEMA = "schema_for_tag_test";
 
   private static final String TABLE = "table_for_tag_test";
+
+  private static final String COLUMN = "column_for_tag_test";
+
   private static final MetalakeDispatcher metalakeDispatcher = 
mock(MetalakeDispatcher.class);
   private static final CatalogDispatcher catalogDispatcher = 
mock(CatalogDispatcher.class);
   private static final SchemaDispatcher schemaDispatcher = 
mock(SchemaDispatcher.class);
@@ -166,10 +172,23 @@ public class TestTagManager {
             .build();
     entityStore.put(schema, false /* overwritten */);
 
+    ColumnEntity column =
+        ColumnEntity.builder()
+            .withId(idGenerator.nextId())
+            .withName(COLUMN)
+            .withPosition(0)
+            .withComment("Test column")
+            .withDataType(Types.IntegerType.get())
+            .withNullable(true)
+            .withAutoIncrement(false)
+            .withAuditInfo(audit)
+            .build();
+
     TableEntity table =
         TableEntity.builder()
             .withId(idGenerator.nextId())
             .withName(TABLE)
+            .withColumns(Lists.newArrayList(column))
             .withNamespace(Namespace.of(METALAKE, CATALOG, SCHEMA))
             .withAuditInfo(audit)
             .build();
@@ -219,6 +238,13 @@ public class TestTagManager {
     String[] tableTags = tagManager.listTagsForMetadataObject(METALAKE, 
tableObject);
     tagManager.associateTagsForMetadataObject(METALAKE, tableObject, null, 
tableTags);
 
+    MetadataObject columnObject =
+        NameIdentifierUtil.toMetadataObject(
+            NameIdentifierUtil.ofColumn(METALAKE, CATALOG, SCHEMA, TABLE, 
COLUMN),
+            Entity.EntityType.COLUMN);
+    String[] columnTags = tagManager.listTagsForMetadataObject(METALAKE, 
columnObject);
+    tagManager.associateTagsForMetadataObject(METALAKE, columnObject, null, 
columnTags);
+
     Arrays.stream(tagManager.listTags(METALAKE)).forEach(n -> 
tagManager.deleteTag(METALAKE, n));
   }
 
@@ -439,6 +465,37 @@ public class TestTagManager {
 
     Assertions.assertEquals(2, tags6.length);
     Assertions.assertEquals(ImmutableSet.of("tag1", "tag3"), 
ImmutableSet.copyOf(tags6));
+
+    // Test associate tags for column
+    MetadataObject columnObject =
+        NameIdentifierUtil.toMetadataObject(
+            NameIdentifierUtil.ofColumn(METALAKE, CATALOG, SCHEMA, TABLE, 
COLUMN),
+            Entity.EntityType.COLUMN);
+
+    String[] tagsToAdd3 = new String[] {tag1.name()};
+    String[] tags7 =
+        tagManager.associateTagsForMetadataObject(METALAKE, columnObject, 
tagsToAdd3, null);
+
+    Assertions.assertEquals(1, tags7.length);
+    Assertions.assertEquals(ImmutableSet.of("tag1"), 
ImmutableSet.copyOf(tags7));
+
+    // Test disassociate tags for column
+    String[] tagsToRemove2 = new String[] {tag1.name()};
+    String[] tags8 =
+        tagManager.associateTagsForMetadataObject(METALAKE, columnObject, 
null, tagsToRemove2);
+
+    Assertions.assertEquals(0, tags8.length);
+    Assertions.assertEquals(ImmutableSet.of(), ImmutableSet.copyOf(tags8));
+
+    // Test associate and disassociate same tags for column
+    String[] tagsToAdd4 = new String[] {tag2.name(), tag3.name()};
+    String[] tagsToRemove3 = new String[] {tag2.name()};
+    String[] tags9 =
+        tagManager.associateTagsForMetadataObject(
+            METALAKE, columnObject, tagsToAdd4, tagsToRemove3);
+
+    Assertions.assertEquals(1, tags9.length);
+    Assertions.assertEquals(ImmutableSet.of("tag3"), 
ImmutableSet.copyOf(tags9));
   }
 
   @Test
@@ -456,6 +513,10 @@ public class TestTagManager {
     MetadataObject tableObject =
         NameIdentifierUtil.toMetadataObject(
             NameIdentifierUtil.ofTable(METALAKE, CATALOG, SCHEMA, TABLE), 
Entity.EntityType.TABLE);
+    MetadataObject columnObject =
+        NameIdentifierUtil.toMetadataObject(
+            NameIdentifierUtil.ofColumn(METALAKE, CATALOG, SCHEMA, TABLE, 
COLUMN),
+            Entity.EntityType.COLUMN);
 
     tagManager.associateTagsForMetadataObject(
         METALAKE, catalogObject, new String[] {tag1.name(), tag2.name(), 
tag3.name()}, null);
@@ -463,11 +524,14 @@ public class TestTagManager {
         METALAKE, schemaObject, new String[] {tag1.name(), tag2.name()}, null);
     tagManager.associateTagsForMetadataObject(
         METALAKE, tableObject, new String[] {tag1.name()}, null);
+    tagManager.associateTagsForMetadataObject(
+        METALAKE, columnObject, new String[] {tag1.name()}, null);
 
     MetadataObject[] objects = tagManager.listMetadataObjectsForTag(METALAKE, 
tag1.name());
-    Assertions.assertEquals(3, objects.length);
+    Assertions.assertEquals(4, objects.length);
     Assertions.assertEquals(
-        ImmutableSet.of(catalogObject, schemaObject, tableObject), 
ImmutableSet.copyOf(objects));
+        ImmutableSet.of(catalogObject, schemaObject, tableObject, 
columnObject),
+        ImmutableSet.copyOf(objects));
 
     MetadataObject[] objects1 = tagManager.listMetadataObjectsForTag(METALAKE, 
tag2.name());
     Assertions.assertEquals(2, objects1.length);
@@ -504,6 +568,10 @@ public class TestTagManager {
     MetadataObject tableObject =
         NameIdentifierUtil.toMetadataObject(
             NameIdentifierUtil.ofTable(METALAKE, CATALOG, SCHEMA, TABLE), 
Entity.EntityType.TABLE);
+    MetadataObject columnObject =
+        NameIdentifierUtil.toMetadataObject(
+            NameIdentifierUtil.ofColumn(METALAKE, CATALOG, SCHEMA, TABLE, 
COLUMN),
+            Entity.EntityType.COLUMN);
 
     tagManager.associateTagsForMetadataObject(
         METALAKE, catalogObject, new String[] {tag1.name(), tag2.name(), 
tag3.name()}, null);
@@ -511,6 +579,8 @@ public class TestTagManager {
         METALAKE, schemaObject, new String[] {tag1.name(), tag2.name()}, null);
     tagManager.associateTagsForMetadataObject(
         METALAKE, tableObject, new String[] {tag1.name()}, null);
+    tagManager.associateTagsForMetadataObject(
+        METALAKE, columnObject, new String[] {tag1.name()}, null);
 
     String[] tags = tagManager.listTagsForMetadataObject(METALAKE, 
catalogObject);
     Assertions.assertEquals(3, tags.length);
@@ -536,6 +606,14 @@ public class TestTagManager {
     Assertions.assertEquals(1, tagsInfo2.length);
     Assertions.assertEquals(ImmutableSet.of(tag1), 
ImmutableSet.copyOf(tagsInfo2));
 
+    String[] tags3 = tagManager.listTagsForMetadataObject(METALAKE, 
columnObject);
+    Assertions.assertEquals(1, tags3.length);
+    Assertions.assertEquals(ImmutableSet.of("tag1"), 
ImmutableSet.copyOf(tags3));
+
+    Tag[] tagsInfo3 = tagManager.listTagsInfoForMetadataObject(METALAKE, 
columnObject);
+    Assertions.assertEquals(1, tagsInfo3.length);
+    Assertions.assertEquals(ImmutableSet.of(tag1), 
ImmutableSet.copyOf(tagsInfo3));
+
     // List tags for non-existent metadata object
     MetadataObject nonExistentObject =
         NameIdentifierUtil.toMetadataObject(
@@ -564,6 +642,10 @@ public class TestTagManager {
     MetadataObject tableObject =
         NameIdentifierUtil.toMetadataObject(
             NameIdentifierUtil.ofTable(METALAKE, CATALOG, SCHEMA, TABLE), 
Entity.EntityType.TABLE);
+    MetadataObject columnObject =
+        NameIdentifierUtil.toMetadataObject(
+            NameIdentifierUtil.ofColumn(METALAKE, CATALOG, SCHEMA, TABLE, 
COLUMN),
+            Entity.EntityType.COLUMN);
 
     tagManager.associateTagsForMetadataObject(
         METALAKE, catalogObject, new String[] {tag1.name(), tag2.name(), 
tag3.name()}, null);
@@ -571,6 +653,8 @@ public class TestTagManager {
         METALAKE, schemaObject, new String[] {tag1.name(), tag2.name()}, null);
     tagManager.associateTagsForMetadataObject(
         METALAKE, tableObject, new String[] {tag1.name()}, null);
+    tagManager.associateTagsForMetadataObject(
+        METALAKE, columnObject, new String[] {tag1.name()}, null);
 
     Tag result = tagManager.getTagForMetadataObject(METALAKE, catalogObject, 
tag1.name());
     Assertions.assertEquals(tag1, result);
@@ -584,6 +668,9 @@ public class TestTagManager {
     Tag result3 = tagManager.getTagForMetadataObject(METALAKE, catalogObject, 
tag3.name());
     Assertions.assertEquals(tag3, result3);
 
+    Tag result4 = tagManager.getTagForMetadataObject(METALAKE, tableObject, 
tag1.name());
+    Assertions.assertEquals(tag1, result4);
+
     // Test get non-existent tag for metadata object
     Throwable e =
         Assertions.assertThrows(
diff --git 
a/core/src/test/java/org/apache/gravitino/utils/TestMetadataObjectUtil.java 
b/core/src/test/java/org/apache/gravitino/utils/TestMetadataObjectUtil.java
index 1de30d16f..c5a281866 100644
--- a/core/src/test/java/org/apache/gravitino/utils/TestMetadataObjectUtil.java
+++ b/core/src/test/java/org/apache/gravitino/utils/TestMetadataObjectUtil.java
@@ -113,12 +113,10 @@ public class TestMetadataObjectUtil {
             "metalake",
             MetadataObjects.of("catalog.schema", "fileset", 
MetadataObject.Type.FILESET)));
 
-    Assertions.assertThrows(
-        IllegalArgumentException.class,
-        () ->
-            MetadataObjectUtil.toEntityIdent(
-                "metalake",
-                MetadataObjects.of("catalog.schema.table", "column", 
MetadataObject.Type.COLUMN)),
-        "Cannot convert column metadata object to entity identifier: 
catalog.schema.table.column");
+    Assertions.assertEquals(
+        NameIdentifier.of("metalake", "catalog", "schema", "table", "column"),
+        MetadataObjectUtil.toEntityIdent(
+            "metalake",
+            MetadataObjects.of("catalog.schema.table", "column", 
MetadataObject.Type.COLUMN)));
   }
 }
diff --git 
a/core/src/test/java/org/apache/gravitino/utils/TestNameIdentifierUtil.java 
b/core/src/test/java/org/apache/gravitino/utils/TestNameIdentifierUtil.java
index 964f910ba..2eca30351 100644
--- a/core/src/test/java/org/apache/gravitino/utils/TestNameIdentifierUtil.java
+++ b/core/src/test/java/org/apache/gravitino/utils/TestNameIdentifierUtil.java
@@ -104,12 +104,12 @@ public class TestNameIdentifierUtil {
     assertEquals(
         filesetObject, NameIdentifierUtil.toMetadataObject(fileset, 
Entity.EntityType.FILESET));
 
-    // test column
-    Throwable e =
-        assertThrows(
-            IllegalArgumentException.class,
-            () -> NameIdentifierUtil.toMetadataObject(fileset, 
Entity.EntityType.COLUMN));
-    assertTrue(e.getMessage().contains("Entity type COLUMN is not supported"));
+    NameIdentifier column =
+        NameIdentifier.of("metalake1", "catalog1", "schema1", "table1", 
"column1");
+    MetadataObject columnObject =
+        MetadataObjects.parse("catalog1.schema1.table1.column1", 
MetadataObject.Type.COLUMN);
+    assertEquals(
+        columnObject, NameIdentifierUtil.toMetadataObject(column, 
Entity.EntityType.COLUMN));
 
     // test null
     Throwable e1 =
diff --git a/docs/manage-tags-in-gravitino.md b/docs/manage-tags-in-gravitino.md
index ac088a7c2..4163ca89d 100644
--- a/docs/manage-tags-in-gravitino.md
+++ b/docs/manage-tags-in-gravitino.md
@@ -26,9 +26,9 @@ the future versions.
    `COLUMN`, `FILESET`, `TOPIC`, etc. A metadata object is composed 
of a `type` and a
    dot-separated `name`. For example, a `CATALOG` object has a name 
"catalog1" with type
    "CATALOG", a `SCHEMA` object has a name "catalog1.schema1" with type 
"SCHEMA", a `TABLE`
-   object has a name "catalog1.schema1.table1" with type "TABLE".
-2. Currently, only `CATALOG`, `SCHEMA`, `TABLE`, `FILESET`, `TOPIC` objects 
can be tagged, tagging
-   on `COLUMN` will be supported in the future.
+   object has a name "catalog1.schema1.table1" with type "TABLE", a `COLUMN` 
object has a name 
+   "catalog1.schema1.table1.column1" with type "COLUMN".
+2. Currently, `CATALOG`, `SCHEMA`, `TABLE`, `FILESET`, `TOPIC`, and `COLUMN` 
objects can be tagged.
 3. Tags in Gravitino are inheritable, so listing tags of a metadata object will 
also list the
    tags of its parent metadata objects. For example, listing tags of a `Table` 
will also list
    the tags of its parent `Schema` and `Catalog`.
diff --git a/docs/open-api/openapi.yaml b/docs/open-api/openapi.yaml
index 0b16270c1..24bc0f2ce 100644
--- a/docs/open-api/openapi.yaml
+++ b/docs/open-api/openapi.yaml
@@ -453,6 +453,7 @@ components:
           - "CATALOG"
           - "SCHEMA"
           - "TABLE"
+          - "COLUMN"
           - "FILESET"
           - "TOPIC"
           - "ROLE"
diff --git a/docs/open-api/tags.yaml b/docs/open-api/tags.yaml
index 42d45c2a1..7b8deef25 100644
--- a/docs/open-api/tags.yaml
+++ b/docs/open-api/tags.yaml
@@ -213,6 +213,7 @@ paths:
       tags:
         - tag
       summary: Associate tags with metadata object
+      description: Associate and disassociate tags with a metadata object. Supported metadata object types are CATALOG, SCHEMA, TABLE, FILESET, TOPIC, and COLUMN.
       operationId: associateTags
       requestBody:
         content:

Reply via email to