This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch branch-0.7
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/branch-0.7 by this push:
     new b104dd760 [#5227] feat(oceanbase-catalog): Support table operations 
for OceanBase JDBC catalog (#5265)
b104dd760 is described below

commit b104dd760cbb9109973126544863f7f05035930d
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Oct 25 16:37:57 2024 +0800

    [#5227] feat(oceanbase-catalog): Support table operations for OceanBase 
JDBC catalog (#5265)
    
    ### What changes were proposed in this pull request?
    
    Support table operations for OceanBase JDBC catalog.
    
    ### Why are the changes needed?
    
    Fix: #5227
    
    ### Does this PR introduce _any_ user-facing change?
    no
    
    ### How was this patch tested?
    
    Added unit test: `TestOceanBaseTableOperations`
    
    Co-authored-by: yuanoOo <[email protected]>
---
 .../jdbc/operation/JdbcTableOperations.java        | 103 +++-
 .../doris/operation/DorisTableOperations.java      |  32 --
 .../mysql/operation/MysqlTableOperations.java      |  92 +---
 .../mysql/integration/test/CatalogMysqlIT.java     |   4 +-
 .../mysql/operation/TestMysqlTableOperations.java  |   4 +-
 .../operation/OceanBaseTableOperations.java        | 600 ++++++++++++++++++++-
 .../operation/TestOceanBaseTableOperations.java}   | 214 +++-----
 .../operation/PostgreSqlTableOperations.java       |  31 +-
 8 files changed, 768 insertions(+), 312 deletions(-)

diff --git 
a/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/operation/JdbcTableOperations.java
 
b/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/operation/JdbcTableOperations.java
index e9b6bf6ab..e9cd14cf3 100644
--- 
a/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/operation/JdbcTableOperations.java
+++ 
b/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/operation/JdbcTableOperations.java
@@ -18,6 +18,7 @@
  */
 package org.apache.gravitino.catalog.jdbc.operation;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
@@ -30,6 +31,7 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 import javax.sql.DataSource;
@@ -46,6 +48,7 @@ import org.apache.gravitino.exceptions.NoSuchSchemaException;
 import org.apache.gravitino.exceptions.NoSuchTableException;
 import org.apache.gravitino.exceptions.TableAlreadyExistsException;
 import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.rel.Column;
 import org.apache.gravitino.rel.TableChange;
 import org.apache.gravitino.rel.expressions.Expression;
 import org.apache.gravitino.rel.expressions.distributions.Distribution;
@@ -125,15 +128,20 @@ public abstract class JdbcTableOperations implements 
TableOperation {
     return true;
   }
 
+  /**
+   * The default implementation of this method is based on MySQL, and if the 
catalog does not
+   * compatible with MySQL, this method needs to be rewritten.
+   */
   @Override
   public List<String> listTables(String databaseName) throws 
NoSuchSchemaException {
-    try (Connection connection = getConnection(databaseName)) {
-      final List<String> names = Lists.newArrayList();
-      try (ResultSet tables = getTables(connection)) {
-        while (tables.next()) {
-          if (Objects.equals(tables.getString("TABLE_SCHEM"), databaseName)) {
-            names.add(tables.getString("TABLE_NAME"));
-          }
+
+    final List<String> names = Lists.newArrayList();
+
+    try (Connection connection = getConnection(databaseName);
+        ResultSet tables = getTables(connection)) {
+      while (tables.next()) {
+        if (Objects.equals(tables.getString("TABLE_CAT"), databaseName)) {
+          names.add(tables.getString("TABLE_NAME"));
         }
       }
       LOG.info("Finished listing tables size {} for database name {} ", 
names.size(), databaseName);
@@ -454,17 +462,35 @@ public abstract class JdbcTableOperations implements 
TableOperation {
       Distribution distribution,
       Index[] indexes);
 
-  protected abstract String generateRenameTableSql(String oldTableName, String 
newTableName);
+  /**
+   * The default implementation of this method is based on MySQL syntax, and 
if the catalog does not
+   * support MySQL syntax, this method needs to be rewritten.
+   */
+  protected String generateRenameTableSql(String oldTableName, String 
newTableName) {
+    return String.format("RENAME TABLE `%s` TO `%s`", oldTableName, 
newTableName);
+  }
 
-  protected abstract String generateDropTableSql(String tableName);
+  /**
+   * The default implementation of this method is based on MySQL syntax, and 
if the catalog does not
+   * support MySQL syntax, this method needs to be rewritten.
+   */
+  protected String generateDropTableSql(String tableName) {
+    return String.format("DROP TABLE `%s`", tableName);
+  }
 
   protected abstract String generatePurgeTableSql(String tableName);
 
   protected abstract String generateAlterTableSql(
       String databaseName, String tableName, TableChange... changes);
 
-  protected abstract JdbcTable getOrCreateTable(
-      String databaseName, String tableName, JdbcTable lazyLoadCreateTable);
+  /**
+   * The default implementation of this method is based on MySQL syntax, and 
if the catalog does not
+   * support MySQL syntax, this method needs to be rewritten.
+   */
+  protected JdbcTable getOrCreateTable(
+      String databaseName, String tableName, JdbcTable lazyLoadCreateTable) {
+    return null != lazyLoadCreateTable ? lazyLoadCreateTable : 
load(databaseName, tableName);
+  }
 
   protected void validateUpdateColumnNullable(
       TableChange.UpdateColumnNullability change, JdbcTable table) {
@@ -479,6 +505,61 @@ public abstract class JdbcTableOperations implements 
TableOperation {
     }
   }
 
+  /**
+   * The auto-increment column will be verified. There can only be one 
auto-increment column and it
+   * must be the primary key or unique index.
+   *
+   * @param columns jdbc column
+   * @param indexes table indexes
+   */
+  protected static void validateIncrementCol(JdbcColumn[] columns, Index[] 
indexes) {
+    // Check auto increment column
+    List<JdbcColumn> autoIncrementCols =
+        
Arrays.stream(columns).filter(Column::autoIncrement).collect(Collectors.toList());
+    String autoIncrementColsStr =
+        
autoIncrementCols.stream().map(JdbcColumn::name).collect(Collectors.joining(",",
 "[", "]"));
+    Preconditions.checkArgument(
+        autoIncrementCols.size() <= 1,
+        "Only one column can be auto-incremented. There are multiple 
auto-increment columns in your table: "
+            + autoIncrementColsStr);
+    if (!autoIncrementCols.isEmpty()) {
+      Optional<Index> existAutoIncrementColIndexOptional =
+          Arrays.stream(indexes)
+              .filter(
+                  index ->
+                      Arrays.stream(index.fieldNames())
+                          .flatMap(Arrays::stream)
+                          .anyMatch(
+                              s ->
+                                  
StringUtils.equalsIgnoreCase(autoIncrementCols.get(0).name(), s)))
+              .filter(
+                  index ->
+                      index.type() == Index.IndexType.PRIMARY_KEY
+                          || index.type() == Index.IndexType.UNIQUE_KEY)
+              .findAny();
+      Preconditions.checkArgument(
+          existAutoIncrementColIndexOptional.isPresent(),
+          "Incorrect table definition; there can be only one auto column and 
it must be defined as a key");
+    }
+  }
+
+  /**
+   * The default implementation of this method is based on MySQL syntax, and 
if the catalog does not
+   * support MySQL syntax, this method needs to be rewritten.
+   */
+  protected static String getIndexFieldStr(String[][] fieldNames) {
+    return Arrays.stream(fieldNames)
+        .map(
+            colNames -> {
+              if (colNames.length > 1) {
+                throw new IllegalArgumentException(
+                    "Index does not support complex fields in this Catalog");
+              }
+              return String.format("`%s`", colNames[0]);
+            })
+        .collect(Collectors.joining(", "));
+  }
+
   protected JdbcColumn getJdbcColumnFromTable(JdbcTable jdbcTable, String 
colName) {
     return (JdbcColumn)
         Arrays.stream(jdbcTable.columns())
diff --git 
a/catalogs/catalog-jdbc-doris/src/main/java/org/apache/gravitino/catalog/doris/operation/DorisTableOperations.java
 
b/catalogs/catalog-jdbc-doris/src/main/java/org/apache/gravitino/catalog/doris/operation/DorisTableOperations.java
index ebd7027b1..aa6348e2f 100644
--- 
a/catalogs/catalog-jdbc-doris/src/main/java/org/apache/gravitino/catalog/doris/operation/DorisTableOperations.java
+++ 
b/catalogs/catalog-jdbc-doris/src/main/java/org/apache/gravitino/catalog/doris/operation/DorisTableOperations.java
@@ -26,7 +26,6 @@ import static 
org.apache.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -38,7 +37,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -53,7 +51,6 @@ import org.apache.gravitino.catalog.jdbc.JdbcTable;
 import org.apache.gravitino.catalog.jdbc.operation.JdbcTableOperations;
 import 
org.apache.gravitino.catalog.jdbc.operation.JdbcTablePartitionOperations;
 import org.apache.gravitino.exceptions.NoSuchColumnException;
-import org.apache.gravitino.exceptions.NoSuchSchemaException;
 import org.apache.gravitino.exceptions.NoSuchTableException;
 import org.apache.gravitino.rel.Column;
 import org.apache.gravitino.rel.TableChange;
@@ -73,24 +70,6 @@ public class DorisTableOperations extends 
JdbcTableOperations {
   private static final String DORIS_AUTO_INCREMENT = "AUTO_INCREMENT";
   private static final String NEW_LINE = "\n";
 
-  @Override
-  public List<String> listTables(String databaseName) throws 
NoSuchSchemaException {
-    final List<String> names = Lists.newArrayList();
-
-    try (Connection connection = getConnection(databaseName);
-        ResultSet tables = getTables(connection)) {
-      while (tables.next()) {
-        if (Objects.equals(tables.getString("TABLE_CAT"), databaseName)) {
-          names.add(tables.getString("TABLE_NAME"));
-        }
-      }
-      LOG.info("Finished listing tables size {} for database name {} ", 
names.size(), databaseName);
-      return names;
-    } catch (final SQLException se) {
-      throw this.exceptionMapper.toGravitinoException(se);
-    }
-  }
-
   @Override
   public JdbcTablePartitionOperations 
createJdbcTablePartitionOperations(JdbcTable loadedTable) {
     return new DorisTablePartitionOperations(
@@ -497,11 +476,6 @@ public class DorisTableOperations extends 
JdbcTableOperations {
     return String.format("ALTER TABLE `%s` RENAME `%s`", oldTableName, 
newTableName);
   }
 
-  @Override
-  protected String generateDropTableSql(String tableName) {
-    return String.format("DROP TABLE `%s`", tableName);
-  }
-
   @Override
   protected String generatePurgeTableSql(String tableName) {
     throw new UnsupportedOperationException(
@@ -635,12 +609,6 @@ public class DorisTableOperations extends 
JdbcTableOperations {
         .collect(Collectors.joining(",\n"));
   }
 
-  @Override
-  protected JdbcTable getOrCreateTable(
-      String databaseName, String tableName, JdbcTable lazyLoadCreateTable) {
-    return null != lazyLoadCreateTable ? lazyLoadCreateTable : 
load(databaseName, tableName);
-  }
-
   private String updateColumnCommentFieldDefinition(
       TableChange.UpdateColumnComment updateColumnComment) {
     String newComment = updateColumnComment.getNewComment();
diff --git 
a/catalogs/catalog-jdbc-mysql/src/main/java/org/apache/gravitino/catalog/mysql/operation/MysqlTableOperations.java
 
b/catalogs/catalog-jdbc-mysql/src/main/java/org/apache/gravitino/catalog/mysql/operation/MysqlTableOperations.java
index 8aa1534c9..b8cc2f872 100644
--- 
a/catalogs/catalog-jdbc-mysql/src/main/java/org/apache/gravitino/catalog/mysql/operation/MysqlTableOperations.java
+++ 
b/catalogs/catalog-jdbc-mysql/src/main/java/org/apache/gravitino/catalog/mysql/operation/MysqlTableOperations.java
@@ -24,7 +24,6 @@ import static 
org.apache.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -36,7 +35,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.stream.Collectors;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.collections4.MapUtils;
@@ -48,7 +46,6 @@ import org.apache.gravitino.catalog.jdbc.JdbcColumn;
 import org.apache.gravitino.catalog.jdbc.JdbcTable;
 import org.apache.gravitino.catalog.jdbc.operation.JdbcTableOperations;
 import org.apache.gravitino.exceptions.NoSuchColumnException;
-import org.apache.gravitino.exceptions.NoSuchSchemaException;
 import org.apache.gravitino.exceptions.NoSuchTableException;
 import org.apache.gravitino.rel.Column;
 import org.apache.gravitino.rel.TableChange;
@@ -62,29 +59,11 @@ import org.apache.gravitino.rel.types.Types;
 /** Table operations for MySQL. */
 public class MysqlTableOperations extends JdbcTableOperations {
 
-  public static final String BACK_QUOTE = "`";
-  public static final String MYSQL_AUTO_INCREMENT = "AUTO_INCREMENT";
+  private static final String BACK_QUOTE = "`";
+  private static final String MYSQL_AUTO_INCREMENT = "AUTO_INCREMENT";
   private static final String MYSQL_NOT_SUPPORT_NESTED_COLUMN_MSG =
       "Mysql does not support nested column names.";
 
-  @Override
-  public List<String> listTables(String databaseName) throws 
NoSuchSchemaException {
-    final List<String> names = Lists.newArrayList();
-
-    try (Connection connection = getConnection(databaseName);
-        ResultSet tables = getTables(connection)) {
-      while (tables.next()) {
-        if (Objects.equals(tables.getString("TABLE_CAT"), databaseName)) {
-          names.add(tables.getString("TABLE_NAME"));
-        }
-      }
-      LOG.info("Finished listing tables size {} for database name {} ", 
names.size(), databaseName);
-      return names;
-    } catch (final SQLException se) {
-      throw this.exceptionMapper.toGravitinoException(se);
-    }
-  }
-
   @Override
   protected String generateCreateTableSql(
       String tableName,
@@ -151,44 +130,6 @@ public class MysqlTableOperations extends 
JdbcTableOperations {
     return result;
   }
 
-  /**
-   * The auto-increment column will be verified. There can only be one 
auto-increment column and it
-   * must be the primary key or unique index.
-   *
-   * @param columns jdbc column
-   * @param indexes table indexes
-   */
-  private static void validateIncrementCol(JdbcColumn[] columns, Index[] 
indexes) {
-    // Check auto increment column
-    List<JdbcColumn> autoIncrementCols =
-        
Arrays.stream(columns).filter(Column::autoIncrement).collect(Collectors.toList());
-    String autoIncrementColsStr =
-        
autoIncrementCols.stream().map(JdbcColumn::name).collect(Collectors.joining(",",
 "[", "]"));
-    Preconditions.checkArgument(
-        autoIncrementCols.size() <= 1,
-        "Only one column can be auto-incremented. There are multiple 
auto-increment columns in your table: "
-            + autoIncrementColsStr);
-    if (!autoIncrementCols.isEmpty()) {
-      Optional<Index> existAutoIncrementColIndexOptional =
-          Arrays.stream(indexes)
-              .filter(
-                  index ->
-                      Arrays.stream(index.fieldNames())
-                          .flatMap(Arrays::stream)
-                          .anyMatch(
-                              s ->
-                                  
StringUtils.equalsIgnoreCase(autoIncrementCols.get(0).name(), s)))
-              .filter(
-                  index ->
-                      index.type() == Index.IndexType.PRIMARY_KEY
-                          || index.type() == Index.IndexType.UNIQUE_KEY)
-              .findAny();
-      Preconditions.checkArgument(
-          existAutoIncrementColIndexOptional.isPresent(),
-          "Incorrect table definition; there can be only one auto column and 
it must be defined as a key");
-    }
-  }
-
   public static void appendIndexesSql(Index[] indexes, StringBuilder 
sqlBuilder) {
     for (Index index : indexes) {
       String fieldStr = getIndexFieldStr(index.fieldNames());
@@ -215,19 +156,6 @@ public class MysqlTableOperations extends 
JdbcTableOperations {
     }
   }
 
-  private static String getIndexFieldStr(String[][] fieldNames) {
-    return Arrays.stream(fieldNames)
-        .map(
-            colNames -> {
-              if (colNames.length > 1) {
-                throw new IllegalArgumentException(
-                    "Index does not support complex fields in MySQL");
-              }
-              return BACK_QUOTE + colNames[0] + BACK_QUOTE;
-            })
-        .collect(Collectors.joining(", "));
-  }
-
   @Override
   protected boolean getAutoIncrementInfo(ResultSet resultSet) throws 
SQLException {
     return "YES".equalsIgnoreCase(resultSet.getString("IS_AUTOINCREMENT"));
@@ -276,16 +204,6 @@ public class MysqlTableOperations extends 
JdbcTableOperations {
     }
   }
 
-  @Override
-  protected String generateRenameTableSql(String oldTableName, String 
newTableName) {
-    return String.format("RENAME TABLE `%s` TO `%s`", oldTableName, 
newTableName);
-  }
-
-  @Override
-  protected String generateDropTableSql(String tableName) {
-    return "DROP TABLE " + BACK_QUOTE + tableName + BACK_QUOTE;
-  }
-
   @Override
   protected String generatePurgeTableSql(String tableName) {
     throw new UnsupportedOperationException(
@@ -492,12 +410,6 @@ public class MysqlTableOperations extends 
JdbcTableOperations {
         .collect(Collectors.joining(",\n"));
   }
 
-  @Override
-  protected JdbcTable getOrCreateTable(
-      String databaseName, String tableName, JdbcTable lazyLoadCreateTable) {
-    return null != lazyLoadCreateTable ? lazyLoadCreateTable : 
load(databaseName, tableName);
-  }
-
   private String updateColumnCommentFieldDefinition(
       TableChange.UpdateColumnComment updateColumnComment, JdbcTable 
jdbcTable) {
     String newComment = updateColumnComment.getNewComment();
diff --git 
a/catalogs/catalog-jdbc-mysql/src/test/java/org/apache/gravitino/catalog/mysql/integration/test/CatalogMysqlIT.java
 
b/catalogs/catalog-jdbc-mysql/src/test/java/org/apache/gravitino/catalog/mysql/integration/test/CatalogMysqlIT.java
index f6b91b00e..9d1a80b49 100644
--- 
a/catalogs/catalog-jdbc-mysql/src/test/java/org/apache/gravitino/catalog/mysql/integration/test/CatalogMysqlIT.java
+++ 
b/catalogs/catalog-jdbc-mysql/src/test/java/org/apache/gravitino/catalog/mysql/integration/test/CatalogMysqlIT.java
@@ -934,7 +934,7 @@ public class CatalogMysqlIT extends BaseIT {
     Assertions.assertTrue(
         StringUtils.contains(
             illegalArgumentException.getMessage(),
-            "Index does not support complex fields in MySQL"));
+            "Index does not support complex fields in this Catalog"));
 
     Index[] indexes3 = new Index[] {Indexes.unique("u1_key", new String[][] 
{{"col_2", "col_3"}})};
     illegalArgumentException =
@@ -954,7 +954,7 @@ public class CatalogMysqlIT extends BaseIT {
     Assertions.assertTrue(
         StringUtils.contains(
             illegalArgumentException.getMessage(),
-            "Index does not support complex fields in MySQL"));
+            "Index does not support complex fields in this Catalog"));
 
     NameIdentifier tableIdent = NameIdentifier.of(schemaName, "test_null_key");
     tableCatalog.createTable(
diff --git 
a/catalogs/catalog-jdbc-mysql/src/test/java/org/apache/gravitino/catalog/mysql/operation/TestMysqlTableOperations.java
 
b/catalogs/catalog-jdbc-mysql/src/test/java/org/apache/gravitino/catalog/mysql/operation/TestMysqlTableOperations.java
index 93783da3b..ce1343dd5 100644
--- 
a/catalogs/catalog-jdbc-mysql/src/test/java/org/apache/gravitino/catalog/mysql/operation/TestMysqlTableOperations.java
+++ 
b/catalogs/catalog-jdbc-mysql/src/test/java/org/apache/gravitino/catalog/mysql/operation/TestMysqlTableOperations.java
@@ -51,8 +51,8 @@ import org.junit.jupiter.api.Test;
 
 @Tag("gravitino-docker-test")
 public class TestMysqlTableOperations extends TestMysql {
-  private static Type VARCHAR = Types.VarCharType.of(255);
-  private static Type INT = Types.IntegerType.get();
+  private static final Type VARCHAR = Types.VarCharType.of(255);
+  private static final Type INT = Types.IntegerType.get();
 
   @Test
   public void testOperationTable() {
diff --git 
a/catalogs/catalog-jdbc-oceanbase/src/main/java/org/apache/gravitino/catalog/oceanbase/operation/OceanBaseTableOperations.java
 
b/catalogs/catalog-jdbc-oceanbase/src/main/java/org/apache/gravitino/catalog/oceanbase/operation/OceanBaseTableOperations.java
index b1744a0c6..77c972909 100644
--- 
a/catalogs/catalog-jdbc-oceanbase/src/main/java/org/apache/gravitino/catalog/oceanbase/operation/OceanBaseTableOperations.java
+++ 
b/catalogs/catalog-jdbc-oceanbase/src/main/java/org/apache/gravitino/catalog/oceanbase/operation/OceanBaseTableOperations.java
@@ -18,35 +18,50 @@
  */
 package org.apache.gravitino.catalog.oceanbase.operation;
 
+import static org.apache.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import java.sql.Connection;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.BooleanUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.StringIdentifier;
 import org.apache.gravitino.catalog.jdbc.JdbcColumn;
 import org.apache.gravitino.catalog.jdbc.JdbcTable;
 import org.apache.gravitino.catalog.jdbc.operation.JdbcTableOperations;
-import org.apache.gravitino.exceptions.GravitinoRuntimeException;
-import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.catalog.jdbc.utils.JdbcConnectorUtils;
+import org.apache.gravitino.exceptions.NoSuchColumnException;
 import org.apache.gravitino.exceptions.NoSuchTableException;
+import org.apache.gravitino.rel.Column;
 import org.apache.gravitino.rel.TableChange;
 import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
 import org.apache.gravitino.rel.expressions.transforms.Transform;
 import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.rel.indexes.Indexes;
+import org.apache.gravitino.rel.types.Types;
 
 /** Table operations for OceanBase. */
 public class OceanBaseTableOperations extends JdbcTableOperations {
 
-  @Override
-  public List<String> listTables(String databaseName) throws 
NoSuchSchemaException {
-    throw new GravitinoRuntimeException("Not implemented yet.");
-  }
-
-  @Override
-  public JdbcTable load(String databaseName, String tableName) throws 
NoSuchTableException {
-    return super.load(databaseName, tableName.toLowerCase());
-  }
+  private static final String BACK_QUOTE = "`";
+  private static final String OCEANBASE_AUTO_INCREMENT = "AUTO_INCREMENT";
+  private static final String OCEANBASE_NOT_SUPPORT_NESTED_COLUMN_MSG =
+      "OceanBase does not support nested column names.";
 
   @Override
   protected String generateCreateTableSql(
@@ -58,10 +73,83 @@ public class OceanBaseTableOperations extends 
JdbcTableOperations {
       Distribution distribution,
       Index[] indexes) {
     if (ArrayUtils.isNotEmpty(partitioning)) {
-      throw new UnsupportedOperationException("Currently not support Partition 
tables.");
+      throw new UnsupportedOperationException(
+          "Currently we do not support Partitioning in oceanbase");
+    }
+
+    if (!Distributions.NONE.equals(distribution)) {
+      throw new UnsupportedOperationException("OceanBase does not support 
distribution");
+    }
+
+    validateIncrementCol(columns, indexes);
+    StringBuilder sqlBuilder = new StringBuilder();
+    sqlBuilder.append(String.format("CREATE TABLE `%s` (\n", tableName));
+
+    // Add columns
+    for (int i = 0; i < columns.length; i++) {
+      JdbcColumn column = columns[i];
+      sqlBuilder
+          .append(SPACE)
+          .append(SPACE)
+          .append(BACK_QUOTE)
+          .append(column.name())
+          .append(BACK_QUOTE);
+
+      appendColumnDefinition(column, sqlBuilder);
+      // Add a comma for the next column, unless it's the last one
+      if (i < columns.length - 1) {
+        sqlBuilder.append(",\n");
+      }
     }
 
-    throw new UnsupportedOperationException("Not implemented yet.");
+    appendIndexesSql(indexes, sqlBuilder);
+
+    sqlBuilder.append("\n)");
+
+    // Add table comment if specified
+    if (StringUtils.isNotEmpty(comment)) {
+      sqlBuilder.append(" COMMENT='").append(comment).append("'");
+    }
+
+    // Add table properties
+    if (MapUtils.isNotEmpty(properties)) {
+      sqlBuilder.append(
+          properties.entrySet().stream()
+              .map(entry -> String.format("%s = %s", entry.getKey(), 
entry.getValue()))
+              .collect(Collectors.joining(",\n", "\n", "")));
+    }
+
+    // Return the generated SQL statement
+    String result = sqlBuilder.append(";").toString();
+
+    LOG.info("Generated create table:{} sql: {}", tableName, result);
+    return result;
+  }
+
+  public static void appendIndexesSql(Index[] indexes, StringBuilder 
sqlBuilder) {
+    for (Index index : indexes) {
+      String fieldStr = getIndexFieldStr(index.fieldNames());
+      sqlBuilder.append(",\n");
+      switch (index.type()) {
+        case PRIMARY_KEY:
+          if (null != index.name()
+              && !StringUtils.equalsIgnoreCase(
+                  index.name(), Indexes.DEFAULT_MYSQL_PRIMARY_KEY_NAME)) {
+            throw new IllegalArgumentException("Primary key name must be 
PRIMARY in OceanBase");
+          }
+          sqlBuilder.append("CONSTRAINT ").append("PRIMARY KEY 
(").append(fieldStr).append(")");
+          break;
+        case UNIQUE_KEY:
+          sqlBuilder.append("CONSTRAINT ");
+          if (null != index.name()) {
+            
sqlBuilder.append(BACK_QUOTE).append(index.name()).append(BACK_QUOTE);
+          }
+          sqlBuilder.append(" UNIQUE (").append(fieldStr).append(")");
+          break;
+        default:
+          throw new IllegalArgumentException("OceanBase doesn't support index 
: " + index.type());
+      }
+    }
   }
 
   @Override
@@ -72,17 +160,38 @@ public class OceanBaseTableOperations extends 
JdbcTableOperations {
   @Override
   protected Map<String, String> getTableProperties(Connection connection, 
String tableName)
       throws SQLException {
-    throw new UnsupportedOperationException("Not implemented yet.");
-  }
+    try (PreparedStatement statement = connection.prepareStatement("SHOW TABLE 
STATUS LIKE ?")) {
+      statement.setString(1, tableName);
+      try (ResultSet resultSet = statement.executeQuery()) {
+        while (resultSet.next()) {
+          String name = resultSet.getString("NAME");
+          if (Objects.equals(name, tableName)) {
+            return Collections.unmodifiableMap(
+                new HashMap<String, String>() {
+                  {
+                    put(COMMENT, resultSet.getString(COMMENT));
+                    String autoIncrement = 
resultSet.getString("AUTO_INCREMENT");
+                    if (StringUtils.isNotEmpty(autoIncrement)) {
+                      put("AUTO_INCREMENT", autoIncrement);
+                    }
+                  }
+                });
+          }
+        }
 
-  @Override
-  protected String generateRenameTableSql(String oldTableName, String 
newTableName) {
-    return String.format("RENAME TABLE `%s` TO `%s`", oldTableName, 
newTableName);
+        throw new NoSuchTableException(
+            "Table %s does not exist in %s.", tableName, 
connection.getCatalog());
+      }
+    }
   }
 
-  @Override
-  protected String generateDropTableSql(String tableName) {
-    return String.format("DROP TABLE `%s`", tableName);
+  protected void correctJdbcTableFields(
+      Connection connection, String databaseName, String tableName, 
JdbcTable.Builder tableBuilder)
+      throws SQLException {
+    if (StringUtils.isEmpty(tableBuilder.comment())) {
+      tableBuilder.withComment(
+          tableBuilder.properties().getOrDefault(COMMENT, 
tableBuilder.comment()));
+    }
   }
 
   @Override
@@ -90,21 +199,458 @@ public class OceanBaseTableOperations extends 
JdbcTableOperations {
     return String.format("TRUNCATE TABLE `%s`", tableName);
   }
 
+  /**
+   * OceanBase does not support some multiple changes in one statement, So 
rewrite this method, one
+   * by one to apply TableChange to the table.
+   *
+   * @param databaseName The name of the database.
+   * @param tableName The name of the table.
+   * @param changes The changes to apply to the table.
+   */
   @Override
   public void alterTable(String databaseName, String tableName, TableChange... 
changes)
       throws NoSuchTableException {
-    throw new UnsupportedOperationException("Not implemented yet.");
+    LOG.info("Attempting to alter table {} from database {}", tableName, 
databaseName);
+    try (Connection connection = getConnection(databaseName)) {
+      for (TableChange change : changes) {
+        String sql = generateAlterTableSql(databaseName, tableName, change);
+        if (StringUtils.isEmpty(sql)) {
+          LOG.info("No changes to alter table {} from database {}", tableName, 
databaseName);
+          return;
+        }
+        JdbcConnectorUtils.executeUpdate(connection, sql);
+      }
+      LOG.info("Alter table {} from database {}", tableName, databaseName);
+    } catch (final SQLException se) {
+      throw this.exceptionMapper.toGravitinoException(se);
+    }
   }
 
   @Override
   protected String generateAlterTableSql(
       String databaseName, String tableName, TableChange... changes) {
-    throw new UnsupportedOperationException("Not implemented yet.");
+    // Not all operations require the original table information, so lazy 
loading is used here
+    JdbcTable lazyLoadTable = null;
+    TableChange.UpdateComment updateComment = null;
+    List<TableChange.SetProperty> setProperties = new ArrayList<>();
+    List<String> alterSql = new ArrayList<>();
+    for (TableChange change : changes) {
+      if (change instanceof TableChange.UpdateComment) {
+        updateComment = (TableChange.UpdateComment) change;
+      } else if (change instanceof TableChange.SetProperty) {
+        // The set attribute needs to be added at the end.
+        setProperties.add(((TableChange.SetProperty) change));
+      } else if (change instanceof TableChange.RemoveProperty) {
+        // OceanBase does not support deleting table attributes, it can be 
replaced by Set Property
+        throw new IllegalArgumentException("Remove property is not supported 
yet");
+      } else if (change instanceof TableChange.AddColumn) {
+        TableChange.AddColumn addColumn = (TableChange.AddColumn) change;
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(addColumnFieldDefinition(addColumn));
+      } else if (change instanceof TableChange.RenameColumn) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        TableChange.RenameColumn renameColumn = (TableChange.RenameColumn) 
change;
+        alterSql.add(renameColumnFieldDefinition(renameColumn, lazyLoadTable));
+      } else if (change instanceof TableChange.UpdateColumnDefaultValue) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        TableChange.UpdateColumnDefaultValue updateColumnDefaultValue =
+            (TableChange.UpdateColumnDefaultValue) change;
+        alterSql.add(
+            updateColumnDefaultValueFieldDefinition(updateColumnDefaultValue, 
lazyLoadTable));
+      } else if (change instanceof TableChange.UpdateColumnType) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        TableChange.UpdateColumnType updateColumnType = 
(TableChange.UpdateColumnType) change;
+        alterSql.add(updateColumnTypeFieldDefinition(updateColumnType, 
lazyLoadTable));
+      } else if (change instanceof TableChange.UpdateColumnComment) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        TableChange.UpdateColumnComment updateColumnComment =
+            (TableChange.UpdateColumnComment) change;
+        alterSql.add(updateColumnCommentFieldDefinition(updateColumnComment, 
lazyLoadTable));
+      } else if (change instanceof TableChange.UpdateColumnPosition) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        TableChange.UpdateColumnPosition updateColumnPosition =
+            (TableChange.UpdateColumnPosition) change;
+        alterSql.add(updateColumnPositionFieldDefinition(updateColumnPosition, 
lazyLoadTable));
+      } else if (change instanceof TableChange.DeleteColumn) {
+        TableChange.DeleteColumn deleteColumn = (TableChange.DeleteColumn) 
change;
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        String deleteColSql = deleteColumnFieldDefinition(deleteColumn, 
lazyLoadTable);
+        if (StringUtils.isNotEmpty(deleteColSql)) {
+          alterSql.add(deleteColSql);
+        }
+      } else if (change instanceof TableChange.UpdateColumnNullability) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(
+            updateColumnNullabilityDefinition(
+                (TableChange.UpdateColumnNullability) change, lazyLoadTable));
+      } else if (change instanceof TableChange.AddIndex) {
+        alterSql.add(addIndexDefinition((TableChange.AddIndex) change));
+      } else if (change instanceof TableChange.DeleteIndex) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(deleteIndexDefinition(lazyLoadTable, 
(TableChange.DeleteIndex) change));
+      } else if (change instanceof TableChange.UpdateColumnAutoIncrement) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(
+            updateColumnAutoIncrementDefinition(
+                lazyLoadTable, (TableChange.UpdateColumnAutoIncrement) 
change));
+      } else {
+        throw new IllegalArgumentException(
+            "Unsupported table change type: " + change.getClass().getName());
+      }
+    }
+    if (!setProperties.isEmpty()) {
+      alterSql.add(generateTableProperties(setProperties));
+    }
+
+    // Last modified comment
+    if (null != updateComment) {
+      String newComment = updateComment.getNewComment();
+      if (null == StringIdentifier.fromComment(newComment)) {
+        // Detect and add Gravitino id.
+        JdbcTable jdbcTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        StringIdentifier identifier = 
StringIdentifier.fromComment(jdbcTable.comment());
+        if (null != identifier) {
+          newComment = StringIdentifier.addToComment(identifier, newComment);
+        }
+      }
+      alterSql.add("COMMENT '" + newComment + "'");
+    }
+
+    if (!setProperties.isEmpty()) {
+      alterSql.add(generateTableProperties(setProperties));
+    }
+
+    if (CollectionUtils.isEmpty(alterSql)) {
+      return "";
+    }
+    // Return the generated SQL statement
+    String result = "ALTER TABLE `" + tableName + "`\n" + String.join(",\n", 
alterSql) + ";";
+    LOG.info("Generated alter table:{} sql: {}", databaseName + "." + 
tableName, result);
+    return result;
   }
 
-  @Override
-  protected JdbcTable getOrCreateTable(
-      String databaseName, String tableName, JdbcTable lazyLoadCreateTable) {
-    return null != lazyLoadCreateTable ? lazyLoadCreateTable : 
load(databaseName, tableName);
+  private String updateColumnAutoIncrementDefinition(
+      JdbcTable table, TableChange.UpdateColumnAutoIncrement change) {
+    if (change.fieldName().length > 1) {
+      throw new UnsupportedOperationException("Nested column names are not 
supported");
+    }
+    String col = change.fieldName()[0];
+    JdbcColumn column = getJdbcColumnFromTable(table, col);
+    if (change.isAutoIncrement()) {
+      Preconditions.checkArgument(
+          Types.allowAutoIncrement(column.dataType()),
+          "Auto increment is not allowed, type: " + column.dataType());
+    }
+    JdbcColumn updateColumn =
+        JdbcColumn.builder()
+            .withName(col)
+            .withDefaultValue(column.defaultValue())
+            .withNullable(column.nullable())
+            .withType(column.dataType())
+            .withComment(column.comment())
+            .withAutoIncrement(change.isAutoIncrement())
+            .build();
+    return MODIFY_COLUMN
+        + BACK_QUOTE
+        + col
+        + BACK_QUOTE
+        + appendColumnDefinition(updateColumn, new StringBuilder());
+  }
+
+  @VisibleForTesting
+  static String deleteIndexDefinition(
+      JdbcTable lazyLoadTable, TableChange.DeleteIndex deleteIndex) {
+    if (deleteIndex.isIfExists()) {
+      if (Arrays.stream(lazyLoadTable.index())
+          .anyMatch(index -> index.name().equals(deleteIndex.getName()))) {
+        throw new IllegalArgumentException("Index does not exist");
+      }
+    }
+    return "DROP INDEX " + BACK_QUOTE + deleteIndex.getName() + BACK_QUOTE;
+  }
+
+  private String updateColumnNullabilityDefinition(
+      TableChange.UpdateColumnNullability change, JdbcTable table) {
+    validateUpdateColumnNullable(change, table);
+    String col = change.fieldName()[0];
+    JdbcColumn column = getJdbcColumnFromTable(table, col);
+    JdbcColumn updateColumn =
+        JdbcColumn.builder()
+            .withName(col)
+            .withDefaultValue(column.defaultValue())
+            .withNullable(change.nullable())
+            .withType(column.dataType())
+            .withComment(column.comment())
+            .withAutoIncrement(column.autoIncrement())
+            .build();
+    return MODIFY_COLUMN
+        + BACK_QUOTE
+        + col
+        + BACK_QUOTE
+        + appendColumnDefinition(updateColumn, new StringBuilder());
+  }
+
+  @VisibleForTesting
+  static String addIndexDefinition(TableChange.AddIndex addIndex) {
+    StringBuilder sqlBuilder = new StringBuilder();
+    sqlBuilder.append("ADD ");
+    switch (addIndex.getType()) {
+      case PRIMARY_KEY:
+        if (null != addIndex.getName()
+            && !StringUtils.equalsIgnoreCase(
+                addIndex.getName(), Indexes.DEFAULT_MYSQL_PRIMARY_KEY_NAME)) {
+          throw new IllegalArgumentException("Primary key name must be PRIMARY 
in OceanBase");
+        }
+        sqlBuilder.append("PRIMARY KEY ");
+        break;
+      case UNIQUE_KEY:
+        sqlBuilder
+            .append("UNIQUE INDEX ")
+            .append(BACK_QUOTE)
+            .append(addIndex.getName())
+            .append(BACK_QUOTE);
+        break;
+      default:
+        break;
+    }
+    sqlBuilder.append(" 
(").append(getIndexFieldStr(addIndex.getFieldNames())).append(")");
+    return sqlBuilder.toString();
+  }
+
+  private String generateTableProperties(List<TableChange.SetProperty> 
setProperties) {
+    return setProperties.stream()
+        .map(
+            setProperty ->
+                String.format("%s = %s", setProperty.getProperty(), 
setProperty.getValue()))
+        .collect(Collectors.joining(",\n"));
+  }
+
+  private String updateColumnCommentFieldDefinition(
+      TableChange.UpdateColumnComment updateColumnComment, JdbcTable 
jdbcTable) {
+    String newComment = updateColumnComment.getNewComment();
+    if (updateColumnComment.fieldName().length > 1) {
+      throw new 
UnsupportedOperationException(OCEANBASE_NOT_SUPPORT_NESTED_COLUMN_MSG);
+    }
+    String col = updateColumnComment.fieldName()[0];
+    JdbcColumn column = getJdbcColumnFromTable(jdbcTable, col);
+    JdbcColumn updateColumn =
+        JdbcColumn.builder()
+            .withName(col)
+            .withDefaultValue(column.defaultValue())
+            .withNullable(column.nullable())
+            .withType(column.dataType())
+            .withComment(newComment)
+            .withAutoIncrement(column.autoIncrement())
+            .build();
+    return MODIFY_COLUMN
+        + BACK_QUOTE
+        + col
+        + BACK_QUOTE
+        + appendColumnDefinition(updateColumn, new StringBuilder());
+  }
+
+  private String addColumnFieldDefinition(TableChange.AddColumn addColumn) {
+    String dataType = typeConverter.fromGravitino(addColumn.getDataType());
+    if (addColumn.fieldName().length > 1) {
+      throw new 
UnsupportedOperationException(OCEANBASE_NOT_SUPPORT_NESTED_COLUMN_MSG);
+    }
+    String col = addColumn.fieldName()[0];
+
+    StringBuilder columnDefinition = new StringBuilder();
+    columnDefinition
+        .append("ADD COLUMN ")
+        .append(BACK_QUOTE)
+        .append(col)
+        .append(BACK_QUOTE)
+        .append(SPACE)
+        .append(dataType)
+        .append(SPACE);
+
+    if (addColumn.isAutoIncrement()) {
+      Preconditions.checkArgument(
+          Types.allowAutoIncrement(addColumn.getDataType()),
+          "Auto increment is not allowed, type: " + addColumn.getDataType());
+      columnDefinition.append(OCEANBASE_AUTO_INCREMENT).append(SPACE);
+    }
+
+    if (!addColumn.isNullable()) {
+      columnDefinition.append("NOT NULL ");
+    }
+    // Append comment if available
+    if (StringUtils.isNotEmpty(addColumn.getComment())) {
+      columnDefinition.append("COMMENT 
'").append(addColumn.getComment()).append("' ");
+    }
+
+    // Append default value if available
+    if (!Column.DEFAULT_VALUE_NOT_SET.equals(addColumn.getDefaultValue())) {
+      columnDefinition
+          .append("DEFAULT ")
+          
.append(columnDefaultValueConverter.fromGravitino(addColumn.getDefaultValue()))
+          .append(SPACE);
+    }
+
+    // Append position if available
+    if (addColumn.getPosition() instanceof TableChange.First) {
+      columnDefinition.append("FIRST");
+    } else if (addColumn.getPosition() instanceof TableChange.After) {
+      TableChange.After afterPosition = (TableChange.After) 
addColumn.getPosition();
+      columnDefinition
+          .append(AFTER)
+          .append(BACK_QUOTE)
+          .append(afterPosition.getColumn())
+          .append(BACK_QUOTE);
+    } else if (addColumn.getPosition() instanceof TableChange.Default) {
+      // do nothing, follow the default behavior of oceanbase
+    } else {
+      throw new IllegalArgumentException("Invalid column position.");
+    }
+    return columnDefinition.toString();
+  }
+
+  private String renameColumnFieldDefinition(
+      TableChange.RenameColumn renameColumn, JdbcTable jdbcTable) {
+    if (renameColumn.fieldName().length > 1) {
+      throw new 
UnsupportedOperationException(OCEANBASE_NOT_SUPPORT_NESTED_COLUMN_MSG);
+    }
+
+    String oldColumnName = renameColumn.fieldName()[0];
+    String newColumnName = renameColumn.getNewName();
+    JdbcColumn column = getJdbcColumnFromTable(jdbcTable, oldColumnName);
+    StringBuilder sqlBuilder =
+        new StringBuilder(
+            "CHANGE COLUMN "
+                + BACK_QUOTE
+                + oldColumnName
+                + BACK_QUOTE
+                + SPACE
+                + BACK_QUOTE
+                + newColumnName
+                + BACK_QUOTE);
+    JdbcColumn newColumn =
+        JdbcColumn.builder()
+            .withName(newColumnName)
+            .withType(column.dataType())
+            .withComment(column.comment())
+            .withDefaultValue(column.defaultValue())
+            .withNullable(column.nullable())
+            .withAutoIncrement(column.autoIncrement())
+            .build();
+    return appendColumnDefinition(newColumn, sqlBuilder).toString();
+  }
+
+  private String updateColumnPositionFieldDefinition(
+      TableChange.UpdateColumnPosition updateColumnPosition, JdbcTable 
jdbcTable) {
+    if (updateColumnPosition.fieldName().length > 1) {
+      throw new 
UnsupportedOperationException(OCEANBASE_NOT_SUPPORT_NESTED_COLUMN_MSG);
+    }
+    String col = updateColumnPosition.fieldName()[0];
+    JdbcColumn column = getJdbcColumnFromTable(jdbcTable, col);
+    StringBuilder columnDefinition = new StringBuilder();
+    columnDefinition.append(MODIFY_COLUMN).append(col);
+    appendColumnDefinition(column, columnDefinition);
+    if (updateColumnPosition.getPosition() instanceof TableChange.First) {
+      columnDefinition.append("FIRST");
+    } else if (updateColumnPosition.getPosition() instanceof 
TableChange.After) {
+      TableChange.After afterPosition = (TableChange.After) 
updateColumnPosition.getPosition();
+      columnDefinition.append(AFTER).append(afterPosition.getColumn());
+    } else {
+      Arrays.stream(jdbcTable.columns())
+          .reduce((column1, column2) -> column2)
+          .map(Column::name)
+          .ifPresent(s -> columnDefinition.append(AFTER).append(s));
+    }
+    return columnDefinition.toString();
+  }
+
+  private String deleteColumnFieldDefinition(
+      TableChange.DeleteColumn deleteColumn, JdbcTable jdbcTable) {
+    if (deleteColumn.fieldName().length > 1) {
+      throw new 
UnsupportedOperationException(OCEANBASE_NOT_SUPPORT_NESTED_COLUMN_MSG);
+    }
+    String col = deleteColumn.fieldName()[0];
+    boolean colExists = true;
+    try {
+      getJdbcColumnFromTable(jdbcTable, col);
+    } catch (NoSuchColumnException noSuchColumnException) {
+      colExists = false;
+    }
+    if (!colExists) {
+      if (BooleanUtils.isTrue(deleteColumn.getIfExists())) {
+        return "";
+      } else {
+        throw new IllegalArgumentException("Delete column does not exist: " + 
col);
+      }
+    }
+    return "DROP COLUMN " + BACK_QUOTE + col + BACK_QUOTE;
+  }
+
+  private String updateColumnDefaultValueFieldDefinition(
+      TableChange.UpdateColumnDefaultValue updateColumnDefaultValue, JdbcTable 
jdbcTable) {
+    if (updateColumnDefaultValue.fieldName().length > 1) {
+      throw new 
UnsupportedOperationException(OCEANBASE_NOT_SUPPORT_NESTED_COLUMN_MSG);
+    }
+    String col = updateColumnDefaultValue.fieldName()[0];
+    JdbcColumn column = getJdbcColumnFromTable(jdbcTable, col);
+    StringBuilder sqlBuilder = new StringBuilder(MODIFY_COLUMN + col);
+    JdbcColumn newColumn =
+        JdbcColumn.builder()
+            .withName(col)
+            .withType(column.dataType())
+            .withNullable(column.nullable())
+            .withComment(column.comment())
+            .withDefaultValue(updateColumnDefaultValue.getNewDefaultValue())
+            .build();
+    return appendColumnDefinition(newColumn, sqlBuilder).toString();
+  }
+
+  private String updateColumnTypeFieldDefinition(
+      TableChange.UpdateColumnType updateColumnType, JdbcTable jdbcTable) {
+    if (updateColumnType.fieldName().length > 1) {
+      throw new 
UnsupportedOperationException(OCEANBASE_NOT_SUPPORT_NESTED_COLUMN_MSG);
+    }
+    String col = updateColumnType.fieldName()[0];
+    JdbcColumn column = getJdbcColumnFromTable(jdbcTable, col);
+    StringBuilder sqlBuilder = new StringBuilder(MODIFY_COLUMN + col);
+    JdbcColumn newColumn =
+        JdbcColumn.builder()
+            .withName(col)
+            .withType(updateColumnType.getNewDataType())
+            .withComment(column.comment())
+            .withDefaultValue(DEFAULT_VALUE_NOT_SET)
+            .withNullable(column.nullable())
+            .withAutoIncrement(column.autoIncrement())
+            .build();
+    return appendColumnDefinition(newColumn, sqlBuilder).toString();
+  }
+
+  private StringBuilder appendColumnDefinition(JdbcColumn column, 
StringBuilder sqlBuilder) {
+    // Add data type
+    
sqlBuilder.append(SPACE).append(typeConverter.fromGravitino(column.dataType())).append(SPACE);
+
+    // Add NOT NULL if the column is marked as such
+    if (column.nullable()) {
+      sqlBuilder.append("NULL ");
+    } else {
+      sqlBuilder.append("NOT NULL ");
+    }
+
+    // Add DEFAULT value if specified
+    if (!DEFAULT_VALUE_NOT_SET.equals(column.defaultValue())) {
+      sqlBuilder
+          .append("DEFAULT ")
+          
.append(columnDefaultValueConverter.fromGravitino(column.defaultValue()))
+          .append(SPACE);
+    }
+
+    // Add column auto_increment if specified
+    if (column.autoIncrement()) {
+      sqlBuilder.append(OCEANBASE_AUTO_INCREMENT).append(" ");
+    }
+
+    // Add column comment if specified
+    if (StringUtils.isNotEmpty(column.comment())) {
+      sqlBuilder.append("COMMENT '").append(column.comment()).append("' ");
+    }
+    return sqlBuilder;
   }
 }
diff --git 
a/catalogs/catalog-jdbc-mysql/src/test/java/org/apache/gravitino/catalog/mysql/operation/TestMysqlTableOperations.java
 
b/catalogs/catalog-jdbc-oceanbase/src/test/java/org/apache/gravitino/catalog/oceanbase/operation/TestOceanBaseTableOperations.java
similarity index 83%
copy from 
catalogs/catalog-jdbc-mysql/src/test/java/org/apache/gravitino/catalog/mysql/operation/TestMysqlTableOperations.java
copy to 
catalogs/catalog-jdbc-oceanbase/src/test/java/org/apache/gravitino/catalog/oceanbase/operation/TestOceanBaseTableOperations.java
index 93783da3b..6f2a422fb 100644
--- 
a/catalogs/catalog-jdbc-mysql/src/test/java/org/apache/gravitino/catalog/mysql/operation/TestMysqlTableOperations.java
+++ 
b/catalogs/catalog-jdbc-oceanbase/src/test/java/org/apache/gravitino/catalog/oceanbase/operation/TestOceanBaseTableOperations.java
@@ -16,10 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.gravitino.catalog.mysql.operation;
-
-import static 
org.apache.gravitino.catalog.mysql.MysqlTablePropertiesMetadata.MYSQL_AUTO_INCREMENT_OFFSET_KEY;
-import static 
org.apache.gravitino.catalog.mysql.MysqlTablePropertiesMetadata.MYSQL_ENGINE_KEY;
+package org.apache.gravitino.catalog.oceanbase.operation;
 
 import java.time.LocalDateTime;
 import java.util.ArrayList;
@@ -33,7 +30,6 @@ import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.catalog.jdbc.JdbcColumn;
 import org.apache.gravitino.catalog.jdbc.JdbcTable;
-import org.apache.gravitino.exceptions.GravitinoRuntimeException;
 import org.apache.gravitino.rel.Column;
 import org.apache.gravitino.rel.TableChange;
 import org.apache.gravitino.rel.expressions.distributions.Distributions;
@@ -46,17 +42,23 @@ import org.apache.gravitino.rel.types.Type;
 import org.apache.gravitino.rel.types.Types;
 import org.apache.gravitino.utils.RandomNameUtils;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 
 @Tag("gravitino-docker-test")
-public class TestMysqlTableOperations extends TestMysql {
-  private static Type VARCHAR = Types.VarCharType.of(255);
-  private static Type INT = Types.IntegerType.get();
+public class TestOceanBaseTableOperations extends TestOceanBase {
+  private static final Type VARCHAR = Types.VarCharType.of(255);
+  private static final Type INT = Types.IntegerType.get();
+
+  @BeforeAll
+  public static void setUp() {
+    DATABASE_OPERATIONS.create(TEST_DB_NAME, null, new HashMap<>());
+  }
 
   @Test
   public void testOperationTable() {
-    String tableName = RandomStringUtils.randomAlphabetic(16) + "_op_table";
+    String tableName = RandomStringUtils.randomAlphabetic(16).toLowerCase() + 
"_op_table";
     String tableComment = "test_comment";
     List<JdbcColumn> columns = new ArrayList<>();
     columns.add(
@@ -88,12 +90,11 @@ public class TestMysqlTableOperations extends TestMysql {
             .withNullable(false)
             .build());
     Map<String, String> properties = new HashMap<>();
-    properties.put(MYSQL_AUTO_INCREMENT_OFFSET_KEY, "10");
 
     Index[] indexes = new Index[] {Indexes.unique("test", new String[][] 
{{"col_1"}, {"col_2"}})};
     // create table
     TABLE_OPERATIONS.create(
-        TEST_DB_NAME.toString(),
+        TEST_DB_NAME,
         tableName,
         columns.toArray(new JdbcColumn[0]),
         tableComment,
@@ -103,19 +104,18 @@ public class TestMysqlTableOperations extends TestMysql {
         indexes);
 
     // list table
-    List<String> tables = TABLE_OPERATIONS.listTables(TEST_DB_NAME.toString());
+    List<String> tables = TABLE_OPERATIONS.listTables(TEST_DB_NAME);
     Assertions.assertTrue(tables.contains(tableName));
 
     // load table
-    JdbcTable load = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), tableName);
+    JdbcTable load = TABLE_OPERATIONS.load(TEST_DB_NAME, tableName);
     assertionsTableInfo(
         tableName, tableComment, columns, properties, indexes, 
Transforms.EMPTY_TRANSFORM, load);
 
     // rename table
     String newName = "new_table";
-    Assertions.assertDoesNotThrow(
-        () -> TABLE_OPERATIONS.rename(TEST_DB_NAME.toString(), tableName, 
newName));
-    Assertions.assertDoesNotThrow(() -> 
TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), newName));
+    Assertions.assertDoesNotThrow(() -> TABLE_OPERATIONS.rename(TEST_DB_NAME, 
tableName, newName));
+    Assertions.assertDoesNotThrow(() -> TABLE_OPERATIONS.load(TEST_DB_NAME, 
newName));
 
     // alter table
     JdbcColumn newColumn =
@@ -127,17 +127,15 @@ public class TestMysqlTableOperations extends TestMysql {
             .withDefaultValue(Literals.of("hello test", VARCHAR))
             .build();
     TABLE_OPERATIONS.alterTable(
-        TEST_DB_NAME.toString(),
+        TEST_DB_NAME,
         newName,
         TableChange.addColumn(
             new String[] {newColumn.name()},
             newColumn.dataType(),
             newColumn.comment(),
             TableChange.ColumnPosition.after("col_1"),
-            newColumn.defaultValue()),
-        TableChange.setProperty(MYSQL_ENGINE_KEY, "InnoDB"));
-    properties.put(MYSQL_ENGINE_KEY, "InnoDB");
-    load = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), newName);
+            newColumn.defaultValue()));
+    load = TABLE_OPERATIONS.load(TEST_DB_NAME, newName);
     List<JdbcColumn> alterColumns =
         new ArrayList<JdbcColumn>() {
           {
@@ -151,22 +149,10 @@ public class TestMysqlTableOperations extends TestMysql {
     assertionsTableInfo(
         newName, tableComment, alterColumns, properties, indexes, 
Transforms.EMPTY_TRANSFORM, load);
 
-    // Detect unsupported properties
-    TableChange setProperty = TableChange.setProperty(MYSQL_ENGINE_KEY, "ABC");
-    GravitinoRuntimeException gravitinoRuntimeException =
-        Assertions.assertThrows(
-            GravitinoRuntimeException.class,
-            () -> TABLE_OPERATIONS.alterTable(TEST_DB_NAME.toString(), 
newName, setProperty));
-    Assertions.assertTrue(
-        StringUtils.contains(
-            gravitinoRuntimeException.getMessage(), "Unknown storage engine 
'ABC'"));
-
     // delete column
     TABLE_OPERATIONS.alterTable(
-        TEST_DB_NAME.toString(),
-        newName,
-        TableChange.deleteColumn(new String[] {newColumn.name()}, true));
-    load = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), newName);
+        TEST_DB_NAME, newName, TableChange.deleteColumn(new String[] 
{newColumn.name()}, true));
+    load = TABLE_OPERATIONS.load(TEST_DB_NAME, newName);
     assertionsTableInfo(
         newName, tableComment, columns, properties, indexes, 
Transforms.EMPTY_TRANSFORM, load);
 
@@ -174,29 +160,26 @@ public class TestMysqlTableOperations extends TestMysql {
     IllegalArgumentException illegalArgumentException =
         Assertions.assertThrows(
             IllegalArgumentException.class,
-            () -> TABLE_OPERATIONS.alterTable(TEST_DB_NAME.toString(), 
newName, deleteColumn));
+            () -> TABLE_OPERATIONS.alterTable(TEST_DB_NAME, newName, 
deleteColumn));
     Assertions.assertEquals(
         "Delete column does not exist: " + newColumn.name(), 
illegalArgumentException.getMessage());
     Assertions.assertDoesNotThrow(
         () ->
             TABLE_OPERATIONS.alterTable(
-                TEST_DB_NAME.toString(),
+                TEST_DB_NAME,
                 newName,
                 TableChange.deleteColumn(new String[] {newColumn.name()}, 
true)));
 
     TABLE_OPERATIONS.alterTable(
-        TEST_DB_NAME.toString(),
-        newName,
-        TableChange.deleteColumn(new String[] {newColumn.name()}, true));
-    Assertions.assertTrue(
-        TABLE_OPERATIONS.drop(TEST_DB_NAME.toString(), newName), "table should 
be dropped");
+        TEST_DB_NAME, newName, TableChange.deleteColumn(new String[] 
{newColumn.name()}, true));
+    Assertions.assertTrue(TABLE_OPERATIONS.drop(TEST_DB_NAME, newName), "table 
should be dropped");
     Assertions.assertFalse(
-        TABLE_OPERATIONS.drop(TEST_DB_NAME.toString(), newName), "table should 
be non-existent");
+        TABLE_OPERATIONS.drop(TEST_DB_NAME, newName), "table should be 
non-existent");
   }
 
   @Test
   public void testAlterTable() {
-    String tableName = RandomStringUtils.randomAlphabetic(16) + "_al_table";
+    String tableName = RandomStringUtils.randomAlphabetic(16).toLowerCase() + 
"_al_table";
     String tableComment = "test_comment";
     List<JdbcColumn> columns = new ArrayList<>();
     JdbcColumn col_1 =
@@ -236,7 +219,7 @@ public class TestMysqlTableOperations extends TestMysql {
         };
     // create table
     TABLE_OPERATIONS.create(
-        TEST_DB_NAME.toString(),
+        TEST_DB_NAME,
         tableName,
         columns.toArray(new JdbcColumn[0]),
         tableComment,
@@ -244,16 +227,16 @@ public class TestMysqlTableOperations extends TestMysql {
         null,
         Distributions.NONE,
         indexes);
-    JdbcTable load = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), tableName);
+    JdbcTable load = TABLE_OPERATIONS.load(TEST_DB_NAME, tableName);
     assertionsTableInfo(
         tableName, tableComment, columns, properties, indexes, 
Transforms.EMPTY_TRANSFORM, load);
 
     TABLE_OPERATIONS.alterTable(
-        TEST_DB_NAME.toString(),
+        TEST_DB_NAME,
         tableName,
         TableChange.updateColumnType(new String[] {col_1.name()}, VARCHAR));
 
-    load = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), tableName);
+    load = TABLE_OPERATIONS.load(TEST_DB_NAME, tableName);
 
     // After modifying the type, some attributes of the corresponding column 
are not
     // supported.
@@ -278,11 +261,11 @@ public class TestMysqlTableOperations extends TestMysql {
     // `col_2` varchar(255) NOT NULL DEFAULT 'hello world' COMMENT 
'new_comment' ,
     // `col_3` varchar(255) NULL DEFAULT NULL COMMENT 'name' ,
     TABLE_OPERATIONS.alterTable(
-        TEST_DB_NAME.toString(),
+        TEST_DB_NAME,
         tableName,
         TableChange.updateColumnType(new String[] {col_1.name()}, INT),
         TableChange.updateColumnComment(new String[] {col_2.name()}, 
newComment));
-    load = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), tableName);
+    load = TABLE_OPERATIONS.load(TEST_DB_NAME, tableName);
 
     columns.clear();
     col_1 =
@@ -318,12 +301,12 @@ public class TestMysqlTableOperations extends TestMysql {
     // ,
     // `col_3` varchar(255) NULL DEFAULT NULL COMMENT 'name' ,
     TABLE_OPERATIONS.alterTable(
-        TEST_DB_NAME.toString(),
+        TEST_DB_NAME,
         tableName,
         TableChange.renameColumn(new String[] {col_1.name()}, newColName_1),
         TableChange.renameColumn(new String[] {col_2.name()}, newColName_2));
 
-    load = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), tableName);
+    load = TABLE_OPERATIONS.load(TEST_DB_NAME, tableName);
 
     columns.clear();
     col_1 =
@@ -352,23 +335,23 @@ public class TestMysqlTableOperations extends TestMysql {
 
     newComment = "txt3";
     String newCol2Comment = "xxx";
-    // update column position 、comment and add column、set table properties
+    // update column position add column、set table properties
     // `new_col_2` varchar(255) NOT NULL DEFAULT 'hello world' COMMENT 'xxx' ,
     // `new_col_1` int NOT NULL COMMENT 'id' ,
     // `col_3` varchar(255) NULL DEFAULT NULL COMMENT 'name' ,
     // `col_4` varchar(255) NOT NULL COMMENT 'txt4' ,
     // `col_5` varchar(255) COMMENT 'hello world' DEFAULT 'hello world' ,
     TABLE_OPERATIONS.alterTable(
-        TEST_DB_NAME.toString(),
+        TEST_DB_NAME,
         tableName,
         TableChange.updateColumnPosition(
             new String[] {newColName_1}, 
TableChange.ColumnPosition.after(newColName_2)),
-        TableChange.updateComment(newComment),
         TableChange.addColumn(new String[] {"col_4"}, VARCHAR, "txt4", false),
         TableChange.updateColumnComment(new String[] {newColName_2}, 
newCol2Comment),
         TableChange.addColumn(
             new String[] {"col_5"}, VARCHAR, "txt5", Literals.of("hello 
world", VARCHAR)));
-    load = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), tableName);
+    TABLE_OPERATIONS.alterTable(TEST_DB_NAME, tableName, 
TableChange.updateComment(newComment));
+    load = TABLE_OPERATIONS.load(TEST_DB_NAME, tableName);
 
     columns.clear();
 
@@ -408,13 +391,13 @@ public class TestMysqlTableOperations extends TestMysql {
     // `col_5` varchar(255) COMMENT 'hello world' DEFAULT 'hello world' ,
     // `new_col_1` int NOT NULL COMMENT 'id' ,
     TABLE_OPERATIONS.alterTable(
-        TEST_DB_NAME.toString(),
+        TEST_DB_NAME,
         tableName,
         TableChange.updateColumnPosition(new String[] {columns.get(1).name()}, 
null),
         TableChange.updateColumnNullability(
             new String[] {columns.get(3).name()}, !columns.get(3).nullable()));
 
-    load = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), tableName);
+    load = TABLE_OPERATIONS.load(TEST_DB_NAME, tableName);
     col_1 = columns.remove(1);
     JdbcColumn col3 = columns.remove(1);
     JdbcColumn col_4 = columns.remove(1);
@@ -449,7 +432,7 @@ public class TestMysqlTableOperations extends TestMysql {
     IllegalArgumentException exception =
         Assertions.assertThrows(
             IllegalArgumentException.class,
-            () -> TABLE_OPERATIONS.alterTable(TEST_DB_NAME.toString(), 
tableName, updateColumn));
+            () -> TABLE_OPERATIONS.alterTable(TEST_DB_NAME, tableName, 
updateColumn));
     Assertions.assertTrue(
         exception.getMessage().contains("with null default value cannot be 
changed to not null"));
   }
@@ -479,7 +462,6 @@ public class TestMysqlTableOperations extends TestMysql {
         JdbcColumn.builder()
             .withName("col_3")
             .withType(Types.TimestampType.withoutTimeZone())
-            // MySQL 5.7 doesn't support nullable timestamp
             .withNullable(false)
             .withComment("timestamp")
             
.withDefaultValue(Literals.timestampLiteral(LocalDateTime.parse("2013-01-01T00:00:00")))
@@ -501,7 +483,7 @@ public class TestMysqlTableOperations extends TestMysql {
         };
     // create table
     TABLE_OPERATIONS.create(
-        TEST_DB_NAME.toString(),
+        TEST_DB_NAME,
         tableName,
         columns.toArray(new JdbcColumn[0]),
         tableComment,
@@ -510,12 +492,12 @@ public class TestMysqlTableOperations extends TestMysql {
         Distributions.NONE,
         indexes);
 
-    JdbcTable loaded = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), 
tableName);
+    JdbcTable loaded = TABLE_OPERATIONS.load(TEST_DB_NAME, tableName);
     assertionsTableInfo(
         tableName, tableComment, columns, properties, indexes, 
Transforms.EMPTY_TRANSFORM, loaded);
 
     TABLE_OPERATIONS.alterTable(
-        TEST_DB_NAME.toString(),
+        TEST_DB_NAME,
         tableName,
         TableChange.updateColumnDefaultValue(
             new String[] {columns.get(0).name()},
@@ -528,7 +510,7 @@ public class TestMysqlTableOperations extends TestMysql {
         TableChange.updateColumnDefaultValue(
             new String[] {columns.get(3).name()}, Literals.of("world", 
Types.VarCharType.of(255))));
 
-    loaded = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), tableName);
+    loaded = TABLE_OPERATIONS.load(TEST_DB_NAME, tableName);
     Assertions.assertEquals(
         Literals.decimalLiteral(Decimal.of("1.234", 10, 2)), 
loaded.columns()[0].defaultValue());
     Assertions.assertEquals(Literals.longLiteral(1L), 
loaded.columns()[1].defaultValue());
@@ -541,7 +523,7 @@ public class TestMysqlTableOperations extends TestMysql {
 
   @Test
   public void testCreateAndLoadTable() {
-    String tableName = RandomStringUtils.randomAlphabetic(16) + "_cl_table";
+    String tableName = RandomStringUtils.randomAlphabetic(16).toLowerCase() + 
"_cl_table";
     String tableComment = "test_comment";
     List<JdbcColumn> columns = new ArrayList<>();
     columns.add(
@@ -564,7 +546,6 @@ public class TestMysqlTableOperations extends TestMysql {
         JdbcColumn.builder()
             .withName("col_3")
             .withType(Types.TimestampType.withoutTimeZone())
-            // MySQL 5.7 doesn't support nullable timestamp
             .withNullable(false)
             .withComment("timestamp")
             
.withDefaultValue(Literals.timestampLiteral(LocalDateTime.parse("2013-01-01T00:00:00")))
@@ -586,7 +567,7 @@ public class TestMysqlTableOperations extends TestMysql {
         };
     // create table
     TABLE_OPERATIONS.create(
-        TEST_DB_NAME.toString(),
+        TEST_DB_NAME,
         tableName,
         columns.toArray(new JdbcColumn[0]),
         tableComment,
@@ -595,7 +576,7 @@ public class TestMysqlTableOperations extends TestMysql {
         Distributions.NONE,
         indexes);
 
-    JdbcTable loaded = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), 
tableName);
+    JdbcTable loaded = TABLE_OPERATIONS.load(TEST_DB_NAME, tableName);
     assertionsTableInfo(
         tableName, tableComment, columns, properties, indexes, 
Transforms.EMPTY_TRANSFORM, loaded);
   }
@@ -685,7 +666,7 @@ public class TestMysqlTableOperations extends TestMysql {
 
     // create table
     TABLE_OPERATIONS.create(
-        TEST_DB_NAME.toString(),
+        TEST_DB_NAME,
         tableName,
         columns.toArray(new JdbcColumn[0]),
         tableComment,
@@ -694,7 +675,7 @@ public class TestMysqlTableOperations extends TestMysql {
         Distributions.NONE,
         Indexes.EMPTY_INDEXES);
 
-    JdbcTable load = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), tableName);
+    JdbcTable load = TABLE_OPERATIONS.load(TEST_DB_NAME, tableName);
     assertionsTableInfo(
         tableName,
         tableComment,
@@ -712,7 +693,6 @@ public class TestMysqlTableOperations extends TestMysql {
     List<JdbcColumn> columns = new ArrayList<>();
     List<Type> notSupportType =
         Arrays.asList(
-            Types.BooleanType.get(),
             Types.FixedType.of(10),
             Types.IntervalDayType.get(),
             Types.IntervalYearType.get(),
@@ -735,7 +715,7 @@ public class TestMysqlTableOperations extends TestMysql {
               IllegalArgumentException.class,
               () -> {
                 TABLE_OPERATIONS.create(
-                    TEST_DB_NAME.toString(),
+                    TEST_DB_NAME,
                     tableName,
                     jdbcCols,
                     tableComment,
@@ -744,12 +724,14 @@ public class TestMysqlTableOperations extends TestMysql {
                     Distributions.NONE,
                     Indexes.EMPTY_INDEXES);
               });
+      System.out.println(illegalArgumentException.getMessage());
       Assertions.assertTrue(
           illegalArgumentException
               .getMessage()
               .contains(
                   String.format(
-                      "Couldn't convert Gravitino type %s to MySQL type", 
type.simpleString())));
+                      "Couldn't convert Gravitino type %s to OceanBase type",
+                      type.simpleString())));
     }
   }
 
@@ -757,7 +739,7 @@ public class TestMysqlTableOperations extends TestMysql {
   public void testCreateMultipleTables() {
     String test_table_1 = "test_table_1";
     TABLE_OPERATIONS.create(
-        TEST_DB_NAME.toString(),
+        TEST_DB_NAME,
         test_table_1,
         new JdbcColumn[] {
           JdbcColumn.builder()
@@ -799,33 +781,10 @@ public class TestMysqlTableOperations extends TestMysql {
         Distributions.NONE,
         Indexes.EMPTY_INDEXES);
 
-    tables = TABLE_OPERATIONS.listTables(TEST_DB_NAME.toString());
+    tables = TABLE_OPERATIONS.listTables(TEST_DB_NAME);
     Assertions.assertFalse(tables.contains(test_table_2));
   }
 
-  @Test
-  public void testLoadTableDefaultProperties() {
-    String test_table_1 = RandomNameUtils.genRandomName("properties_table_");
-    TABLE_OPERATIONS.create(
-        TEST_DB_NAME.toString(),
-        test_table_1,
-        new JdbcColumn[] {
-          JdbcColumn.builder()
-              .withName("col_1")
-              .withType(Types.DecimalType.of(10, 2))
-              .withComment("test_decimal")
-              .withNullable(false)
-              .build()
-        },
-        "test_comment",
-        null,
-        null,
-        Distributions.NONE,
-        Indexes.EMPTY_INDEXES);
-    JdbcTable load = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), 
test_table_1);
-    Assertions.assertEquals("InnoDB", load.properties().get(MYSQL_ENGINE_KEY));
-  }
-
   @Test
   public void testAutoIncrement() {
     String tableName = "test_increment_table_1";
@@ -833,7 +792,7 @@ public class TestMysqlTableOperations extends TestMysql {
     Map<String, String> properties =
         new HashMap<String, String>() {
           {
-            put(MYSQL_AUTO_INCREMENT_OFFSET_KEY, "10");
+            put("AUTO_INCREMENT", "10");
           }
         };
     JdbcColumn[] columns = {
@@ -864,16 +823,9 @@ public class TestMysqlTableOperations extends TestMysql {
           Indexes.unique("uk_1", new String[][] {{"col_1"}})
         };
     TABLE_OPERATIONS.create(
-        TEST_DB_NAME.toString(),
-        tableName,
-        columns,
-        comment,
-        properties,
-        null,
-        Distributions.NONE,
-        indexes);
+        TEST_DB_NAME, tableName, columns, comment, properties, null, 
Distributions.NONE, indexes);
 
-    JdbcTable table = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), 
tableName);
+    JdbcTable table = TABLE_OPERATIONS.load(TEST_DB_NAME, tableName);
     assertionsTableInfo(
         tableName,
         comment,
@@ -882,7 +834,7 @@ public class TestMysqlTableOperations extends TestMysql {
         indexes,
         Transforms.EMPTY_TRANSFORM,
         table);
-    TABLE_OPERATIONS.drop(TEST_DB_NAME.toString(), tableName);
+    TABLE_OPERATIONS.drop(TEST_DB_NAME, tableName);
 
     // Test create increment key for primary index.
     indexes =
@@ -891,16 +843,9 @@ public class TestMysqlTableOperations extends TestMysql {
           Indexes.unique("uk_2", new String[][] {{"col_2"}})
         };
     TABLE_OPERATIONS.create(
-        TEST_DB_NAME.toString(),
-        tableName,
-        columns,
-        comment,
-        properties,
-        null,
-        Distributions.NONE,
-        indexes);
+        TEST_DB_NAME, tableName, columns, comment, properties, null, 
Distributions.NONE, indexes);
 
-    table = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), tableName);
+    table = TABLE_OPERATIONS.load(TEST_DB_NAME, tableName);
     assertionsTableInfo(
         tableName,
         comment,
@@ -909,21 +854,14 @@ public class TestMysqlTableOperations extends TestMysql {
         indexes,
         Transforms.EMPTY_TRANSFORM,
         table);
-    TABLE_OPERATIONS.drop(TEST_DB_NAME.toString(), tableName);
+    TABLE_OPERATIONS.drop(TEST_DB_NAME, tableName);
 
     // Test create increment key for col_1 + col_3 uk.
     indexes = new Index[] {Indexes.unique("uk_2_3", new String[][] {{"col_1"}, 
{"col_3"}})};
     TABLE_OPERATIONS.create(
-        TEST_DB_NAME.toString(),
-        tableName,
-        columns,
-        comment,
-        properties,
-        null,
-        Distributions.NONE,
-        indexes);
+        TEST_DB_NAME, tableName, columns, comment, properties, null, 
Distributions.NONE, indexes);
 
-    table = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), tableName);
+    table = TABLE_OPERATIONS.load(TEST_DB_NAME, tableName);
     assertionsTableInfo(
         tableName,
         comment,
@@ -932,7 +870,7 @@ public class TestMysqlTableOperations extends TestMysql {
         indexes,
         Transforms.EMPTY_TRANSFORM,
         table);
-    TABLE_OPERATIONS.drop(TEST_DB_NAME.toString(), tableName);
+    TABLE_OPERATIONS.drop(TEST_DB_NAME, tableName);
 
     // Test create auto increment fail
     IllegalArgumentException exception =
@@ -940,7 +878,7 @@ public class TestMysqlTableOperations extends TestMysql {
             IllegalArgumentException.class,
             () ->
                 TABLE_OPERATIONS.create(
-                    TEST_DB_NAME.toString(),
+                    TEST_DB_NAME,
                     tableName,
                     columns,
                     comment,
@@ -974,7 +912,7 @@ public class TestMysqlTableOperations extends TestMysql {
             IllegalArgumentException.class,
             () ->
                 TABLE_OPERATIONS.create(
-                    TEST_DB_NAME.toString(),
+                    TEST_DB_NAME,
                     tableName,
                     newColumns,
                     comment,
@@ -998,7 +936,7 @@ public class TestMysqlTableOperations extends TestMysql {
           Indexes.unique("uk_col_6", new String[][] {{"col_4"}, {"col_5"}, 
{"col_6"}})
         };
     StringBuilder sql = new StringBuilder();
-    MysqlTableOperations.appendIndexesSql(indexes, sql);
+    OceanBaseTableOperations.appendIndexesSql(indexes, sql);
     String expectedStr =
         ",\n"
             + "CONSTRAINT PRIMARY KEY (`col_2`, `col_1`),\n"
@@ -1015,7 +953,7 @@ public class TestMysqlTableOperations extends TestMysql {
           Indexes.unique("uk_3", new String[][] {{"col_4"}, {"col_5"}, 
{"col_6"}, {"col_7"}})
         };
     sql = new StringBuilder();
-    MysqlTableOperations.appendIndexesSql(indexes, sql);
+    OceanBaseTableOperations.appendIndexesSql(indexes, sql);
     expectedStr =
         ",\n"
             + "CONSTRAINT `uk_1` UNIQUE (`col_4`),\n"
@@ -1032,16 +970,16 @@ public class TestMysqlTableOperations extends TestMysql {
     IllegalArgumentException illegalArgumentException =
         Assertions.assertThrows(
             IllegalArgumentException.class,
-            () -> MysqlTableOperations.addIndexDefinition(failIndex));
+            () -> OceanBaseTableOperations.addIndexDefinition(failIndex));
     Assertions.assertTrue(
         illegalArgumentException
             .getMessage()
-            .contains("Primary key name must be PRIMARY in MySQL"));
+            .contains("Primary key name must be PRIMARY in OceanBase"));
 
     TableChange.AddIndex successIndex =
         new TableChange.AddIndex(
             Index.IndexType.UNIQUE_KEY, "uk_1", new String[][] {{"col_1"}, 
{"col_2"}});
-    String sql = MysqlTableOperations.addIndexDefinition(successIndex);
+    String sql = OceanBaseTableOperations.addIndexDefinition(successIndex);
     Assertions.assertEquals("ADD UNIQUE INDEX `uk_1` (`col_1`, `col_2`)", sql);
 
     successIndex =
@@ -1049,11 +987,11 @@ public class TestMysqlTableOperations extends TestMysql {
             Index.IndexType.PRIMARY_KEY,
             Indexes.DEFAULT_MYSQL_PRIMARY_KEY_NAME,
             new String[][] {{"col_1"}, {"col_2"}});
-    sql = MysqlTableOperations.addIndexDefinition(successIndex);
+    sql = OceanBaseTableOperations.addIndexDefinition(successIndex);
     Assertions.assertEquals("ADD PRIMARY KEY  (`col_1`, `col_2`)", sql);
 
     TableChange.DeleteIndex deleteIndex = new TableChange.DeleteIndex("uk_1", 
false);
-    sql = MysqlTableOperations.deleteIndexDefinition(null, deleteIndex);
+    sql = OceanBaseTableOperations.deleteIndexDefinition(null, deleteIndex);
     Assertions.assertEquals("DROP INDEX `uk_1`", sql);
   }
 }
diff --git 
a/catalogs/catalog-jdbc-postgresql/src/main/java/org/apache/gravitino/catalog/postgresql/operation/PostgreSqlTableOperations.java
 
b/catalogs/catalog-jdbc-postgresql/src/main/java/org/apache/gravitino/catalog/postgresql/operation/PostgreSqlTableOperations.java
index 775687abd..13a0ff3be 100644
--- 
a/catalogs/catalog-jdbc-postgresql/src/main/java/org/apache/gravitino/catalog/postgresql/operation/PostgreSqlTableOperations.java
+++ 
b/catalogs/catalog-jdbc-postgresql/src/main/java/org/apache/gravitino/catalog/postgresql/operation/PostgreSqlTableOperations.java
@@ -22,6 +22,7 @@ import static 
org.apache.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
 import java.sql.ResultSet;
@@ -46,6 +47,7 @@ import 
org.apache.gravitino.catalog.jdbc.converter.JdbcExceptionConverter;
 import org.apache.gravitino.catalog.jdbc.converter.JdbcTypeConverter;
 import org.apache.gravitino.catalog.jdbc.operation.JdbcTableOperations;
 import org.apache.gravitino.exceptions.NoSuchColumnException;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
 import org.apache.gravitino.exceptions.NoSuchTableException;
 import org.apache.gravitino.rel.Column;
 import org.apache.gravitino.rel.TableChange;
@@ -86,6 +88,24 @@ public class PostgreSqlTableOperations extends 
JdbcTableOperations {
         "The `jdbc-database` configuration item is mandatory in PostgreSQL.");
   }
 
+  @Override
+  public List<String> listTables(String databaseName) throws 
NoSuchSchemaException {
+    try (Connection connection = getConnection(databaseName)) {
+      final List<String> names = Lists.newArrayList();
+      try (ResultSet tables = getTables(connection)) {
+        while (tables.next()) {
+          if (Objects.equals(tables.getString("TABLE_SCHEM"), databaseName)) {
+            names.add(tables.getString("TABLE_NAME"));
+          }
+        }
+      }
+      LOG.info("Finished listing tables size {} for database name {} ", 
names.size(), databaseName);
+      return names;
+    } catch (final SQLException se) {
+      throw this.exceptionMapper.toGravitinoException(se);
+    }
+  }
+
   @Override
   protected JdbcTable.Builder getTableBuilder(
       ResultSet tablesResult, String databaseName, String tableName) throws 
SQLException {
@@ -226,7 +246,7 @@ public class PostgreSqlTableOperations extends 
JdbcTableOperations {
     }
   }
 
-  private static String getIndexFieldStr(String[][] fieldNames) {
+  protected static String getIndexFieldStr(String[][] fieldNames) {
     return Arrays.stream(fieldNames)
         .map(
             colNames -> {
@@ -593,15 +613,6 @@ public class PostgreSqlTableOperations extends 
JdbcTableOperations {
         + ";";
   }
 
-  @Override
-  public JdbcTable getOrCreateTable(
-      String databaseName, String tableName, JdbcTable lazyLoadTable) {
-    if (null == lazyLoadTable) {
-      return load(databaseName, tableName);
-    }
-    return lazyLoadTable;
-  }
-
   private List<String> addColumnFieldDefinition(
       TableChange.AddColumn addColumn, JdbcTable lazyLoadTable) {
     if (addColumn.fieldName().length > 1) {

Reply via email to