mchades commented on code in PR #5228:
URL: https://github.com/apache/gravitino/pull/5228#discussion_r1815955854


##########
catalogs/catalog-jdbc-oceanbase/src/main/java/org/apache/gravitino/catalog/oceanbase/operation/OceanBaseTableOperations.java:
##########
@@ -72,39 +160,497 @@ protected boolean getAutoIncrementInfo(ResultSet 
resultSet) throws SQLException
   @Override
   protected Map<String, String> getTableProperties(Connection connection, 
String tableName)
       throws SQLException {
-    throw new UnsupportedOperationException("Not implemented yet.");
-  }
+    try (PreparedStatement statement = connection.prepareStatement("SHOW TABLE 
STATUS LIKE ?")) {
+      statement.setString(1, tableName);
+      try (ResultSet resultSet = statement.executeQuery()) {
+        while (resultSet.next()) {
+          String name = resultSet.getString("NAME");
+          if (Objects.equals(name, tableName)) {
+            return Collections.unmodifiableMap(
+                new HashMap<String, String>() {
+                  {
+                    put(COMMENT, resultSet.getString(COMMENT));
+                    String autoIncrement = 
resultSet.getString("AUTO_INCREMENT");
+                    if (StringUtils.isNotEmpty(autoIncrement)) {
+                      put("AUTO_INCREMENT", autoIncrement);
+                    }
+                  }
+                });
+          }
+        }
 
-  @Override
-  protected String generateRenameTableSql(String oldTableName, String 
newTableName) {
-    return String.format("RENAME TABLE `%s` TO `%s`", oldTableName, 
newTableName);
+        throw new NoSuchTableException(
+            "Table %s does not exist in %s.", tableName, 
connection.getCatalog());
+      }
+    }
   }
 
-  @Override
-  protected String generateDropTableSql(String tableName) {
-    return String.format("DROP TABLE `%s`", tableName);
+  protected void correctJdbcTableFields(
+      Connection connection, String databaseName, String tableName, 
JdbcTable.Builder tableBuilder)
+      throws SQLException {
+    if (StringUtils.isEmpty(tableBuilder.comment())) {
+      tableBuilder.withComment(
+          tableBuilder.properties().getOrDefault(COMMENT, 
tableBuilder.comment()));
+    }
   }
 
   @Override
   protected String generatePurgeTableSql(String tableName) {
     return String.format("TRUNCATE TABLE `%s`", tableName);
   }
 
+  /**
+   * OceanBase does not support applying some combinations of changes in a single statement, so
+   * this method is overridden to apply each TableChange to the table one by one.
+   *
+   * @param databaseName The name of the database.
+   * @param tableName The name of the table.
+   * @param changes The changes to apply to the table.
+   */
   @Override
   public void alterTable(String databaseName, String tableName, TableChange... 
changes)
       throws NoSuchTableException {
-    throw new UnsupportedOperationException("Not implemented yet.");
+    LOG.info("Attempting to alter table {} from database {}", tableName, 
databaseName);
+    try (Connection connection = getConnection(databaseName)) {
+      for (TableChange change : changes) {
+        String sql = generateAlterTableSql(databaseName, tableName, change);
+        if (StringUtils.isEmpty(sql)) {
+          LOG.info("No changes to alter table {} from database {}", tableName, 
databaseName);
+          return;
+        }
+        JdbcConnectorUtils.executeUpdate(connection, sql);
+      }
+      LOG.info("Alter table {} from database {}", tableName, databaseName);
+    } catch (final SQLException se) {
+      throw this.exceptionMapper.toGravitinoException(se);
+    }
   }
 
   @Override
   protected String generateAlterTableSql(
       String databaseName, String tableName, TableChange... changes) {
-    throw new UnsupportedOperationException("Not implemented yet.");
+    // Not all operations require the original table information, so lazy 
loading is used here
+    JdbcTable lazyLoadTable = null;
+    TableChange.UpdateComment updateComment = null;
+    List<TableChange.SetProperty> setProperties = new ArrayList<>();
+    List<String> alterSql = new ArrayList<>();
+    for (TableChange change : changes) {
+      if (change instanceof TableChange.UpdateComment) {
+        updateComment = (TableChange.UpdateComment) change;
+      } else if (change instanceof TableChange.SetProperty) {
+        // Property settings must be appended at the end, so collect them here.
+        setProperties.add(((TableChange.SetProperty) change));
+      } else if (change instanceof TableChange.RemoveProperty) {
+        // OceanBase does not support deleting table attributes; a SetProperty change can be
+        // used instead.
+        throw new IllegalArgumentException("Remove property is not supported 
yet");
+      } else if (change instanceof TableChange.AddColumn) {
+        TableChange.AddColumn addColumn = (TableChange.AddColumn) change;
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(addColumnFieldDefinition(addColumn));
+      } else if (change instanceof TableChange.RenameColumn) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        TableChange.RenameColumn renameColumn = (TableChange.RenameColumn) 
change;
+        alterSql.add(renameColumnFieldDefinition(renameColumn, lazyLoadTable));
+      } else if (change instanceof TableChange.UpdateColumnDefaultValue) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        TableChange.UpdateColumnDefaultValue updateColumnDefaultValue =
+            (TableChange.UpdateColumnDefaultValue) change;
+        alterSql.add(
+            updateColumnDefaultValueFieldDefinition(updateColumnDefaultValue, 
lazyLoadTable));
+      } else if (change instanceof TableChange.UpdateColumnType) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        TableChange.UpdateColumnType updateColumnType = 
(TableChange.UpdateColumnType) change;
+        alterSql.add(updateColumnTypeFieldDefinition(updateColumnType, 
lazyLoadTable));
+      } else if (change instanceof TableChange.UpdateColumnComment) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        TableChange.UpdateColumnComment updateColumnComment =
+            (TableChange.UpdateColumnComment) change;
+        alterSql.add(updateColumnCommentFieldDefinition(updateColumnComment, 
lazyLoadTable));
+      } else if (change instanceof TableChange.UpdateColumnPosition) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        TableChange.UpdateColumnPosition updateColumnPosition =
+            (TableChange.UpdateColumnPosition) change;
+        alterSql.add(updateColumnPositionFieldDefinition(updateColumnPosition, 
lazyLoadTable));
+      } else if (change instanceof TableChange.DeleteColumn) {
+        TableChange.DeleteColumn deleteColumn = (TableChange.DeleteColumn) 
change;
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        String deleteColSql = deleteColumnFieldDefinition(deleteColumn, 
lazyLoadTable);
+        if (StringUtils.isNotEmpty(deleteColSql)) {
+          alterSql.add(deleteColSql);
+        }
+      } else if (change instanceof TableChange.UpdateColumnNullability) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(
+            updateColumnNullabilityDefinition(
+                (TableChange.UpdateColumnNullability) change, lazyLoadTable));
+      } else if (change instanceof TableChange.AddIndex) {
+        alterSql.add(addIndexDefinition((TableChange.AddIndex) change));
+      } else if (change instanceof TableChange.DeleteIndex) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(deleteIndexDefinition(lazyLoadTable, 
(TableChange.DeleteIndex) change));
+      } else if (change instanceof TableChange.UpdateColumnAutoIncrement) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(
+            updateColumnAutoIncrementDefinition(
+                lazyLoadTable, (TableChange.UpdateColumnAutoIncrement) 
change));
+      } else {
+        throw new IllegalArgumentException(
+            "Unsupported table change type: " + change.getClass().getName());
+      }
+    }
+    if (!setProperties.isEmpty()) {
+      alterSql.add(generateTableProperties(setProperties));
+    }
+
+    // Apply the table comment change last
+    if (null != updateComment) {
+      String newComment = updateComment.getNewComment();
+      if (null == StringIdentifier.fromComment(newComment)) {
+        // Detect and add Gravitino id.
+        JdbcTable jdbcTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        StringIdentifier identifier = 
StringIdentifier.fromComment(jdbcTable.comment());
+        if (null != identifier) {
+          newComment = StringIdentifier.addToComment(identifier, newComment);
+        }
+      }
+      alterSql.add("COMMENT '" + newComment + "'");
+    }
+
+    if (!setProperties.isEmpty()) {
+      alterSql.add(generateTableProperties(setProperties));
+    }
+
+    if (CollectionUtils.isEmpty(alterSql)) {
+      return "";
+    }
+    // Return the generated SQL statement
+    String result = "ALTER TABLE `" + tableName + "`\n" + String.join(",\n", 
alterSql) + ";";
+    LOG.info("Generated alter table:{} sql: {}", databaseName + "." + 
tableName, result);
+    return result;
   }
 
-  @Override
-  protected JdbcTable getOrCreateTable(
-      String databaseName, String tableName, JdbcTable lazyLoadCreateTable) {
-    return null != lazyLoadCreateTable ? lazyLoadCreateTable : 
load(databaseName, tableName);
+  private String updateColumnAutoIncrementDefinition(
+      JdbcTable table, TableChange.UpdateColumnAutoIncrement change) {
+    if (change.fieldName().length > 1) {
+      throw new UnsupportedOperationException("Nested column names are not 
supported");
+    }
+    String col = change.fieldName()[0];
+    JdbcColumn column = getJdbcColumnFromTable(table, col);
+    if (change.isAutoIncrement()) {
+      Preconditions.checkArgument(
+          Types.allowAutoIncrement(column.dataType()),
+          "Auto increment is not allowed, type: " + column.dataType());
+    }
+    JdbcColumn updateColumn =
+        JdbcColumn.builder()
+            .withName(col)
+            .withDefaultValue(column.defaultValue())
+            .withNullable(column.nullable())
+            .withType(column.dataType())
+            .withComment(column.comment())
+            .withAutoIncrement(change.isAutoIncrement())
+            .build();
+    return MODIFY_COLUMN
+        + BACK_QUOTE
+        + col
+        + BACK_QUOTE
+        + appendColumnDefinition(updateColumn, new StringBuilder());
+  }
+
+  @VisibleForTesting
+  static String deleteIndexDefinition(
+      JdbcTable lazyLoadTable, TableChange.DeleteIndex deleteIndex) {
+    if (deleteIndex.isIfExists()) {
+      if (Arrays.stream(lazyLoadTable.index())
+          .anyMatch(index -> index.name().equals(deleteIndex.getName()))) {
+        throw new IllegalArgumentException("Index does not exist");
+      }
+    }
+    return "DROP INDEX " + BACK_QUOTE + deleteIndex.getName() + BACK_QUOTE;
+  }
+
+  private String updateColumnNullabilityDefinition(
+      TableChange.UpdateColumnNullability change, JdbcTable table) {
+    validateUpdateColumnNullable(change, table);
+    String col = change.fieldName()[0];
+    JdbcColumn column = getJdbcColumnFromTable(table, col);
+    JdbcColumn updateColumn =
+        JdbcColumn.builder()
+            .withName(col)
+            .withDefaultValue(column.defaultValue())
+            .withNullable(change.nullable())
+            .withType(column.dataType())
+            .withComment(column.comment())
+            .withAutoIncrement(column.autoIncrement())
+            .build();
+    return MODIFY_COLUMN
+        + BACK_QUOTE
+        + col
+        + BACK_QUOTE
+        + appendColumnDefinition(updateColumn, new StringBuilder());
+  }
+
+  @VisibleForTesting
+  static String addIndexDefinition(TableChange.AddIndex addIndex) {
+    StringBuilder sqlBuilder = new StringBuilder();
+    sqlBuilder.append("ADD ");
+    switch (addIndex.getType()) {
+      case PRIMARY_KEY:
+        if (null != addIndex.getName()
+            && !StringUtils.equalsIgnoreCase(
+                addIndex.getName(), Indexes.DEFAULT_MYSQL_PRIMARY_KEY_NAME)) {
+          throw new IllegalArgumentException("Primary key name must be PRIMARY 
in OceanBase");
+        }
+        sqlBuilder.append("PRIMARY KEY ");
+        break;
+      case UNIQUE_KEY:
+        sqlBuilder
+            .append("UNIQUE INDEX ")
+            .append(BACK_QUOTE)
+            .append(addIndex.getName())
+            .append(BACK_QUOTE);
+        break;
+      default:
+        break;
+    }
+    sqlBuilder.append(" 
(").append(getIndexFieldStr(addIndex.getFieldNames())).append(")");
+    return sqlBuilder.toString();
+  }
+
+  private String generateTableProperties(List<TableChange.SetProperty> 
setProperties) {
+    return setProperties.stream()
+        .map(
+            setProperty ->
+                String.format("%s = %s", setProperty.getProperty(), 
setProperty.getValue()))
+        .collect(Collectors.joining(",\n"));
+  }
+
+  private String updateColumnCommentFieldDefinition(
+      TableChange.UpdateColumnComment updateColumnComment, JdbcTable 
jdbcTable) {
+    String newComment = updateColumnComment.getNewComment();
+    if (updateColumnComment.fieldName().length > 1) {
+      throw new 
UnsupportedOperationException(OCEANBASE_NOT_SUPPORT_NESTED_COLUMN_MSG);
+    }
+    String col = updateColumnComment.fieldName()[0];
+    JdbcColumn column = getJdbcColumnFromTable(jdbcTable, col);
+    JdbcColumn updateColumn =
+        JdbcColumn.builder()
+            .withName(col)
+            .withDefaultValue(column.defaultValue())
+            .withNullable(column.nullable())
+            .withType(column.dataType())
+            .withComment(newComment)
+            .withAutoIncrement(column.autoIncrement())
+            .build();
+    return MODIFY_COLUMN
+        + BACK_QUOTE
+        + col
+        + BACK_QUOTE
+        + appendColumnDefinition(updateColumn, new StringBuilder());
+  }
+
+  private String addColumnFieldDefinition(TableChange.AddColumn addColumn) {
+    String dataType = typeConverter.fromGravitino(addColumn.getDataType());
+    if (addColumn.fieldName().length > 1) {
+      throw new 
UnsupportedOperationException(OCEANBASE_NOT_SUPPORT_NESTED_COLUMN_MSG);
+    }
+    String col = addColumn.fieldName()[0];
+
+    StringBuilder columnDefinition = new StringBuilder();
+    columnDefinition
+        .append("ADD COLUMN ")
+        .append(BACK_QUOTE)
+        .append(col)
+        .append(BACK_QUOTE)
+        .append(SPACE)
+        .append(dataType)
+        .append(SPACE);
+
+    if (addColumn.isAutoIncrement()) {
+      Preconditions.checkArgument(
+          Types.allowAutoIncrement(addColumn.getDataType()),
+          "Auto increment is not allowed, type: " + addColumn.getDataType());
+      columnDefinition.append(OCEANBASE_AUTO_INCREMENT).append(SPACE);
+    }
+
+    if (!addColumn.isNullable()) {
+      columnDefinition.append("NOT NULL ");
+    }
+    // Append comment if available
+    if (StringUtils.isNotEmpty(addColumn.getComment())) {
+      columnDefinition.append("COMMENT 
'").append(addColumn.getComment()).append("' ");
+    }
+
+    // Append default value if available
+    if (!Column.DEFAULT_VALUE_NOT_SET.equals(addColumn.getDefaultValue())) {
+      columnDefinition
+          .append("DEFAULT ")
+          
.append(columnDefaultValueConverter.fromGravitino(addColumn.getDefaultValue()))
+          .append(SPACE);
+    }
+
+    // Append position if available
+    if (addColumn.getPosition() instanceof TableChange.First) {
+      columnDefinition.append("FIRST");
+    } else if (addColumn.getPosition() instanceof TableChange.After) {
+      TableChange.After afterPosition = (TableChange.After) 
addColumn.getPosition();
+      columnDefinition
+          .append(AFTER)
+          .append(BACK_QUOTE)
+          .append(afterPosition.getColumn())
+          .append(BACK_QUOTE);
+    } else if (addColumn.getPosition() instanceof TableChange.Default) {
+      // do nothing, follow the default behavior of oceanbase
+    } else {
+      throw new IllegalArgumentException("Invalid column position.");
+    }
+    return columnDefinition.toString();
+  }
+
+  private String renameColumnFieldDefinition(
+      TableChange.RenameColumn renameColumn, JdbcTable jdbcTable) {
+    if (renameColumn.fieldName().length > 1) {
+      throw new 
UnsupportedOperationException(OCEANBASE_NOT_SUPPORT_NESTED_COLUMN_MSG);
+    }
+
+    String oldColumnName = renameColumn.fieldName()[0];
+    String newColumnName = renameColumn.getNewName();
+    JdbcColumn column = getJdbcColumnFromTable(jdbcTable, oldColumnName);
+    StringBuilder sqlBuilder =
+        new StringBuilder(
+            "CHANGE COLUMN "
+                + BACK_QUOTE
+                + oldColumnName
+                + BACK_QUOTE
+                + SPACE
+                + BACK_QUOTE
+                + newColumnName
+                + BACK_QUOTE);
+    JdbcColumn newColumn =
+        JdbcColumn.builder()
+            .withName(newColumnName)
+            .withType(column.dataType())
+            .withComment(column.comment())
+            .withDefaultValue(column.defaultValue())
+            .withNullable(column.nullable())
+            .withAutoIncrement(column.autoIncrement())
+            .build();
+    return appendColumnDefinition(newColumn, sqlBuilder).toString();
+  }
+
+  private String updateColumnPositionFieldDefinition(
+      TableChange.UpdateColumnPosition updateColumnPosition, JdbcTable 
jdbcTable) {
+    if (updateColumnPosition.fieldName().length > 1) {
+      throw new 
UnsupportedOperationException(OCEANBASE_NOT_SUPPORT_NESTED_COLUMN_MSG);
+    }
+    String col = updateColumnPosition.fieldName()[0];
+    JdbcColumn column = getJdbcColumnFromTable(jdbcTable, col);
+    StringBuilder columnDefinition = new StringBuilder();
+    columnDefinition.append(MODIFY_COLUMN).append(col);
+    appendColumnDefinition(column, columnDefinition);
+    if (updateColumnPosition.getPosition() instanceof TableChange.First) {
+      columnDefinition.append("FIRST");
+    } else if (updateColumnPosition.getPosition() instanceof 
TableChange.After) {
+      TableChange.After afterPosition = (TableChange.After) 
updateColumnPosition.getPosition();
+      columnDefinition.append(AFTER).append(afterPosition.getColumn());
+    } else {
+      Arrays.stream(jdbcTable.columns())
+          .reduce((column1, column2) -> column2)
+          .map(Column::name)
+          .ifPresent(s -> columnDefinition.append(AFTER).append(s));
+    }
+    return columnDefinition.toString();
+  }
+
+  private String deleteColumnFieldDefinition(
+      TableChange.DeleteColumn deleteColumn, JdbcTable jdbcTable) {
+    if (deleteColumn.fieldName().length > 1) {
+      throw new 
UnsupportedOperationException(OCEANBASE_NOT_SUPPORT_NESTED_COLUMN_MSG);
+    }
+    String col = deleteColumn.fieldName()[0];
+    boolean colExists = true;
+    try {
+      getJdbcColumnFromTable(jdbcTable, col);
+    } catch (NoSuchColumnException noSuchColumnException) {
+      colExists = false;
+    }
+    if (!colExists) {
+      if (BooleanUtils.isTrue(deleteColumn.getIfExists())) {
+        return "";
+      } else {
+        throw new IllegalArgumentException("Delete column does not exist: " + 
col);
+      }
+    }
+    return "DROP COLUMN " + BACK_QUOTE + col + BACK_QUOTE;
+  }
+
+  private String updateColumnDefaultValueFieldDefinition(
+      TableChange.UpdateColumnDefaultValue updateColumnDefaultValue, JdbcTable 
jdbcTable) {
+    if (updateColumnDefaultValue.fieldName().length > 1) {
+      throw new 
UnsupportedOperationException(OCEANBASE_NOT_SUPPORT_NESTED_COLUMN_MSG);
+    }
+    String col = updateColumnDefaultValue.fieldName()[0];
+    JdbcColumn column = getJdbcColumnFromTable(jdbcTable, col);
+    StringBuilder sqlBuilder = new StringBuilder(MODIFY_COLUMN + col);
+    JdbcColumn newColumn =
+        JdbcColumn.builder()
+            .withName(col)
+            .withType(column.dataType())
+            .withNullable(column.nullable())
+            .withComment(column.comment())
+            .withDefaultValue(updateColumnDefaultValue.getNewDefaultValue())
+            .build();
+    return appendColumnDefinition(newColumn, sqlBuilder).toString();
+  }
+
+  private String updateColumnTypeFieldDefinition(
+      TableChange.UpdateColumnType updateColumnType, JdbcTable jdbcTable) {
+    if (updateColumnType.fieldName().length > 1) {
+      throw new 
UnsupportedOperationException(OCEANBASE_NOT_SUPPORT_NESTED_COLUMN_MSG);
+    }
+    String col = updateColumnType.fieldName()[0];
+    JdbcColumn column = getJdbcColumnFromTable(jdbcTable, col);
+    StringBuilder sqlBuilder = new StringBuilder(MODIFY_COLUMN + col);
+    JdbcColumn newColumn =
+        JdbcColumn.builder()
+            .withName(col)
+            .withType(updateColumnType.getNewDataType())
+            .withComment(column.comment())
+            .withDefaultValue(DEFAULT_VALUE_NOT_SET)
+            .withNullable(column.nullable())
+            .withAutoIncrement(column.autoIncrement())
+            .build();
+    return appendColumnDefinition(newColumn, sqlBuilder).toString();
+  }
+
+  private StringBuilder appendColumnDefinition(JdbcColumn column, 
StringBuilder sqlBuilder) {

Review Comment:
   This is also the same as the MySQL implementation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to