This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch branch-lance-namespace-dev
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/branch-lance-namespace-dev by 
this push:
     new c847354a25 [#9022] improve(core): Refactor to add the managed table 
operations (#9073)
c847354a25 is described below

commit c847354a257d928150e3bff58f5282b52dd01da6
Author: Jerry Shao <[email protected]>
AuthorDate: Mon Nov 17 09:26:17 2025 -0800

    [#9022] improve(core): Refactor to add the managed table operations (#9073)
    
    ### What changes were proposed in this pull request?
    
    This PR adds a new managed table operations class for managed table CRUD
    in Gravitino.
    
    ### Why are the changes needed?
    
    This is the prerequisite for refactoring the generic lakehouse catalog.
    
    Fix: #9022
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Add UTs to cover the code.
---
 .../GenericLakehouseCatalogOperations.java         |   7 -
 .../lakehouse/lance/LanceCatalogOperations.java    |   2 -
 .../gravitino/catalog/ManagedSchemaOperations.java |   2 +-
 .../gravitino/catalog/ManagedTableOperations.java  | 538 +++++++++++++++
 .../apache/gravitino/connector/GenericColumn.java  |  61 ++
 .../gravitino/connector/GenericLakehouseTable.java |  15 +-
 .../apache/gravitino/connector/GenericTable.java   |  71 ++
 .../org/apache/gravitino/meta/TableEntity.java     |  12 -
 .../storage/relational/utils/POConverters.java     |  27 +-
 .../catalog/TestManagedTableOperations.java        | 755 +++++++++++++++++++++
 .../java/org/apache/gravitino/meta/TestEntity.java |   3 -
 .../gravitino/storage/TestEntityStorage.java       |   2 -
 .../storage/relational/TestJDBCBackend.java        |   5 +-
 13 files changed, 1457 insertions(+), 43 deletions(-)

diff --git 
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
 
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
index 6428fb816e..b546313ec5 100644
--- 
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
+++ 
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
@@ -200,7 +200,6 @@ public class GenericLakehouseCatalogOperations
     try {
       TableEntity tableEntity = store.get(ident, Entity.EntityType.TABLE, 
TableEntity.class);
       return GenericLakehouseTable.builder()
-          .withFormat(tableEntity.format())
           .withProperties(tableEntity.properties())
           .withAuditInfo(tableEntity.auditInfo())
           .withSortOrders(tableEntity.sortOrders())
@@ -229,11 +228,6 @@ public class GenericLakehouseCatalogOperations
       SortOrder[] sortOrders,
       Index[] indexes)
       throws NoSuchSchemaException, TableAlreadyExistsException {
-    LakehouseTableFormat format =
-        (LakehouseTableFormat)
-            propertiesMetadata
-                .tablePropertiesMetadata()
-                .getOrDefault(properties, 
GenericLakehouseTablePropertiesMetadata.LAKEHOUSE_FORMAT);
     Schema schema = loadSchema(NameIdentifier.of(ident.namespace().levels()));
 
     String tableLocation = calculateTableLocation(schema, ident, properties);
@@ -261,7 +255,6 @@ public class GenericLakehouseCatalogOperations
               .withName(ident.name())
               .withNamespace(ident.namespace())
               .withColumns(columnEntityList)
-              .withFormat(format.lowerName())
               .withProperties(newProperties)
               .withComment(comment)
               .withPartitioning(partitions)
diff --git 
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java
 
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java
index 8ec7909b2b..a382b3170c 100644
--- 
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java
+++ 
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java
@@ -190,7 +190,6 @@ public class LanceCatalogOperations implements 
LakehouseCatalogOperations {
                       .withId(tableEntity.id())
                       .withName(tableEntity.name())
                       .withNamespace(tableEntity.namespace())
-                      .withFormat(entity.format())
                       .withAuditInfo(
                           AuditInfo.builder()
                               .withCreator(tableEntity.auditInfo().creator())
@@ -213,7 +212,6 @@ public class LanceCatalogOperations implements 
LakehouseCatalogOperations {
 
       // return the updated table
       return GenericLakehouseTable.builder()
-          .withFormat(updatedEntity.format())
           .withProperties(updatedEntity.properties())
           .withAuditInfo(updatedEntity.auditInfo())
           .withSortOrders(updatedEntity.sortOrders())
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/ManagedSchemaOperations.java 
b/core/src/main/java/org/apache/gravitino/catalog/ManagedSchemaOperations.java
index fec07bacea..164d4b5346 100644
--- 
a/core/src/main/java/org/apache/gravitino/catalog/ManagedSchemaOperations.java
+++ 
b/core/src/main/java/org/apache/gravitino/catalog/ManagedSchemaOperations.java
@@ -100,7 +100,7 @@ public abstract class ManagedSchemaOperations implements 
SupportsSchemas {
     }
 
     StringIdentifier stringId = StringIdentifier.fromProperties(properties);
-    Preconditions.checkNotNull(stringId, "Property String identifier should 
not be null");
+    Preconditions.checkArgument(stringId != null, "Property String identifier 
should not be null");
 
     SchemaEntity schemaEntity =
         SchemaEntity.builder()
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/ManagedTableOperations.java 
b/core/src/main/java/org/apache/gravitino/catalog/ManagedTableOperations.java
new file mode 100644
index 0000000000..44060c6cae
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/catalog/ManagedTableOperations.java
@@ -0,0 +1,538 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.EntityAlreadyExistsException;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.StringIdentifier;
+import org.apache.gravitino.connector.GenericColumn;
+import org.apache.gravitino.connector.GenericTable;
+import org.apache.gravitino.connector.SupportsSchemas;
+import org.apache.gravitino.exceptions.NoSuchEntityException;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.exceptions.NoSuchTableException;
+import org.apache.gravitino.exceptions.TableAlreadyExistsException;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.ColumnEntity;
+import org.apache.gravitino.meta.TableEntity;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.Table;
+import org.apache.gravitino.rel.TableCatalog;
+import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.rel.expressions.Expression;
+import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.sorts.SortOrder;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.rel.indexes.Indexes;
+import org.apache.gravitino.rel.types.Type;
+import org.apache.gravitino.storage.IdGenerator;
+import org.apache.gravitino.utils.PrincipalUtils;
+
+public abstract class ManagedTableOperations implements TableCatalog {
+
+  private static final Joiner DOT = Joiner.on(".");
+
+  protected abstract EntityStore store();
+
+  protected abstract SupportsSchemas schemas();
+
+  protected abstract IdGenerator idGenerator();
+
+  @Override
+  public NameIdentifier[] listTables(Namespace namespace) throws 
NoSuchSchemaException {
+    try {
+      // The current implementation of JDBC entity store will automatically 
check the existence of
+      // the namespace when listing entities under the namespace. If not, it 
will throw an
+      // NoSuchEntityException. So, we don't need to check the existence of 
the namespace here
+      // again.
+      List<TableEntity> tables =
+          store().list(namespace, TableEntity.class, Entity.EntityType.TABLE);
+      return tables.stream()
+          .map(t -> NameIdentifier.of(namespace, t.name()))
+          .toArray(NameIdentifier[]::new);
+
+    } catch (NoSuchEntityException e) {
+      throw new NoSuchSchemaException(e, "Schema %s does not exist", 
namespace);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to list tables in schema " + 
namespace, e);
+    }
+  }
+
+  @Override
+  public Table loadTable(NameIdentifier ident) throws NoSuchTableException {
+    try {
+      TableEntity tableEntity = store().get(ident, Entity.EntityType.TABLE, 
TableEntity.class);
+      return toGenericTable(tableEntity);
+
+    } catch (NoSuchEntityException e) {
+      throw new NoSuchTableException(e, "Table %s does not exist", ident);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to load table " + ident, e);
+    }
+  }
+
+  @Override
+  public Table createTable(
+      NameIdentifier ident,
+      Column[] columns,
+      String comment,
+      Map<String, String> properties,
+      Transform[] partitions,
+      Distribution distribution,
+      SortOrder[] sortOrders,
+      Index[] indexes)
+      throws NoSuchSchemaException, TableAlreadyExistsException {
+    // createTable in ManagedTableOperations only stores the table metadata in 
the entity store.
+    // It doesn't handle any additional operations like creating physical 
location, preprocessing
+    // the properties, etc. Those operations should be handled in the specific 
catalog
+    // implementation.
+    StringIdentifier stringId = StringIdentifier.fromProperties(properties);
+    Preconditions.checkArgument(stringId != null, "Property String identifier 
should not be null");
+
+    AuditInfo auditInfo =
+        AuditInfo.builder()
+            .withCreator(PrincipalUtils.getCurrentPrincipal().getName())
+            .withCreateTime(Instant.now())
+            .build();
+
+    TableEntity tableEntity =
+        TableEntity.builder()
+            .withName(ident.name())
+            .withId(stringId.id())
+            .withNamespace(ident.namespace())
+            .withComment(comment)
+            .withColumns(toColumnEntities(columns, auditInfo, idGenerator()))
+            .withProperties(properties)
+            .withPartitioning(partitions)
+            .withDistribution(distribution)
+            .withSortOrders(sortOrders)
+            .withIndexes(indexes)
+            .withAuditInfo(auditInfo)
+            .build();
+
+    try {
+      store().put(tableEntity, false /* overwrite */);
+    } catch (NoSuchEntityException e) {
+      // The put operation in the current JDBC entity store will check the 
existence of the
+      // namespace when creating an entity under the namespace. If not, it 
will throw a
+      // NoSuchEntityException. So, we don't need to check the existence of 
the namespace here
+      // again.
+      throw new NoSuchSchemaException(e, "Schema %s does not exist", 
ident.namespace());
+    } catch (EntityAlreadyExistsException e) {
+      throw new TableAlreadyExistsException(e, "Table %s already exists", 
ident);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to create table " + ident, e);
+    }
+
+    return toGenericTable(tableEntity);
+  }
+
+  @Override
+  public Table alterTable(NameIdentifier ident, TableChange... changes)
+      throws NoSuchTableException, IllegalArgumentException {
+    // The alterTable in ManagedTableOperations only updates the table 
metadata in the entity store.
+    // It doesn't handle any additional operations like modifying physical 
data, etc. Those
+    // operations should be handled in the specific catalog implementation.
+    try {
+      TableEntity newTableEntity =
+          store()
+              .update(
+                  ident,
+                  TableEntity.class,
+                  Entity.EntityType.TABLE,
+                  oldEntity -> applyChanges(oldEntity, changes));
+
+      return toGenericTable(newTableEntity);
+    } catch (NoSuchEntityException e) {
+      throw new NoSuchTableException(e, "Table %s does not exist", ident);
+    } catch (EntityAlreadyExistsException e) {
+      throw new IllegalArgumentException(
+          "Failed to rename table " + ident + " due to table already exists: 
", e);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to alter table " + ident, e);
+    }
+  }
+
+  @Override
+  public boolean purgeTable(NameIdentifier ident) {
+    // For Gravitino managed tables, purgeTable is equivalent to dropTable. It 
only removes the
+    // table metadata from the entity store. Physical data deletion should be 
handled by the
+    // specific catalog implementation if needed.
+    return dropTable(ident);
+  }
+
+  @Override
+  public boolean dropTable(NameIdentifier ident) {
+    try {
+      return store().delete(ident, Entity.EntityType.TABLE);
+    } catch (NoSuchEntityException e) {
+      return false;
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to drop metadata for table " + ident, 
e);
+    }
+  }
+
+  private TableEntity applyChanges(TableEntity oldTableEntity, TableChange... 
changes) {
+    String newName = oldTableEntity.name();
+    String newComment = oldTableEntity.comment();
+    Map<String, String> newProps = 
Maps.newHashMap(oldTableEntity.properties());
+    List<ColumnEntity> newColumns = 
Lists.newArrayList(oldTableEntity.columns());
+    List<Index> newIndexes = Lists.newArrayList(oldTableEntity.indexes());
+
+    Map<Boolean, List<TableChange>> splitChanges =
+        Arrays.stream(changes)
+            .collect(
+                Collectors.partitioningBy(change -> change instanceof 
TableChange.ColumnChange));
+    List<TableChange.ColumnChange> columnChanges =
+        splitChanges.get(true).stream()
+            .map(change -> (TableChange.ColumnChange) change)
+            .collect(Collectors.toList());
+    List<TableChange> tableChanges = splitChanges.get(false);
+
+    for (TableChange change : tableChanges) {
+      if (change instanceof TableChange.RenameTable rename) {
+        if (rename.getNewSchemaName().isPresent()) {
+          throw new IllegalArgumentException(
+              "Gravitino managed table doesn't support renaming "
+                  + "the table across schemas for now");
+        }
+
+        newName = rename.getNewName();
+
+      } else if (change instanceof TableChange.UpdateComment updateComment) {
+        newComment = updateComment.getNewComment();
+
+      } else if (change instanceof TableChange.SetProperty setProperty) {
+        newProps.put(setProperty.getProperty(), setProperty.getValue());
+
+      } else if (change instanceof TableChange.RemoveProperty removeProperty) {
+        newProps.remove(removeProperty.getProperty());
+
+      } else if (change instanceof TableChange.AddIndex addIndex) {
+        Index newIndex =
+            Indexes.IndexImpl.builder()
+                .withName(addIndex.getName())
+                .withFieldNames(addIndex.getFieldNames())
+                .withIndexType(addIndex.getType())
+                .build();
+        newIndexes.add(newIndex);
+
+      } else if (change instanceof TableChange.DeleteIndex deleteIndex) {
+        boolean removed = newIndexes.removeIf(idx -> 
idx.name().equals(deleteIndex.getName()));
+        if (!removed && !deleteIndex.isIfExists()) {
+          throw new IllegalArgumentException(
+              String.format(
+                  "Index %s does not exist while ifExists is false", 
deleteIndex.getName()));
+        }
+
+      } else {
+        throw new IllegalArgumentException("Unsupported table change: " + 
change);
+      }
+    }
+
+    newColumns = applyColumnChanges(newColumns, columnChanges);
+
+    return TableEntity.builder()
+        .withId(oldTableEntity.id())
+        .withName(newName)
+        .withNamespace(oldTableEntity.namespace())
+        .withComment(newComment)
+        .withColumns(newColumns)
+        .withProperties(newProps)
+        .withPartitioning(oldTableEntity.partitioning())
+        .withDistribution(oldTableEntity.distribution())
+        .withSortOrders(oldTableEntity.sortOrders())
+        .withIndexes(newIndexes.toArray(Index[]::new))
+        .withAuditInfo(
+            AuditInfo.builder()
+                .withCreator(oldTableEntity.auditInfo().creator())
+                .withCreateTime(oldTableEntity.auditInfo().createTime())
+                
.withLastModifier(PrincipalUtils.getCurrentPrincipal().getName())
+                .withLastModifiedTime(Instant.now())
+                .build())
+        .build();
+  }
+
+  private List<ColumnEntity> applyColumnChanges(
+      List<ColumnEntity> oldColumns, List<TableChange.ColumnChange> 
columnChanges) {
+    // sort the columns by position first, columns may be unordered when 
retrieved from the store.
+    List<ColumnEntity> newColumns =
+        Lists.newArrayList(oldColumns).stream()
+            .sorted(Comparator.comparingInt(ColumnEntity::position))
+            .collect(Collectors.toList());
+
+    for (TableChange.ColumnChange change : columnChanges) {
+      if (change instanceof TableChange.AddColumn addColumn) {
+        String columnName = DOT.join(addColumn.getFieldName());
+        boolean exists = newColumns.stream().anyMatch(col -> 
col.name().equals(columnName));
+        if (exists) {
+          throw new IllegalArgumentException(String.format("Column %s already 
exists", columnName));
+        }
+        // Note. The default behavior of addColumn is to add the column at the 
end.
+        int position = calculateColumnPosition(newColumns, 
addColumn.getPosition(), true);
+
+        ColumnEntity columnToAdd =
+            ColumnEntity.builder()
+                .withId(idGenerator().nextId())
+                .withName(DOT.join(addColumn.getFieldName()))
+                .withDataType(addColumn.getDataType())
+                .withPosition(position)
+                .withComment(addColumn.getComment())
+                .withNullable(addColumn.isNullable())
+                .withAutoIncrement(addColumn.isAutoIncrement())
+                .withDefaultValue(addColumn.getDefaultValue())
+                .withAuditInfo(
+                    AuditInfo.builder()
+                        
.withCreator(PrincipalUtils.getCurrentPrincipal().getName())
+                        .withCreateTime(Instant.now())
+                        .build())
+                .build();
+
+        // Add the new column at the specified position
+        newColumns.add(position, columnToAdd);
+
+      } else if (change instanceof TableChange.RenameColumn
+          || change instanceof TableChange.UpdateColumnDefaultValue
+          || change instanceof TableChange.UpdateColumnType
+          || change instanceof TableChange.UpdateColumnComment
+          || change instanceof TableChange.UpdateColumnPosition
+          || change instanceof TableChange.UpdateColumnNullability
+          || change instanceof TableChange.UpdateColumnAutoIncrement) {
+        int i;
+        ColumnEntity oldColumn = null;
+        for (i = 0; i < newColumns.size(); i++) {
+          ColumnEntity col = newColumns.get(i);
+          if (col.name().equals(DOT.join(change.fieldName()))) {
+            oldColumn = col;
+            break;
+          }
+        }
+        if (oldColumn == null) {
+          throw new IllegalArgumentException(
+              String.format(
+                  "Column %s not found for %s",
+                  DOT.join(change.fieldName()), 
change.getClass().getSimpleName()));
+        }
+
+        // Remove the old column temporarily, we will insert it back after 
updating.
+        newColumns.remove(oldColumn);
+
+        Optional<String> newName = Optional.empty();
+        if (change instanceof TableChange.RenameColumn rename) {
+          boolean columnExists =
+              newColumns.stream().anyMatch(col -> 
col.name().equals(rename.getNewName()));
+          if (columnExists) {
+            throw new IllegalArgumentException(
+                String.format(
+                    "Column %s already exists when renaming column %s",
+                    rename.getNewName(), DOT.join(change.fieldName())));
+          }
+
+          newName = Optional.of(rename.getNewName());
+        }
+
+        Optional<Expression> newDefaultValue =
+            change instanceof TableChange.UpdateColumnDefaultValue 
updateDefault
+                ? Optional.of(updateDefault.getNewDefaultValue())
+                : Optional.empty();
+        Optional<Type> newDataType =
+            change instanceof TableChange.UpdateColumnType updateType
+                ? Optional.of(updateType.getNewDataType())
+                : Optional.empty();
+        Optional<String> newComment =
+            change instanceof TableChange.UpdateColumnComment updateComment
+                ? Optional.of(updateComment.getNewComment())
+                : Optional.empty();
+        Optional<Boolean> newNullable =
+            change instanceof TableChange.UpdateColumnNullability 
updateNullability
+                ? Optional.of(updateNullability.nullable())
+                : Optional.empty();
+        Optional<Boolean> newAutoIncrement =
+            change instanceof TableChange.UpdateColumnAutoIncrement 
updateAutoIncrement
+                ? Optional.of(updateAutoIncrement.isAutoIncrement())
+                : Optional.empty();
+
+        Optional<Integer> newPosition = Optional.empty();
+        if (change instanceof TableChange.UpdateColumnPosition 
updateColumnPosition) {
+          newPosition =
+              Optional.of(
+                  calculateColumnPosition(newColumns, 
updateColumnPosition.getPosition(), false));
+        }
+
+        // add back the updated column
+        ColumnEntity newColumn =
+            updateColumnEntity(
+                oldColumn,
+                newName,
+                newDefaultValue,
+                newDataType,
+                newComment,
+                newPosition,
+                newNullable,
+                newAutoIncrement);
+        newColumns.add(newColumn.position(), newColumn);
+
+      } else if (change instanceof TableChange.DeleteColumn deleteColumn) {
+        boolean removed =
+            newColumns.removeIf(col -> 
col.name().equals(DOT.join(deleteColumn.fieldName())));
+        if (!removed && !deleteColumn.getIfExists()) {
+          throw new IllegalArgumentException(
+              String.format(
+                  "Column %s not found for deletion while ifExists is false",
+                  DOT.join(deleteColumn.fieldName())));
+        }
+
+      } else {
+        throw new IllegalArgumentException("Unsupported column change: " + 
change);
+      }
+    }
+
+    // After adding, updating, or deleting columns, the positions of the 
columns may be messed up.
+    // We need to reassign the positions to ensure they are continuous and 
start from 0.
+    return updateColumnPositions(newColumns);
+  }
+
+  private ColumnEntity updateColumnEntity(
+      ColumnEntity oldColumn,
+      Optional<String> newName,
+      Optional<Expression> newDefaultValue,
+      Optional<Type> newDataType,
+      Optional<String> newComment,
+      Optional<Integer> newPosition,
+      Optional<Boolean> newNullable,
+      Optional<Boolean> newAutoIncrement) {
+    return ColumnEntity.builder()
+        .withId(oldColumn.id())
+        .withName(newName.orElse(oldColumn.name()))
+        .withDataType(newDataType.orElse(oldColumn.dataType()))
+        .withPosition(newPosition.orElse(oldColumn.position()))
+        .withComment(newComment.orElse(oldColumn.comment()))
+        .withNullable(newNullable.orElse(oldColumn.nullable()))
+        .withAutoIncrement(newAutoIncrement.orElse(oldColumn.autoIncrement()))
+        .withDefaultValue(newDefaultValue.orElse(oldColumn.defaultValue()))
+        .withAuditInfo(
+            AuditInfo.builder()
+                .withCreator(oldColumn.auditInfo().creator())
+                .withCreateTime(oldColumn.auditInfo().createTime())
+                
.withLastModifier(PrincipalUtils.getCurrentPrincipal().getName())
+                .withLastModifiedTime(Instant.now())
+                .build())
+        .build();
+  }
+
+  private GenericColumn toGenericColumn(ColumnEntity columnEntity) {
+    return GenericColumn.builder()
+        .withName(columnEntity.name())
+        .withComment(columnEntity.comment())
+        .withAutoIncrement(columnEntity.autoIncrement())
+        .withNullable(columnEntity.nullable())
+        .withType(columnEntity.dataType())
+        .withDefaultValue(columnEntity.defaultValue())
+        .build();
+  }
+
+  private GenericTable toGenericTable(TableEntity tableEntity) {
+    return GenericTable.builder()
+        .withName(tableEntity.name())
+        .withComment(tableEntity.comment())
+        .withColumns(
+            
tableEntity.columns().stream().map(this::toGenericColumn).toArray(Column[]::new))
+        .withProperties(tableEntity.properties())
+        .withAuditInfo(tableEntity.auditInfo())
+        .withSortOrders(tableEntity.sortOrders())
+        .withPartitioning(tableEntity.partitioning())
+        .withDistribution(tableEntity.distribution())
+        .withIndexes(tableEntity.indexes())
+        .build();
+  }
+
+  private List<ColumnEntity> toColumnEntities(
+      Column[] columns, AuditInfo audit, IdGenerator idGenerator) {
+    return columns == null
+        ? Collections.emptyList()
+        : IntStream.range(0, columns.length)
+            .mapToObj(i -> ColumnEntity.toColumnEntity(columns[i], i, 
idGenerator.nextId(), audit))
+            .collect(Collectors.toList());
+  }
+
+  private List<ColumnEntity> updateColumnPositions(List<ColumnEntity> columns) 
{
+    List<ColumnEntity> updatedColumns = Lists.newArrayList();
+    for (int i = 0; i < columns.size(); i++) {
+      ColumnEntity oldColumn = columns.get(i);
+      if (oldColumn.position() != i) {
+        ColumnEntity newColumn =
+            ColumnEntity.builder()
+                .withId(oldColumn.id())
+                .withName(oldColumn.name())
+                .withDataType(oldColumn.dataType())
+                .withPosition(i)
+                .withComment(oldColumn.comment())
+                .withNullable(oldColumn.nullable())
+                .withAutoIncrement(oldColumn.autoIncrement())
+                .withDefaultValue(oldColumn.defaultValue())
+                .withAuditInfo((AuditInfo) oldColumn.auditInfo())
+                .build();
+        updatedColumns.add(newColumn);
+      } else {
+        updatedColumns.add(oldColumn);
+      }
+    }
+    return updatedColumns;
+  }
+
+  int calculateColumnPosition(
+      List<ColumnEntity> existingColumns, TableChange.ColumnPosition position, 
boolean forAdd) {
+    if (position == TableChange.ColumnPosition.first()) {
+      return 0;
+    } else if (position instanceof TableChange.After afterColumn) {
+      for (int i = 0; i < existingColumns.size(); i++) {
+        if (existingColumns.get(i).name().equals(afterColumn.getColumn())) {
+          return i + 1;
+        }
+      }
+      throw new IllegalArgumentException(
+          String.format("Column %s not found for adding column after it", 
afterColumn.getColumn()));
+    } else if (forAdd && position == TableChange.ColumnPosition.defaultPos()) {
+      // Default position of Gravitino managed table is to add the column at 
the end.
+      // For add column operation only. Change column operation with default 
position is not
+      // supported.
+      return existingColumns.size();
+    } else {
+      throw new IllegalArgumentException("Unsupported column position: " + 
position);
+    }
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/connector/GenericColumn.java 
b/core/src/main/java/org/apache/gravitino/connector/GenericColumn.java
new file mode 100644
index 0000000000..7cf435c81d
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/connector/GenericColumn.java
@@ -0,0 +1,61 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.connector;
+
+import org.apache.gravitino.annotation.DeveloperApi;
+
+/**
+ * A generic implementation of a table column. This is used to represent 
column metadata that is
+ * managed by Gravitino.
+ */
+@DeveloperApi
+public class GenericColumn extends BaseColumn {
+
+  /**
+   * Creates a new builder for constructing a GenericColumn instance.
+   *
+   * @return A new Builder instance.
+   */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /** A builder class for constructing GenericColumn instances. */
+  public static class Builder extends BaseColumnBuilder<Builder, 
GenericColumn> {
+
+    /**
+     * Internal method to build a GenericColumn instance using the provided 
values.
+     *
+     * @return A new GenericColumn instance with the configured values.
+     */
+    @Override
+    protected GenericColumn internalBuild() {
+      GenericColumn column = new GenericColumn();
+
+      column.name = name;
+      column.comment = comment;
+      column.dataType = dataType;
+      column.nullable = nullable;
+      column.defaultValue = defaultValue == null ? DEFAULT_VALUE_NOT_SET : 
defaultValue;
+      column.autoIncrement = autoIncrement;
+      return column;
+    }
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/connector/GenericLakehouseTable.java 
b/core/src/main/java/org/apache/gravitino/connector/GenericLakehouseTable.java
index c408c49cc0..2d14a539e2 100644
--- 
a/core/src/main/java/org/apache/gravitino/connector/GenericLakehouseTable.java
+++ 
b/core/src/main/java/org/apache/gravitino/connector/GenericLakehouseTable.java
@@ -47,11 +47,16 @@ public class GenericLakehouseTable extends BaseTable {
       GenericLakehouseTable genericLakehouseTable = new 
GenericLakehouseTable();
       genericLakehouseTable.columns = this.columns;
       genericLakehouseTable.comment = this.comment;
-      genericLakehouseTable.properties =
-          ImmutableMap.<String, String>builder()
-              .putAll(this.properties)
-              .put(Table.PROPERTY_TABLE_FORMAT, this.format)
-              .buildKeepingLast();
+
+      if (format != null) {
+        genericLakehouseTable.properties =
+            ImmutableMap.<String, String>builder()
+                .putAll(this.properties)
+                .put(Table.PROPERTY_TABLE_FORMAT, this.format)
+                .buildKeepingLast();
+      } else {
+        genericLakehouseTable.properties = this.properties;
+      }
       genericLakehouseTable.auditInfo = this.auditInfo;
       genericLakehouseTable.distribution = this.distribution;
       genericLakehouseTable.indexes = this.indexes;
diff --git a/core/src/main/java/org/apache/gravitino/connector/GenericTable.java b/core/src/main/java/org/apache/gravitino/connector/GenericTable.java
new file mode 100644
index 0000000000..2d80b7c38f
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/connector/GenericTable.java
@@ -0,0 +1,71 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.connector;
+
+import org.apache.gravitino.annotation.DeveloperApi;
+
+/** Table implementation for tables whose metadata is managed in Gravitino. */
+@DeveloperApi
+public class GenericTable extends BaseTable {
+
+  /**
+   * Returns a fresh {@link Builder} for assembling a GenericTable.
+   *
+   * @return A new Builder instance.
+   */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /**
+   * GenericTable exposes no table operations, so this method never returns normally.
+   *
+   * @throws UnsupportedOperationException on every call, because table operations are not
+   *     supported.
+   */
+  @Override
+  protected TableOperations newOps() throws UnsupportedOperationException {
+    throw new UnsupportedOperationException("Generic Table doesn't support table operations");
+  }
+
+  /** Builder that copies its configured values onto a new GenericTable. */
+  public static class Builder extends BaseTableBuilder<Builder, GenericTable> {
+    /**
+     * Creates the table and copies every builder field onto it unchanged.
+     *
+     * @return The populated GenericTable.
+     */
+    @Override
+    protected GenericTable internalBuild() {
+      GenericTable built = new GenericTable();
+      built.name = this.name;
+      built.comment = this.comment;
+      built.columns = this.columns;
+      built.properties = this.properties;
+      built.partitioning = this.partitioning;
+      built.distribution = this.distribution;
+      built.sortOrders = this.sortOrders;
+      built.indexes = this.indexes;
+      built.auditInfo = this.auditInfo;
+      return built;
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/gravitino/meta/TableEntity.java b/core/src/main/java/org/apache/gravitino/meta/TableEntity.java
index 638a1ab346..3055c8fff9 100644
--- a/core/src/main/java/org/apache/gravitino/meta/TableEntity.java
+++ b/core/src/main/java/org/apache/gravitino/meta/TableEntity.java
@@ -75,10 +75,6 @@ public class TableEntity implements Entity, Auditable, 
HasIdentifier {
 
   private List<ColumnEntity> columns;
 
-  @Getter
-  @Accessors(fluent = true)
-  private String format;
-
   @Getter
   @Accessors(fluent = true)
   private Map<String, String> properties;
@@ -115,7 +111,6 @@ public class TableEntity implements Entity, Auditable, 
HasIdentifier {
     fields.put(NAME, name);
     fields.put(AUDIT_INFO, auditInfo);
     fields.put(COLUMNS, columns);
-    fields.put(FORMAT, format);
     fields.put(PROPERTIES, properties);
     fields.put(PARTITIONING, partitioning);
     fields.put(SORT_ORDERS, sortOrders);
@@ -195,7 +190,6 @@ public class TableEntity implements Entity, Auditable, 
HasIdentifier {
         && Objects.equal(namespace, baseTable.namespace)
         && Objects.equal(auditInfo, baseTable.auditInfo)
         && CollectionUtils.isEqualCollection(columns, baseTable.columns)
-        && Objects.equal(format, baseTable.format)
         // Please check the correctness of this comparison.
         && Objects.equal(properties, baseTable.properties)
         && Arrays.equals(partitioning, baseTable.partitioning)
@@ -213,7 +207,6 @@ public class TableEntity implements Entity, Auditable, 
HasIdentifier {
         auditInfo,
         columns,
         namespace,
-        format,
         properties,
         Arrays.hashCode(partitioning),
         Arrays.hashCode(sortOrders),
@@ -256,11 +249,6 @@ public class TableEntity implements Entity, Auditable, 
HasIdentifier {
       return this;
     }
 
-    public Builder withFormat(String format) {
-      tableEntity.format = format;
-      return this;
-    }
-
     public Builder withProperties(Map<String, String> properties) {
       tableEntity.properties = properties;
       return this;
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
index 802ce6f810..87cda80432 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
@@ -21,6 +21,7 @@ package org.apache.gravitino.storage.relational.utils;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import java.time.Instant;
 import java.util.Collections;
 import java.util.List;
@@ -63,6 +64,7 @@ import org.apache.gravitino.meta.UserEntity;
 import org.apache.gravitino.policy.Policy;
 import org.apache.gravitino.policy.PolicyContent;
 import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.Table;
 import org.apache.gravitino.rel.expressions.Expression;
 import org.apache.gravitino.rel.types.Type;
 import org.apache.gravitino.storage.relational.po.CatalogPO;
@@ -401,7 +403,10 @@ public class POConverters {
           .withCurrentVersion(INIT_VERSION)
           .withLastVersion(INIT_VERSION)
           .withDeletedAt(DEFAULT_DELETED_AT)
-          .withFormat(tableEntity.format())
+          .withFormat(
+              tableEntity.properties() == null
+                  ? null
+                  : 
tableEntity.properties().getOrDefault(Table.PROPERTY_TABLE_FORMAT, null))
           .withComment(tableEntity.comment())
           .withProperties(
               tableEntity.properties() == null
@@ -485,8 +490,10 @@ public class POConverters {
                       ? null
                       : JsonUtils.anyFieldMapper()
                           
.writeValueAsString(DTOConverters.toDTOs(newTable.partitioning())))
-              // TODO support partitions later
-              .withFormat(newTable.format());
+              .withFormat(
+                  newTable.properties() == null
+                      ? null
+                      : 
newTable.properties().getOrDefault(Table.PROPERTY_TABLE_FORMAT, null));
 
       return builder.build();
     } catch (JsonProcessingException e) {
@@ -508,6 +515,14 @@ public class POConverters {
   public static TableEntity fromTableAndColumnPOs(
       TablePO tablePO, List<ColumnPO> columnPOs, Namespace namespace) {
     try {
+      Map<String, String> properties =
+          StringUtils.isBlank(tablePO.getProperties())
+              ? Maps.newHashMap()
+              : JsonUtils.anyFieldMapper().readValue(tablePO.getProperties(), 
Map.class);
+      if (StringUtils.isNotBlank(tablePO.getFormat())) {
+        properties.put(Table.PROPERTY_TABLE_FORMAT, tablePO.getFormat());
+      }
+
       return TableEntity.builder()
           .withId(tablePO.getTableId())
           .withName(tablePO.getTableName())
@@ -538,12 +553,8 @@ public class POConverters {
                   ? null
                   : JsonUtils.anyFieldMapper()
                       .readValue(tablePO.getPartitions(), 
Partitioning[].class))
-          .withFormat(tablePO.getFormat())
           .withComment(tablePO.getComment())
-          .withProperties(
-              StringUtils.isBlank(tablePO.getProperties())
-                  ? null
-                  : 
JsonUtils.anyFieldMapper().readValue(tablePO.getProperties(), Map.class))
+          .withProperties(properties)
           .withColumns(fromColumnPOs(columnPOs))
           .build();
     } catch (JsonProcessingException e) {
diff --git a/core/src/test/java/org/apache/gravitino/catalog/TestManagedTableOperations.java b/core/src/test/java/org/apache/gravitino/catalog/TestManagedTableOperations.java
new file mode 100644
index 0000000000..88f1575427
--- /dev/null
+++ b/core/src/test/java/org/apache/gravitino/catalog/TestManagedTableOperations.java
@@ -0,0 +1,755 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Schema;
+import org.apache.gravitino.StringIdentifier;
+import org.apache.gravitino.connector.GenericColumn;
+import org.apache.gravitino.connector.SupportsSchemas;
+import org.apache.gravitino.exceptions.NoSuchTableException;
+import org.apache.gravitino.exceptions.TableAlreadyExistsException;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.Table;
+import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.rel.expressions.Expression;
+import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.gravitino.rel.expressions.sorts.SortOrder;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.rel.indexes.Indexes;
+import org.apache.gravitino.rel.types.Type;
+import org.apache.gravitino.rel.types.Types;
+import org.apache.gravitino.storage.IdGenerator;
+import org.apache.gravitino.storage.RandomIdGenerator;
+import org.apache.gravitino.storage.memory.TestMemoryEntityStore;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+import org.apache.gravitino.utils.NamespaceUtil;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class TestManagedTableOperations {
+
+  /**
+   * {@link ManagedTableOperations} subclass for tests whose collaborators are the objects passed
+   * in through the constructor.
+   */
+  public static class InMemoryManagedTableOperations extends ManagedTableOperations {
+
+    private final IdGenerator idGenerator;
+
+    private final SupportsSchemas supportsSchemas;
+
+    private final EntityStore entityStore;
+
+    /**
+     * Captures the collaborators that the overridden accessors hand back.
+     *
+     * @param entityStore Returned by {@link #store()}.
+     * @param supportsSchemas Returned by {@link #schemas()}.
+     * @param idGenerator Returned by {@link #idGenerator()}.
+     */
+    public InMemoryManagedTableOperations(
+        EntityStore entityStore, SupportsSchemas supportsSchemas, IdGenerator idGenerator) {
+      this.idGenerator = idGenerator;
+      this.supportsSchemas = supportsSchemas;
+      this.entityStore = entityStore;
+    }
+
+    @Override
+    protected IdGenerator idGenerator() {
+      return this.idGenerator;
+    }
+
+    @Override
+    protected SupportsSchemas schemas() {
+      return this.supportsSchemas;
+    }
+
+    @Override
+    protected EntityStore store() {
+      return this.entityStore;
+    }
+  }
+
+  // Names of the metalake, catalog and schema that the test identifiers are built from.
+  private static final String SCHEMA_NAME = "schema1";
+  private static final String CATALOG_NAME = "test_catalog";
+  private static final String METALAKE_NAME = "test_metalake";
+
+  // One in-memory store is shared by the schema operations and the table operations below.
+  private final EntityStore store = new TestMemoryEntityStore.InMemoryEntityStore();
+  private final IdGenerator idGenerator = new RandomIdGenerator();
+  private final SupportsSchemas schemas =
+      new ManagedSchemaOperations() {
+        @Override
+        protected EntityStore store() {
+          return store;
+        }
+      };
+
+  // The object under test, wired to the fixtures declared above.
+  private ManagedTableOperations tableOperations =
+      new InMemoryManagedTableOperations(store, schemas, idGenerator);
+
+  /** Clears the in-memory store and creates the schema used by the tests. */
+  @BeforeEach
+  public void setUp() {
+    ((TestMemoryEntityStore.InMemoryEntityStore) store).clear();
+    // Create a schema
+    NameIdentifier schemaIdent =
+        NameIdentifierUtil.ofSchema(METALAKE_NAME, CATALOG_NAME, SCHEMA_NAME);
+    StringIdentifier stringId = StringIdentifier.fromId(idGenerator.nextId());
+    Map<String, String> schemaProperties =
+        StringIdentifier.newPropertiesWithId(stringId, Collections.emptyMap());
+    Schema schema = schemas.createSchema(schemaIdent, "Test Schema 1", schemaProperties);
+    // Compare against the constant the schema was created with, not a duplicated literal.
+    Assertions.assertEquals(SCHEMA_NAME, schema.name());
+  }
+
+  /** Creates two tables in the test schema and checks that listing the schema returns both. */
+  @Test
+  public void testCreateAndListTables() {
+    // Create a table
+    NameIdentifier table1Ident =
+        NameIdentifierUtil.ofTable(METALAKE_NAME, CATALOG_NAME, SCHEMA_NAME, "table1");
+    Column column1 = createColumn("col1", Types.StringType.get(), null);
+    Column column2 = createColumn("col2", Types.IntegerType.get(), Literals.integerLiteral(1));
+    Column[] columns = new Column[] {column1, column2};
+    Transform[] partitioning = new Transform[0];
+    Distribution distribution = Distributions.NONE;
+    Index[] indexes = Indexes.EMPTY_INDEXES;
+    SortOrder[] sortOrders = new SortOrder[0];
+
+    tableOperations.createTable(
+        table1Ident,
+        columns,
+        "Test Table 1",
+        StringIdentifier.newPropertiesWithId(
+            StringIdentifier.fromId(idGenerator.nextId()), Collections.emptyMap()),
+        partitioning,
+        distribution,
+        sortOrders,
+        indexes);
+
+    NameIdentifier table2Ident =
+        NameIdentifierUtil.ofTable(METALAKE_NAME, CATALOG_NAME, SCHEMA_NAME, "table2");
+    tableOperations.createTable(
+        table2Ident,
+        columns,
+        "Test Table 2",
+        StringIdentifier.newPropertiesWithId(
+            StringIdentifier.fromId(idGenerator.nextId()), Collections.emptyMap()),
+        partitioning,
+        distribution,
+        sortOrders,
+        indexes);
+
+    // List tables. The namespace is built from SCHEMA_NAME so it always matches the schema the
+    // tables were created in.
+    NameIdentifier[] tableIdents =
+        tableOperations.listTables(
+            NamespaceUtil.ofTable(METALAKE_NAME, CATALOG_NAME, SCHEMA_NAME));
+    Assertions.assertEquals(2, tableIdents.length);
+    Set<String> tableNames =
+        Arrays.stream(tableIdents).map(NameIdentifier::name).collect(Collectors.toSet());
+
+    Assertions.assertTrue(tableNames.contains("table1"));
+    Assertions.assertTrue(tableNames.contains("table2"));
+  }
+
+  /**
+   * Checks that a created table loads back equal to what createTable returned, that creating it
+   * a second time fails, and that loading an unknown table fails.
+   */
+  @Test
+  public void testCreateAndLoadTable() {
+    NameIdentifier tableIdent =
+        NameIdentifierUtil.ofTable(METALAKE_NAME, CATALOG_NAME, SCHEMA_NAME, "table1");
+    Column[] tableColumns =
+        new Column[] {
+          createColumn("col1", Types.StringType.get(), null),
+          createColumn("col2", Types.IntegerType.get(), Literals.integerLiteral(1))
+        };
+    Transform[] noPartitioning = new Transform[0];
+    Distribution noDistribution = Distributions.NONE;
+    Index[] noIndexes = Indexes.EMPTY_INDEXES;
+    SortOrder[] noSortOrders = new SortOrder[0];
+
+    Table created =
+        tableOperations.createTable(
+            tableIdent,
+            tableColumns,
+            "Test Table 1",
+            StringIdentifier.newPropertiesWithId(
+                StringIdentifier.fromId(idGenerator.nextId()), Collections.emptyMap()),
+            noPartitioning,
+            noDistribution,
+            noSortOrders,
+            noIndexes);
+
+    // The stored table must match what createTable returned.
+    assertTableEquals(created, tableOperations.loadTable(tableIdent));
+
+    // Creating a table under an identifier that is already taken must fail.
+    Assertions.assertThrows(
+        TableAlreadyExistsException.class,
+        () ->
+            tableOperations.createTable(
+                tableIdent,
+                tableColumns,
+                "Test Table 1",
+                StringIdentifier.newPropertiesWithId(
+                    StringIdentifier.fromId(idGenerator.nextId()), Collections.emptyMap()),
+                noPartitioning,
+                noDistribution,
+                noSortOrders,
+                noIndexes));
+
+    // Loading an identifier that was never created must fail.
+    NameIdentifier missingIdent =
+        NameIdentifierUtil.ofTable(METALAKE_NAME, CATALOG_NAME, SCHEMA_NAME, "non_existing_table");
+    Assertions.assertThrows(
+        NoSuchTableException.class, () -> tableOperations.loadTable(missingIdent));
+  }
+
+  /**
+   * Checks that dropping an existing table returns true and makes it unloadable, and that
+   * dropping it again returns false.
+   */
+  @Test
+  public void testCreateAndDropTable() {
+    NameIdentifier tableIdent =
+        NameIdentifierUtil.ofTable(METALAKE_NAME, CATALOG_NAME, SCHEMA_NAME, "table1");
+    Column[] tableColumns =
+        new Column[] {
+          createColumn("col1", Types.StringType.get(), null),
+          createColumn("col2", Types.IntegerType.get(), Literals.integerLiteral(1))
+        };
+    Transform[] noPartitioning = new Transform[0];
+    Distribution noDistribution = Distributions.NONE;
+    Index[] noIndexes = Indexes.EMPTY_INDEXES;
+    SortOrder[] noSortOrders = new SortOrder[0];
+
+    tableOperations.createTable(
+        tableIdent,
+        tableColumns,
+        "Test Table 1",
+        StringIdentifier.newPropertiesWithId(
+            StringIdentifier.fromId(idGenerator.nextId()), Collections.emptyMap()),
+        noPartitioning,
+        noDistribution,
+        noSortOrders,
+        noIndexes);
+
+    // The first drop removes the table and reports success.
+    Assertions.assertTrue(tableOperations.dropTable(tableIdent));
+
+    // Once dropped, the table can no longer be loaded.
+    Assertions.assertThrows(
+        NoSuchTableException.class, () -> tableOperations.loadTable(tableIdent));
+
+    // Dropping a table that no longer exists reports false.
+    Assertions.assertFalse(tableOperations.dropTable(tableIdent));
+  }
+
+  /**
+   * Checks that purging an existing table returns true and makes it unloadable, and that purging
+   * it again returns false.
+   */
+  @Test
+  public void testCreateAndPurgeTable() {
+    NameIdentifier tableIdent =
+        NameIdentifierUtil.ofTable(METALAKE_NAME, CATALOG_NAME, SCHEMA_NAME, "table1");
+    Column[] tableColumns =
+        new Column[] {
+          createColumn("col1", Types.StringType.get(), null),
+          createColumn("col2", Types.IntegerType.get(), Literals.integerLiteral(1))
+        };
+    Transform[] noPartitioning = new Transform[0];
+    Distribution noDistribution = Distributions.NONE;
+    Index[] noIndexes = Indexes.EMPTY_INDEXES;
+    SortOrder[] noSortOrders = new SortOrder[0];
+
+    tableOperations.createTable(
+        tableIdent,
+        tableColumns,
+        "Test Table 1",
+        StringIdentifier.newPropertiesWithId(
+            StringIdentifier.fromId(idGenerator.nextId()), Collections.emptyMap()),
+        noPartitioning,
+        noDistribution,
+        noSortOrders,
+        noIndexes);
+
+    // The first purge removes the table and reports success.
+    Assertions.assertTrue(tableOperations.purgeTable(tableIdent));
+
+    // Once purged, the table can no longer be loaded.
+    Assertions.assertThrows(
+        NoSuchTableException.class, () -> tableOperations.loadTable(tableIdent));
+
+    // Purging a table that no longer exists reports false.
+    Assertions.assertFalse(tableOperations.purgeTable(tableIdent));
+  }
+
+  /**
+   * Builds a nullable, non-auto-increment {@link GenericColumn} with the comment {@code "Test
+   * column " + name}.
+   *
+   * @param name The column name.
+   * @param dataType The column data type.
+   * @param defaultValue The default value expression passed to the builder; the tests pass
+   *     {@code null} when no default is wanted.
+   * @return The built column.
+   */
+  private Column createColumn(String name, Type dataType, Expression defaultValue) {
+    String columnComment = "Test column " + name;
+    GenericColumn.Builder configured =
+        GenericColumn.builder()
+            .withName(name)
+            .withComment(columnComment)
+            .withType(dataType)
+            .withDefaultValue(defaultValue)
+            .withNullable(true)
+            .withAutoIncrement(false);
+    return configured.build();
+  }
+
+  /**
+   * Walks one table through rename, comment, property and index changes, checking both the table
+   * returned by alterTable and the table read back by loadTable, and covers the rejected changes.
+   */
+  @Test
+  public void testAlterTable() {
+    NameIdentifier originalIdent =
+        NameIdentifierUtil.ofTable(METALAKE_NAME, CATALOG_NAME, SCHEMA_NAME, "table1");
+    Column[] tableColumns =
+        new Column[] {
+          createColumn("col1", Types.StringType.get(), null),
+          createColumn("col2", Types.IntegerType.get(), Literals.integerLiteral(1))
+        };
+    Transform[] noPartitioning = new Transform[0];
+    Distribution noDistribution = Distributions.NONE;
+    Index[] noIndexes = Indexes.EMPTY_INDEXES;
+    SortOrder[] noSortOrders = new SortOrder[0];
+
+    tableOperations.createTable(
+        originalIdent,
+        tableColumns,
+        "Test Table 1",
+        StringIdentifier.newPropertiesWithId(
+            StringIdentifier.fromId(idGenerator.nextId()), Collections.emptyMap()),
+        noPartitioning,
+        noDistribution,
+        noSortOrders,
+        noIndexes);
+
+    // Rename within the same schema; every later step addresses the table by its new name.
+    String renamedName = "table1_renamed";
+    Table afterRename =
+        tableOperations.alterTable(originalIdent, TableChange.rename(renamedName));
+    Assertions.assertEquals(renamedName, afterRename.name());
+
+    NameIdentifier renamedIdent =
+        NameIdentifierUtil.ofTable(METALAKE_NAME, CATALOG_NAME, SCHEMA_NAME, renamedName);
+    Assertions.assertEquals(renamedName, tableOperations.loadTable(renamedIdent).name());
+
+    // Renaming into another schema is rejected.
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            tableOperations.alterTable(
+                renamedIdent, TableChange.rename("table1_moved", "schema2")));
+
+    // Update the comment.
+    String updatedComment = "Updated Test Table 1 Comment";
+    Table afterComment =
+        tableOperations.alterTable(renamedIdent, TableChange.updateComment(updatedComment));
+    Assertions.assertEquals(updatedComment, afterComment.comment());
+    Assertions.assertEquals(updatedComment, tableOperations.loadTable(renamedIdent).comment());
+
+    // Set a new property.
+    String key = "property1";
+    String value = "value1";
+    Table afterSetProperty =
+        tableOperations.alterTable(renamedIdent, TableChange.setProperty(key, value));
+    Assertions.assertEquals(value, afterSetProperty.properties().get(key));
+    Assertions.assertEquals(
+        value, tableOperations.loadTable(renamedIdent).properties().get(key));
+
+    // Remove the property again.
+    Table afterRemoveProperty =
+        tableOperations.alterTable(renamedIdent, TableChange.removeProperty(key));
+    Assertions.assertNull(afterRemoveProperty.properties().get(key));
+    Assertions.assertNull(tableOperations.loadTable(renamedIdent).properties().get(key));
+
+    // Removing a property that is not set leaves the properties as they were.
+    Table afterRemoveMissingProperty =
+        tableOperations.alterTable(
+            renamedIdent, TableChange.removeProperty("non_existing_property"));
+    Assertions.assertEquals(
+        afterRemoveProperty.properties(), afterRemoveMissingProperty.properties());
+
+    // Add an index.
+    Table afterAddIndex =
+        tableOperations.alterTable(
+            renamedIdent,
+            TableChange.addIndex(Index.IndexType.PRIMARY_KEY, "index1", new String[][] {{"col1"}}));
+    Index[] returnedIndexes = afterAddIndex.index();
+    Assertions.assertEquals(1, returnedIndexes.length);
+    Assertions.assertEquals("index1", returnedIndexes[0].name());
+    Assertions.assertEquals(Index.IndexType.PRIMARY_KEY, returnedIndexes[0].type());
+
+    Index[] storedIndexes = tableOperations.loadTable(renamedIdent).index();
+    Assertions.assertEquals(1, storedIndexes.length);
+    Assertions.assertEquals("index1", storedIndexes[0].name());
+    Assertions.assertEquals(Index.IndexType.PRIMARY_KEY, storedIndexes[0].type());
+
+    // Remove the index.
+    Table afterDeleteIndex =
+        tableOperations.alterTable(renamedIdent, TableChange.deleteIndex("index1", true));
+    Assertions.assertEquals(0, afterDeleteIndex.index().length);
+    Assertions.assertEquals(0, tableOperations.loadTable(renamedIdent).index().length);
+
+    // Removing an unknown index with ifExists=true is accepted and changes nothing.
+    Table afterDeleteMissingIndex =
+        tableOperations.alterTable(
+            renamedIdent, TableChange.deleteIndex("non_existing_index", true));
+    Assertions.assertEquals(0, afterDeleteMissingIndex.index().length);
+
+    // Removing an unknown index with ifExists=false is rejected.
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            tableOperations.alterTable(
+                renamedIdent, TableChange.deleteIndex("non_existing_index", false)));
+
+    // Altering a table that does not exist is rejected.
+    NameIdentifier missingIdent =
+        NameIdentifierUtil.ofTable(METALAKE_NAME, CATALOG_NAME, SCHEMA_NAME, "non_existing_table");
+    Assertions.assertThrows(
+        NoSuchTableException.class,
+        () -> tableOperations.alterTable(missingIdent, TableChange.rename("another_name")));
+  }
+
+  @Test
+  public void testAlterAddAndDeleteColumns() {
+    NameIdentifier table1Ident =
+        NameIdentifierUtil.ofTable(METALAKE_NAME, CATALOG_NAME, SCHEMA_NAME, 
"table1");
+    Column column1 = createColumn("col1", Types.StringType.get(), null);
+    Column column2 = createColumn("col2", Types.IntegerType.get(), 
Literals.integerLiteral(1));
+    Column[] columns = new Column[] {column1, column2};
+    Transform[] partitioning = new Transform[0];
+    Distribution distribution = Distributions.NONE;
+    Index[] indexes = Indexes.EMPTY_INDEXES;
+    SortOrder[] sortOrders = new SortOrder[0];
+
+    tableOperations.createTable(
+        table1Ident,
+        columns,
+        "Test Table 1",
+        StringIdentifier.newPropertiesWithId(
+            StringIdentifier.fromId(idGenerator.nextId()), 
Collections.emptyMap()),
+        partitioning,
+        distribution,
+        sortOrders,
+        indexes);
+
+    // Test add a new column
+    Table tableWithNewColumn =
+        tableOperations.alterTable(
+            table1Ident,
+            TableChange.addColumn(
+                new String[] {"col3"},
+                Types.BooleanType.get(),
+                TableChange.ColumnPosition.defaultPos(),
+                Literals.booleanLiteral(true)));
+    Assertions.assertEquals(3, tableWithNewColumn.columns().length);
+    // col3 should be the last column
+    Assertions.assertEquals("col3", tableWithNewColumn.columns()[2].name());
+    Assertions.assertEquals(Types.BooleanType.get(), 
tableWithNewColumn.columns()[2].dataType());
+
+    Table loadedTableWithNewColumn = tableOperations.loadTable(table1Ident);
+    Assertions.assertEquals(3, loadedTableWithNewColumn.columns().length);
+    // col3 should be the last column
+    Assertions.assertEquals("col3", 
loadedTableWithNewColumn.columns()[2].name());
+    Assertions.assertEquals(
+        Types.BooleanType.get(), 
loadedTableWithNewColumn.columns()[2].dataType());
+
+    // Test add a new column at first position
+    Table tableWithNewColumnAtFirst =
+        tableOperations.alterTable(
+            table1Ident,
+            TableChange.addColumn(
+                new String[] {"col0"},
+                Types.FloatType.get(),
+                TableChange.ColumnPosition.first(),
+                null));
+    Assertions.assertEquals(4, tableWithNewColumnAtFirst.columns().length);
+    // col0 should be the first column
+    Assertions.assertEquals("col0", 
tableWithNewColumnAtFirst.columns()[0].name());
+    Assertions.assertEquals(
+        Types.FloatType.get(), 
tableWithNewColumnAtFirst.columns()[0].dataType());
+
+    Table loadedTableWithNewColumnAtFirst = 
tableOperations.loadTable(table1Ident);
+    Assertions.assertEquals(4, 
loadedTableWithNewColumnAtFirst.columns().length);
+    // col0 should be the first column
+    Assertions.assertEquals("col0", 
loadedTableWithNewColumnAtFirst.columns()[0].name());
+    Assertions.assertEquals(
+        Types.FloatType.get(), 
loadedTableWithNewColumnAtFirst.columns()[0].dataType());
+
+    // Test add a new column after col1
+    Table tableWithNewColumnAfterCol1 =
+        tableOperations.alterTable(
+            table1Ident,
+            TableChange.addColumn(
+                new String[] {"col1_5"},
+                Types.DoubleType.get(),
+                TableChange.ColumnPosition.after("col1"),
+                null));
+    Assertions.assertEquals(5, tableWithNewColumnAfterCol1.columns().length);
+    // col1_5 should be after col1
+    Assertions.assertEquals("col1_5", 
tableWithNewColumnAfterCol1.columns()[2].name());
+    Assertions.assertEquals(
+        Types.DoubleType.get(), 
tableWithNewColumnAfterCol1.columns()[2].dataType());
+
+    Table loadedTableWithNewColumnAfterCol1 = 
tableOperations.loadTable(table1Ident);
+    Assertions.assertEquals(5, 
loadedTableWithNewColumnAfterCol1.columns().length);
+    // col1_5 should be after col1
+    Assertions.assertEquals("col1_5", 
loadedTableWithNewColumnAfterCol1.columns()[2].name());
+    Assertions.assertEquals(
+        Types.DoubleType.get(), 
loadedTableWithNewColumnAfterCol1.columns()[2].dataType());
+
+    // Test add a new column after non-existing column
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            tableOperations.alterTable(
+                table1Ident,
+                TableChange.addColumn(
+                    new String[] {"colX"},
+                    Types.DoubleType.get(),
+                    TableChange.ColumnPosition.after("non_existing_column"),
+                    null)));
+
+    // Test add an existing column
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            tableOperations.alterTable(
+                table1Ident,
+                TableChange.addColumn(
+                    new String[] {"col1"},
+                    Types.DoubleType.get(),
+                    TableChange.ColumnPosition.defaultPos(),
+                    null)));
+
+    // Test delete a column
+    Table tableAfterDeleteColumn =
+        tableOperations.alterTable(
+            table1Ident, TableChange.deleteColumn(new String[] {"col2"}, 
true));
+    Assertions.assertEquals(4, tableAfterDeleteColumn.columns().length);
+    // col2 should be deleted
+    Assertions.assertFalse(
+        Arrays.stream(tableAfterDeleteColumn.columns())
+            .anyMatch(column -> column.name().equals("col2")));
+
+    Table loadedTableAfterDeleteColumn = 
tableOperations.loadTable(table1Ident);
+    Assertions.assertEquals(4, loadedTableAfterDeleteColumn.columns().length);
+    // col2 should be deleted
+    Assertions.assertFalse(
+        Arrays.stream(loadedTableAfterDeleteColumn.columns())
+            .anyMatch(column -> column.name().equals("col2")));
+
+    // Test delete a non-existing column with ifExists
+    Table tableAfterDeleteNonExistingColumn =
+        tableOperations.alterTable(
+            table1Ident, TableChange.deleteColumn(new String[] 
{"non_existing_col"}, true));
+    // The table schema should remain unchanged
+    Assertions.assertEquals(
+        tableAfterDeleteColumn.columns().length,
+        tableAfterDeleteNonExistingColumn.columns().length);
+    Assertions.assertArrayEquals(
+        tableAfterDeleteColumn.columns(), 
tableAfterDeleteNonExistingColumn.columns());
+
+    Table loadedTableAfterDeleteNonExistingColumn = 
tableOperations.loadTable(table1Ident);
+    // The table schema should remain unchanged
+    Assertions.assertEquals(
+        tableAfterDeleteColumn.columns().length,
+        loadedTableAfterDeleteNonExistingColumn.columns().length);
+    Assertions.assertArrayEquals(
+        tableAfterDeleteColumn.columns(), 
loadedTableAfterDeleteNonExistingColumn.columns());
+
+    // Test delete a non-existing column without ifExists
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            tableOperations.alterTable(
+                table1Ident, TableChange.deleteColumn(new String[] 
{"non_existing_col"}, false)));
+  }
+
+  /**
+   * Walks a single table through every in-place column change: rename, default value, data type,
+   * comment, position, nullability and auto-increment. After each step the table returned by
+   * {@code alterTable} is checked, and invalid requests are expected to be rejected with an
+   * {@link IllegalArgumentException}.
+   *
+   * <p>All steps operate on the same table and build on one another, so their order matters.
+   */
+  @Test
+  public void testUpdateTableColumn() {
+    NameIdentifier tableIdent =
+        NameIdentifierUtil.ofTable(METALAKE_NAME, CATALOG_NAME, SCHEMA_NAME, "table1");
+    Column stringColumn = createColumn("col1", Types.StringType.get(), null);
+    Column intColumn = createColumn("col2", Types.IntegerType.get(), Literals.integerLiteral(1));
+
+    // A bare two-column table: empty partitioning, no distribution, no sort order, no index.
+    tableOperations.createTable(
+        tableIdent,
+        new Column[] {stringColumn, intColumn},
+        "Test Table 1",
+        StringIdentifier.newPropertiesWithId(
+            StringIdentifier.fromId(idGenerator.nextId()), Collections.emptyMap()),
+        new Transform[0],
+        Distributions.NONE,
+        new SortOrder[0],
+        Indexes.EMPTY_INDEXES);
+
+    // Rename col1 to col1_renamed: the new name shows up and the old one is gone.
+    TableChange renameCol1 = TableChange.renameColumn(new String[] {"col1"}, "col1_renamed");
+    Table renamedTable = tableOperations.alterTable(tableIdent, renameCol1);
+    Assertions.assertEquals(2, renamedTable.columns().length);
+    Assertions.assertTrue(hasColumnNamed(renamedTable, "col1_renamed"));
+    Assertions.assertFalse(hasColumnNamed(renamedTable, "col1"));
+
+    // Renaming a column that does not exist is rejected.
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            tableOperations.alterTable(
+                tableIdent,
+                TableChange.renameColumn(new String[] {"non_existing_col"}, "new_name")));
+
+    // Renaming onto a name that is already taken is rejected.
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            tableOperations.alterTable(
+                tableIdent, TableChange.renameColumn(new String[] {"col2"}, "col1_renamed")));
+
+    // Replace the default value of col2.
+    Expression replacementDefault = Literals.integerLiteral(100);
+    TableChange setDefault =
+        TableChange.updateColumnDefaultValue(new String[] {"col2"}, replacementDefault);
+    Table defaultUpdatedTable = tableOperations.alterTable(tableIdent, setDefault);
+    Assertions.assertEquals(2, defaultUpdatedTable.columns().length);
+    Assertions.assertEquals(
+        replacementDefault, columnNamed(defaultUpdatedTable, "col2").defaultValue());
+
+    // Updating the default value of a missing column is rejected.
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            tableOperations.alterTable(
+                tableIdent,
+                TableChange.updateColumnDefaultValue(
+                    new String[] {"non_existing_col"}, Literals.stringLiteral("default"))));
+
+    // Change the data type of col2.
+    Type longType = Types.LongType.get();
+    TableChange setType = TableChange.updateColumnType(new String[] {"col2"}, longType);
+    Table typeUpdatedTable = tableOperations.alterTable(tableIdent, setType);
+    Assertions.assertEquals(2, typeUpdatedTable.columns().length);
+    Assertions.assertEquals(longType, columnNamed(typeUpdatedTable, "col2").dataType());
+
+    // Change the comment of col2.
+    String replacementComment = "Updated column comment";
+    TableChange setComment =
+        TableChange.updateColumnComment(new String[] {"col2"}, replacementComment);
+    Table commentUpdatedTable = tableOperations.alterTable(tableIdent, setComment);
+    Assertions.assertEquals(2, commentUpdatedTable.columns().length);
+    Assertions.assertEquals(
+        replacementComment, columnNamed(commentUpdatedTable, "col2").comment());
+
+    // Move col2 to the front of the column list.
+    Table movedFirstTable =
+        tableOperations.alterTable(
+            tableIdent,
+            TableChange.updateColumnPosition(
+                new String[] {"col2"}, TableChange.ColumnPosition.first()));
+    Assertions.assertEquals(2, movedFirstTable.columns().length);
+    Assertions.assertEquals("col2", movedFirstTable.columns()[0].name());
+
+    // Move col2 back so that it follows col1_renamed.
+    Table movedAfterTable =
+        tableOperations.alterTable(
+            tableIdent,
+            TableChange.updateColumnPosition(
+                new String[] {"col2"}, TableChange.ColumnPosition.after("col1_renamed")));
+    Assertions.assertEquals(2, movedAfterTable.columns().length);
+    Assertions.assertEquals("col1_renamed", movedAfterTable.columns()[0].name());
+    Assertions.assertEquals("col2", movedAfterTable.columns()[1].name());
+
+    // Positioning after a column that does not exist is rejected.
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            tableOperations.alterTable(
+                tableIdent,
+                TableChange.updateColumnPosition(
+                    new String[] {"col2"},
+                    TableChange.ColumnPosition.after("non_existing_col"))));
+
+    // The default position is not accepted for a position update.
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            tableOperations.alterTable(
+                tableIdent,
+                TableChange.updateColumnPosition(
+                    new String[] {"col2"}, TableChange.ColumnPosition.defaultPos())));
+
+    // Make col2 non-nullable.
+    TableChange setNotNull = TableChange.updateColumnNullability(new String[] {"col2"}, false);
+    Table nullabilityUpdatedTable = tableOperations.alterTable(tableIdent, setNotNull);
+    Assertions.assertEquals(2, nullabilityUpdatedTable.columns().length);
+    Assertions.assertFalse(columnNamed(nullabilityUpdatedTable, "col2").nullable());
+
+    // Turn on auto-increment for col2.
+    TableChange setAutoIncrement =
+        TableChange.updateColumnAutoIncrement(new String[] {"col2"}, true);
+    Table autoIncrementUpdatedTable = tableOperations.alterTable(tableIdent, setAutoIncrement);
+    Assertions.assertEquals(2, autoIncrementUpdatedTable.columns().length);
+    Assertions.assertTrue(columnNamed(autoIncrementUpdatedTable, "col2").autoIncrement());
+  }
+
+  /**
+   * Returns the first column of {@code table} whose name equals {@code name}; throws {@link
+   * java.util.NoSuchElementException} when there is none.
+   */
+  private static Column columnNamed(Table table, String name) {
+    return Arrays.stream(table.columns())
+        .filter(column -> column.name().equals(name))
+        .findFirst()
+        .orElseThrow();
+  }
+
+  /** Tells whether {@code table} has a column whose name equals {@code name}. */
+  private static boolean hasColumnNamed(Table table, String name) {
+    return Arrays.stream(table.columns()).anyMatch(column -> column.name().equals(name));
+  }
+
+  /**
+   * Asserts that two tables agree on name, comment, columns, partitioning, sort order,
+   * distribution and indexes, in that order. Nothing else on the tables is compared.
+   *
+   * @param expected the table holding the reference values
+   * @param actual the table under test
+   */
+  private void assertTableEquals(Table expected, Table actual) {
+    String wantName = expected.name();
+    String gotName = actual.name();
+    Assertions.assertEquals(wantName, gotName);
+
+    String wantComment = expected.comment();
+    String gotComment = actual.comment();
+    Assertions.assertEquals(wantComment, gotComment);
+
+    Column[] wantColumns = expected.columns();
+    Column[] gotColumns = actual.columns();
+    Assertions.assertArrayEquals(wantColumns, gotColumns);
+
+    Transform[] wantPartitioning = expected.partitioning();
+    Transform[] gotPartitioning = actual.partitioning();
+    Assertions.assertArrayEquals(wantPartitioning, gotPartitioning);
+
+    SortOrder[] wantSortOrder = expected.sortOrder();
+    SortOrder[] gotSortOrder = actual.sortOrder();
+    Assertions.assertArrayEquals(wantSortOrder, gotSortOrder);
+
+    Distribution wantDistribution = expected.distribution();
+    Distribution gotDistribution = actual.distribution();
+    Assertions.assertEquals(wantDistribution, gotDistribution);
+
+    Index[] wantIndexes = expected.index();
+    Index[] gotIndexes = actual.index();
+    Assertions.assertArrayEquals(wantIndexes, gotIndexes);
+  }
+}
diff --git a/core/src/test/java/org/apache/gravitino/meta/TestEntity.java 
b/core/src/test/java/org/apache/gravitino/meta/TestEntity.java
index b6653d11ab..0bb16c0494 100644
--- a/core/src/test/java/org/apache/gravitino/meta/TestEntity.java
+++ b/core/src/test/java/org/apache/gravitino/meta/TestEntity.java
@@ -160,7 +160,6 @@ public class TestEntity {
 
   @Test
   public void testTable() {
-    String format = "parquet";
     String comment = "test table comment";
     Map<String, String> tableProperties = ImmutableMap.of("tableKey1", 
"tableValue1");
     SortOrder[] sortOrders =
@@ -174,7 +173,6 @@ public class TestEntity {
             .withId(tableId)
             .withName(tableName)
             .withAuditInfo(auditInfo)
-            .withFormat(format)
             .withSortOrders(sortOrders)
             .withProperties(tableProperties)
             .withComment(comment)
@@ -186,7 +184,6 @@ public class TestEntity {
     Assertions.assertEquals(tableId, fields.get(TableEntity.ID));
     Assertions.assertEquals(tableName, fields.get(TableEntity.NAME));
     Assertions.assertEquals(auditInfo, fields.get(TableEntity.AUDIT_INFO));
-    Assertions.assertEquals(format, fields.get(TableEntity.FORMAT));
     Assertions.assertEquals(tableProperties, 
fields.get(TableEntity.PROPERTIES));
     Assertions.assertEquals(comment, fields.get(TableEntity.COMMENT));
     Assertions.assertEquals(sortOrders, fields.get(TableEntity.SORT_ORDERS));
diff --git 
a/core/src/test/java/org/apache/gravitino/storage/TestEntityStorage.java 
b/core/src/test/java/org/apache/gravitino/storage/TestEntityStorage.java
index 89f276724c..188c72f93b 100644
--- a/core/src/test/java/org/apache/gravitino/storage/TestEntityStorage.java
+++ b/core/src/test/java/org/apache/gravitino/storage/TestEntityStorage.java
@@ -2702,7 +2702,6 @@ public class TestEntityStorage {
                           .withAuditInfo(auditInfo)
                           .build()))
               .withComment("This is a lance table")
-              .withFormat("lance")
               .withProperties(ImmutableMap.of("location", "/tmp/test", 
"format", "lance"))
               .build();
       store.put(table, false);
@@ -2723,7 +2722,6 @@ public class TestEntityStorage {
               .withNamespace(table.namespace())
               .withName(table.name())
               .withAuditInfo(auditInfo)
-              .withFormat("lance")
               .withColumns(
                   Lists.newArrayList(
                       ColumnEntity.builder()
diff --git 
a/core/src/test/java/org/apache/gravitino/storage/relational/TestJDBCBackend.java
 
b/core/src/test/java/org/apache/gravitino/storage/relational/TestJDBCBackend.java
index 5cd67cc798..4df794754c 100644
--- 
a/core/src/test/java/org/apache/gravitino/storage/relational/TestJDBCBackend.java
+++ 
b/core/src/test/java/org/apache/gravitino/storage/relational/TestJDBCBackend.java
@@ -1245,15 +1245,14 @@ public class TestJDBCBackend {
             .withNamespace(NamespaceUtil.ofTable(metalakeName, catalog.name(), 
schema.name()))
             .withName("table")
             .withAuditInfo(auditInfo)
-            .withFormat("lance")
             .withComment(null)
-            .withProperties(ImmutableMap.of("format", "LANCE", "location", 
"/tmp/test/lance"))
+            .withProperties(ImmutableMap.of("format", "lance", "location", 
"/tmp/test/lance"))
             .build();
 
     backend.insert(table, false);
 
     TableEntity fetchedTable = backend.get(table.nameIdentifier(), 
Entity.EntityType.TABLE);
-    Assertions.assertEquals("LANCE", fetchedTable.properties().get("format"));
+    Assertions.assertEquals("lance", fetchedTable.properties().get("format"));
 
     TableEntity updatedTable =
         TableEntity.builder()

Reply via email to