yuqi1129 commented on code in PR #9963:
URL: https://github.com/apache/gravitino/pull/9963#discussion_r2803917210


##########
catalogs-contrib/catalog-jdbc-hologres/src/main/java/org/apache/gravitino/catalog/hologres/operation/HologresSchemaOperations.java:
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.hologres.operation;
+
+import static 
org.apache.gravitino.catalog.hologres.operation.HologresTableOperations.HOLO_QUOTE;
+
+import com.google.common.collect.ImmutableSet;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import javax.sql.DataSource;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.catalog.jdbc.JdbcSchema;
+import org.apache.gravitino.catalog.jdbc.config.JdbcConfig;
+import org.apache.gravitino.catalog.jdbc.converter.JdbcExceptionConverter;
+import org.apache.gravitino.catalog.jdbc.operation.JdbcDatabaseOperations;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.meta.AuditInfo;
+
+/**
+ * Schema (Database) operations for Hologres.
+ *
+ * <p>Hologres uses the PostgreSQL schema concept where Database in PostgreSQL 
corresponds to
+ * Catalog in JDBC, and Schema in PostgreSQL corresponds to Schema in JDBC.
+ */
+public class HologresSchemaOperations extends JdbcDatabaseOperations {
+
+  private String database;
+
+  @Override
+  public void initialize(
+      DataSource dataSource, JdbcExceptionConverter exceptionMapper, 
Map<String, String> conf) {
+    super.initialize(dataSource, exceptionMapper, conf);
+    database = new JdbcConfig(conf).getJdbcDatabase();
+  }
+
+  @Override
+  public JdbcSchema load(String schema) throws NoSuchSchemaException {
+    try (Connection connection = getConnection()) {
+      if (!schemaExists(connection, schema)) {
+        throw new NoSuchSchemaException("No such schema: %s", schema);
+      }
+
+      String comment = getSchemaComment(schema, connection);
+      return JdbcSchema.builder()
+          .withName(schema)
+          .withComment(comment)
+          .withAuditInfo(AuditInfo.EMPTY)
+          .withProperties(Collections.emptyMap())
+          .build();
+    } catch (SQLException e) {
+      throw exceptionMapper.toGravitinoException(e);
+    }
+  }
+
+  @Override
+  public List<String> listDatabases() {
+    List<String> result = new ArrayList<>();
+    try (Connection connection = getConnection();
+        ResultSet resultSet = getSchema(connection, null)) {
+      while (resultSet.next()) {
+        String schemaName = resultSet.getString(1);
+        if (!isSystemDatabase(schemaName)) {
+          result.add(resultSet.getString(1));
+        }
+      }
+    } catch (final SQLException se) {
+      throw this.exceptionMapper.toGravitinoException(se);
+    }
+    return result;
+  }
+
+  @Override
+  public String generateCreateDatabaseSql(
+      String schema, String comment, Map<String, String> properties) {
+    if (MapUtils.isNotEmpty(properties)) {
+      throw new UnsupportedOperationException(
+          "Hologres does not support properties on schema create.");
+    }
+
+    StringBuilder sqlBuilder = new StringBuilder("CREATE SCHEMA \"" + schema + 
"\";");
+    if (StringUtils.isNotEmpty(comment)) {
+      sqlBuilder

Review Comment:
   Using String.format("COMMENT ON SCHEMA %s%s%s IS '%s'", ...) would be more readable. The same applies to the similar statements below and throughout.



##########
server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataAuthzHelper.java:
##########
@@ -356,11 +356,23 @@ private static void preloadToCache(
       return;
     }
 
-    GravitinoEnv.getInstance()
-        .entityStore()
-        .batchGet(
-            Arrays.asList(nameIdentifiers),
-            entityType,
-            EntityClassMapper.getEntityClass(entityType));
+    try {
+      GravitinoEnv.getInstance()

Review Comment:
   @roryqi Could you please help verify this change?



##########
catalogs-contrib/catalog-jdbc-hologres/src/main/java/org/apache/gravitino/catalog/hologres/operation/HologresTableOperations.java:
##########
@@ -0,0 +1,1089 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.hologres.operation;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import javax.sql.DataSource;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.BooleanUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.StringIdentifier;
+import org.apache.gravitino.catalog.jdbc.JdbcColumn;
+import org.apache.gravitino.catalog.jdbc.JdbcTable;
+import org.apache.gravitino.catalog.jdbc.config.JdbcConfig;
+import 
org.apache.gravitino.catalog.jdbc.converter.JdbcColumnDefaultValueConverter;
+import org.apache.gravitino.catalog.jdbc.converter.JdbcExceptionConverter;
+import org.apache.gravitino.catalog.jdbc.converter.JdbcTypeConverter;
+import org.apache.gravitino.catalog.jdbc.operation.DatabaseOperation;
+import org.apache.gravitino.catalog.jdbc.operation.JdbcTableOperations;
+import org.apache.gravitino.catalog.jdbc.operation.RequireDatabaseOperation;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.exceptions.NoSuchTableException;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.rel.expressions.NamedReference;
+import org.apache.gravitino.rel.expressions.UnparsedExpression;
+import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.distributions.Strategy;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.indexes.Index;
+
+/**
+ * Table operations for Hologres.
+ *
+ * <p>Hologres is PostgreSQL-compatible, so most table operations follow 
PostgreSQL conventions.
+ * However, Hologres has specific features like table properties (orientation, 
distribution_key,
+ * etc.) that are handled through the WITH clause in CREATE TABLE statements.
+ */
+public class HologresTableOperations extends JdbcTableOperations
+    implements RequireDatabaseOperation {
+
+  public static final String HOLO_QUOTE = "\"";
+  public static final String NEW_LINE = "\n";
+  public static final String ALTER_TABLE = "ALTER TABLE ";
+  public static final String ALTER_COLUMN = "ALTER COLUMN ";
+  public static final String IS = " IS '";
+  public static final String COLUMN_COMMENT = "COMMENT ON COLUMN ";
+  public static final String TABLE_COMMENT = "COMMENT ON TABLE ";
+
+  private static final String HOLOGRES_NOT_SUPPORT_NESTED_COLUMN_MSG =
+      "Hologres does not support nested column names.";
+
+  private String database;
+  private HologresSchemaOperations schemaOperations;
+
+  @Override
+  public void initialize(
+      DataSource dataSource,
+      JdbcExceptionConverter exceptionMapper,
+      JdbcTypeConverter jdbcTypeConverter,
+      JdbcColumnDefaultValueConverter jdbcColumnDefaultValueConverter,
+      Map<String, String> conf) {
+    super.initialize(
+        dataSource, exceptionMapper, jdbcTypeConverter, 
jdbcColumnDefaultValueConverter, conf);
+    database = new JdbcConfig(conf).getJdbcDatabase();
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(database),
+        "The `jdbc-database` configuration item is mandatory in Hologres.");
+  }
+
+  @Override
+  public void setDatabaseOperation(DatabaseOperation databaseOperation) {
+    this.schemaOperations = (HologresSchemaOperations) databaseOperation;
+  }
+
+  @Override
+  public List<String> listTables(String schemaName) throws 
NoSuchSchemaException {
+    try (Connection connection = getConnection(schemaName)) {
+      if (!schemaOperations.schemaExists(connection, schemaName)) {
+        throw new NoSuchSchemaException("No such schema: %s", schemaName);
+      }
+      final List<String> names = Lists.newArrayList();
+      try (ResultSet tables = getTables(connection)) {
+        while (tables.next()) {
+          if (Objects.equals(tables.getString("TABLE_SCHEM"), schemaName)) {
+            names.add(tables.getString("TABLE_NAME"));
+          }
+        }
+      }
+      LOG.info("Finished listing tables size {} for schema name {} ", 
names.size(), schemaName);
+      return names;
+    } catch (final SQLException se) {
+      throw this.exceptionMapper.toGravitinoException(se);
+    }
+  }
+
+  @Override
+  protected JdbcTable.Builder getTableBuilder(
+      ResultSet tablesResult, String databaseName, String tableName) throws 
SQLException {
+    boolean found = false;
+    JdbcTable.Builder builder = null;
+    while (tablesResult.next() && !found) {
+      String tableNameInResult = tablesResult.getString("TABLE_NAME");
+      String tableSchemaInResultLowerCase = 
tablesResult.getString("TABLE_SCHEM");
+      if (Objects.equals(tableNameInResult, tableName)
+          && Objects.equals(tableSchemaInResultLowerCase, databaseName)) {
+        builder = getBasicJdbcTableInfo(tablesResult);
+        found = true;
+      }
+    }
+
+    if (!found) {
+      throw new NoSuchTableException("Table %s does not exist in %s.", 
tableName, databaseName);
+    }
+
+    return builder;
+  }
+
+  @Override
+  protected JdbcColumn.Builder getColumnBuilder(
+      ResultSet columnsResult, String databaseName, String tableName) throws 
SQLException {
+    JdbcColumn.Builder builder = null;
+    if (Objects.equals(columnsResult.getString("TABLE_NAME"), tableName)
+        && Objects.equals(columnsResult.getString("TABLE_SCHEM"), 
databaseName)) {
+      builder = getBasicJdbcColumnInfo(columnsResult);
+    }
+    return builder;
+  }
+
+  @Override
+  protected String generateCreateTableSql(
+      String tableName,
+      JdbcColumn[] columns,
+      String comment,
+      Map<String, String> properties,
+      Transform[] partitioning,
+      Distribution distribution,
+      Index[] indexes) {
+    boolean isLogicalPartition =
+        MapUtils.isNotEmpty(properties)
+            && 
"true".equalsIgnoreCase(properties.get("is_logical_partitioned_table"));
+    StringBuilder sqlBuilder = new StringBuilder();
+    sqlBuilder
+        .append("CREATE TABLE ")
+        .append(HOLO_QUOTE)
+        .append(tableName)
+        .append(HOLO_QUOTE)
+        .append(" (")
+        .append(NEW_LINE);
+
+    // Add columns
+    for (int i = 0; i < columns.length; i++) {
+      JdbcColumn column = columns[i];
+      sqlBuilder.append("    
").append(HOLO_QUOTE).append(column.name()).append(HOLO_QUOTE);
+
+      appendColumnDefinition(column, sqlBuilder);
+      // Add a comma for the next column, unless it's the last one
+      if (i < columns.length - 1) {
+        sqlBuilder.append(",").append(NEW_LINE);
+      }
+    }
+    appendIndexesSql(indexes, sqlBuilder);
+    sqlBuilder.append(NEW_LINE).append(")");
+
+    // Append partitioning clause if specified
+    if (ArrayUtils.isNotEmpty(partitioning)) {
+      appendPartitioningSql(partitioning, isLogicalPartition, sqlBuilder);
+    }
+
+    // Build WITH clause combining distribution and Hologres-specific table 
properties
+    // Supported properties: orientation, distribution_key, clustering_key, 
event_time_column,
+    // bitmap_columns, dictionary_encoding_columns, time_to_live_in_seconds, 
table_group, etc.
+    List<String> withEntries = new ArrayList<>();
+
+    // Add distribution_key from Distribution parameter
+    if (!Distributions.NONE.equals(distribution)) {
+      validateDistribution(distribution);
+      String distributionColumns =
+          Arrays.stream(distribution.expressions())
+              .map(Object::toString)
+              .collect(Collectors.joining(","));
+      withEntries.add("distribution_key = '" + distributionColumns + "'");
+    }
+
+    // Add user-specified properties (filter out read-only / 
internally-handled properties)
+    if (MapUtils.isNotEmpty(properties)) {
+      properties.forEach(
+          (key, value) -> {
+            if (!"distribution_key".equals(key)
+                && !"is_logical_partitioned_table".equals(key)
+                && !"primary_key".equals(key)) {
+              withEntries.add(key + " = '" + value + "'");
+            }
+          });
+    }
+
+    // Generate WITH clause
+    if (!withEntries.isEmpty()) {
+      sqlBuilder.append(NEW_LINE).append("WITH (").append(NEW_LINE);
+      sqlBuilder.append(
+          withEntries.stream()
+              .map(entry -> "    " + entry)
+              .collect(Collectors.joining("," + NEW_LINE)));
+      sqlBuilder.append(NEW_LINE).append(")");
+    }
+
+    sqlBuilder.append(";");
+
+    // Add table comment if specified
+    if (StringUtils.isNotEmpty(comment)) {
+      sqlBuilder
+          .append(NEW_LINE)
+          .append(TABLE_COMMENT)
+          .append(HOLO_QUOTE)
+          .append(tableName)
+          .append(HOLO_QUOTE)
+          .append(IS)
+          .append(comment.replace("'", "''"))
+          .append("';");
+    }
+    Arrays.stream(columns)
+        .filter(jdbcColumn -> StringUtils.isNotEmpty(jdbcColumn.comment()))
+        .forEach(
+            jdbcColumn ->
+                sqlBuilder
+                    .append(NEW_LINE)
+                    .append(COLUMN_COMMENT)
+                    .append(HOLO_QUOTE)
+                    .append(tableName)
+                    .append(HOLO_QUOTE)
+                    .append(".")
+                    .append(HOLO_QUOTE)
+                    .append(jdbcColumn.name())
+                    .append(HOLO_QUOTE)
+                    .append(IS)
+                    .append(jdbcColumn.comment().replace("'", "''"))
+                    .append("';"));
+
+    // Return the generated SQL statement
+    String result = sqlBuilder.toString();
+
+    LOG.info("Generated create table:{} sql: {}", tableName, result);
+    return result;
+  }
+
+  @VisibleForTesting
+  static void appendIndexesSql(Index[] indexes, StringBuilder sqlBuilder) {
+    for (Index index : indexes) {
+      String fieldStr = getIndexFieldStr(index.fieldNames());
+      sqlBuilder.append(",").append(NEW_LINE);
+      switch (index.type()) {
+        case PRIMARY_KEY:
+          if (StringUtils.isNotEmpty(index.name())) {
+            sqlBuilder
+                .append("CONSTRAINT ")
+                .append(HOLO_QUOTE)
+                .append(index.name())
+                .append(HOLO_QUOTE);
+          }
+          sqlBuilder.append(" PRIMARY KEY (").append(fieldStr).append(")");
+          break;
+        case UNIQUE_KEY:
+          if (StringUtils.isNotEmpty(index.name())) {
+            sqlBuilder
+                .append("CONSTRAINT ")
+                .append(HOLO_QUOTE)
+                .append(index.name())
+                .append(HOLO_QUOTE);
+          }
+          sqlBuilder.append(" UNIQUE (").append(fieldStr).append(")");
+          break;
+        default:
+          throw new IllegalArgumentException("Hologres doesn't support index : 
" + index.type());
+      }
+    }
+  }
+
+  protected static String getIndexFieldStr(String[][] fieldNames) {
+    return Arrays.stream(fieldNames)
+        .map(
+            colNames -> {
+              if (colNames.length > 1) {
+                throw new IllegalArgumentException(
+                    "Index does not support complex fields in Hologres");
+              }
+              return HOLO_QUOTE + colNames[0] + HOLO_QUOTE;
+            })
+        .collect(Collectors.joining(", "));
+  }
+
+  /**
+   * Append the partitioning clause to the CREATE TABLE SQL.
+   *
+   * <p>Hologres supports two types of partition tables:
+   *
+   * <ul>
+   *   <li>Physical partition table: uses {@code PARTITION BY LIST(column)} 
syntax
+   *   <li>Logical partition table (V3.1+): uses {@code LOGICAL PARTITION BY 
LIST(column1[,
+   *       column2])} syntax
+   * </ul>
+   *
+   * @param partitioning the partition transforms (only LIST partitioning is 
supported)
+   * @param isLogicalPartition whether to create a logical partition table
+   * @param sqlBuilder the SQL builder to append to
+   */
+  @VisibleForTesting
+  static void appendPartitioningSql(
+      Transform[] partitioning, boolean isLogicalPartition, StringBuilder 
sqlBuilder) {
+    Preconditions.checkArgument(
+        partitioning.length == 1 && partitioning[0] instanceof 
Transforms.ListTransform,
+        "Hologres only supports LIST partitioning");
+
+    Transforms.ListTransform listTransform = (Transforms.ListTransform) 
partitioning[0];
+    String[][] fieldNames = listTransform.fieldNames();
+
+    Preconditions.checkArgument(fieldNames.length > 0, "Partition columns must 
not be empty");
+
+    if (isLogicalPartition) {
+      Preconditions.checkArgument(
+          fieldNames.length <= 2,
+          "Logical partition table supports at most 2 partition columns, but 
got: %s",
+          fieldNames.length);
+    } else {
+      Preconditions.checkArgument(
+          fieldNames.length == 1,
+          "Physical partition table supports exactly 1 partition column, but 
got: %s",
+          fieldNames.length);
+    }
+
+    String partitionColumns =
+        Arrays.stream(fieldNames)
+            .map(
+                colNames -> {
+                  Preconditions.checkArgument(
+                      colNames.length == 1,
+                      "Hologres partition does not support nested field 
names");
+                  return HOLO_QUOTE + colNames[0] + HOLO_QUOTE;
+                })
+            .collect(Collectors.joining(", "));
+
+    sqlBuilder.append(NEW_LINE);
+    if (isLogicalPartition) {
+      sqlBuilder.append("LOGICAL PARTITION BY 
LIST(").append(partitionColumns).append(")");
+    } else {
+      sqlBuilder.append("PARTITION BY 
LIST(").append(partitionColumns).append(")");
+    }
+  }
+
+  private void appendColumnDefinition(JdbcColumn column, StringBuilder 
sqlBuilder) {
+    // Add data type
+    
sqlBuilder.append(SPACE).append(typeConverter.fromGravitino(column.dataType())).append(SPACE);
+
+    // Hologres does not support auto-increment columns via Gravitino
+    if (column.autoIncrement()) {
+      throw new IllegalArgumentException(
+          "Hologres does not support creating auto-increment columns via 
Gravitino, column: "
+              + column.name());
+    }
+
+    // Handle generated (stored computed) columns:
+    // GENERATED ALWAYS AS (expr) STORED must come before nullable constraints.
+    if (column.defaultValue() instanceof UnparsedExpression) {
+      String expr = ((UnparsedExpression) 
column.defaultValue()).unparsedExpression();
+      sqlBuilder.append("GENERATED ALWAYS AS (").append(expr).append(") STORED 
");
+      if (column.nullable()) {
+        sqlBuilder.append("NULL ");
+      } else {
+        sqlBuilder.append("NOT NULL ");
+      }
+      return;
+    }
+
+    // Add NOT NULL if the column is marked as such
+    if (column.nullable()) {
+      sqlBuilder.append("NULL ");
+    } else {
+      sqlBuilder.append("NOT NULL ");
+    }
+    // Add DEFAULT value if specified
+    appendDefaultValue(column, sqlBuilder);
+  }
+
+  @Override
+  protected String generateRenameTableSql(String oldTableName, String 
newTableName) {
+    return ALTER_TABLE
+        + HOLO_QUOTE
+        + oldTableName
+        + HOLO_QUOTE
+        + " RENAME TO "
+        + HOLO_QUOTE
+        + newTableName
+        + HOLO_QUOTE;
+  }
+
+  @Override
+  protected String generateDropTableSql(String tableName) {
+    return "DROP TABLE " + HOLO_QUOTE + tableName + HOLO_QUOTE;
+  }
+
+  @Override
+  protected String generatePurgeTableSql(String tableName) {
+    throw new UnsupportedOperationException(
+        "Hologres does not support purge table in Gravitino, please use drop 
table");
+  }
+
+  @Override
+  protected String generateAlterTableSql(
+      String schemaName, String tableName, TableChange... changes) {
+    // Not all operations require the original table information, so lazy 
loading is used here
+    JdbcTable lazyLoadTable = null;
+    List<String> alterSql = new ArrayList<>();
+    for (TableChange change : changes) {
+      if (change instanceof TableChange.UpdateComment) {
+        lazyLoadTable = getOrCreateTable(schemaName, tableName, lazyLoadTable);
+        alterSql.add(updateCommentDefinition((TableChange.UpdateComment) 
change, lazyLoadTable));
+      } else if (change instanceof TableChange.SetProperty) {
+        throw new IllegalArgumentException("Set property is not supported 
yet");
+      } else if (change instanceof TableChange.RemoveProperty) {
+        throw new IllegalArgumentException("Remove property is not supported 
yet");
+      } else if (change instanceof TableChange.AddColumn) {
+        TableChange.AddColumn addColumn = (TableChange.AddColumn) change;
+        lazyLoadTable = getOrCreateTable(schemaName, tableName, lazyLoadTable);
+        alterSql.addAll(addColumnFieldDefinition(addColumn, lazyLoadTable));
+      } else if (change instanceof TableChange.RenameColumn) {
+        TableChange.RenameColumn renameColumn = (TableChange.RenameColumn) 
change;
+        alterSql.add(renameColumnFieldDefinition(renameColumn, tableName));
+      } else if (change instanceof TableChange.UpdateColumnDefaultValue) {
+        throw new IllegalArgumentException(
+            "Hologres does not support altering column default value via ALTER 
TABLE.");
+      } else if (change instanceof TableChange.UpdateColumnType) {
+        throw new IllegalArgumentException(
+            "Hologres does not support altering column type via ALTER TABLE.");
+      } else if (change instanceof TableChange.UpdateColumnComment) {
+        alterSql.add(
+            updateColumnCommentFieldDefinition(
+                (TableChange.UpdateColumnComment) change, tableName));
+      } else if (change instanceof TableChange.UpdateColumnPosition) {
+        throw new IllegalArgumentException("Hologres does not support column 
position.");
+      } else if (change instanceof TableChange.DeleteColumn) {
+        lazyLoadTable = getOrCreateTable(schemaName, tableName, lazyLoadTable);
+        TableChange.DeleteColumn deleteColumn = (TableChange.DeleteColumn) 
change;
+        String deleteColSql = deleteColumnFieldDefinition(deleteColumn, 
lazyLoadTable);
+        if (StringUtils.isNotEmpty(deleteColSql)) {
+          alterSql.add(deleteColSql);
+        }
+      } else if (change instanceof TableChange.UpdateColumnNullability) {
+        throw new IllegalArgumentException(
+            "Hologres does not support altering column nullability via ALTER 
TABLE.");
+      } else if (change instanceof TableChange.AddIndex) {
+        throw new IllegalArgumentException(
+            "Hologres does not support adding index via ALTER TABLE.");
+      } else if (change instanceof TableChange.DeleteIndex) {
+        throw new IllegalArgumentException(
+            "Hologres does not support deleting index via ALTER TABLE.");
+      } else if (change instanceof TableChange.UpdateColumnAutoIncrement) {
+        throw new IllegalArgumentException(
+            "Hologres does not support altering column auto-increment via 
ALTER TABLE.");
+      } else {
+        throw new IllegalArgumentException(
+            "Unsupported table change type: " + change.getClass().getName());
+      }
+    }
+
+    // If there is no change, return directly
+    if (alterSql.isEmpty()) {

Review Comment:
   Since methods like `deleteColumnFieldDefinition` may return an empty string (""), `alterSql` can end up as a list in which every element is an empty string. We need to handle that case as well.



##########
catalogs-contrib/catalog-jdbc-hologres/src/main/java/org/apache/gravitino/catalog/hologres/operation/HologresTableOperations.java:
##########
@@ -0,0 +1,1089 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.hologres.operation;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import javax.sql.DataSource;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.BooleanUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.StringIdentifier;
+import org.apache.gravitino.catalog.jdbc.JdbcColumn;
+import org.apache.gravitino.catalog.jdbc.JdbcTable;
+import org.apache.gravitino.catalog.jdbc.config.JdbcConfig;
+import 
org.apache.gravitino.catalog.jdbc.converter.JdbcColumnDefaultValueConverter;
+import org.apache.gravitino.catalog.jdbc.converter.JdbcExceptionConverter;
+import org.apache.gravitino.catalog.jdbc.converter.JdbcTypeConverter;
+import org.apache.gravitino.catalog.jdbc.operation.DatabaseOperation;
+import org.apache.gravitino.catalog.jdbc.operation.JdbcTableOperations;
+import org.apache.gravitino.catalog.jdbc.operation.RequireDatabaseOperation;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.exceptions.NoSuchTableException;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.rel.expressions.NamedReference;
+import org.apache.gravitino.rel.expressions.UnparsedExpression;
+import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.distributions.Strategy;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.indexes.Index;
+
+/**
+ * Table operations for Hologres.
+ *
+ * <p>Hologres is PostgreSQL-compatible, so most table operations follow 
PostgreSQL conventions.
+ * However, Hologres has specific features like table properties (orientation, 
distribution_key,
+ * etc.) that are handled through the WITH clause in CREATE TABLE statements.
+ */
+public class HologresTableOperations extends JdbcTableOperations
+    implements RequireDatabaseOperation {
+
+  public static final String HOLO_QUOTE = "\"";
+  public static final String NEW_LINE = "\n";
+  public static final String ALTER_TABLE = "ALTER TABLE ";
+  public static final String ALTER_COLUMN = "ALTER COLUMN ";
+  public static final String IS = " IS '";
+  public static final String COLUMN_COMMENT = "COMMENT ON COLUMN ";
+  public static final String TABLE_COMMENT = "COMMENT ON TABLE ";
+
+  private static final String HOLOGRES_NOT_SUPPORT_NESTED_COLUMN_MSG =
+      "Hologres does not support nested column names.";
+
+  private String database;
+  private HologresSchemaOperations schemaOperations;
+
+  @Override
+  public void initialize(
+      DataSource dataSource,
+      JdbcExceptionConverter exceptionMapper,
+      JdbcTypeConverter jdbcTypeConverter,
+      JdbcColumnDefaultValueConverter jdbcColumnDefaultValueConverter,
+      Map<String, String> conf) {
+    super.initialize(
+        dataSource, exceptionMapper, jdbcTypeConverter, 
jdbcColumnDefaultValueConverter, conf);
+    database = new JdbcConfig(conf).getJdbcDatabase();
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(database),
+        "The `jdbc-database` configuration item is mandatory in Hologres.");
+  }
+
+  @Override
+  public void setDatabaseOperation(DatabaseOperation databaseOperation) {
+    this.schemaOperations = (HologresSchemaOperations) databaseOperation;
+  }
+
+  @Override
+  public List<String> listTables(String schemaName) throws 
NoSuchSchemaException {
+    try (Connection connection = getConnection(schemaName)) {
+      if (!schemaOperations.schemaExists(connection, schemaName)) {
+        throw new NoSuchSchemaException("No such schema: %s", schemaName);
+      }
+      final List<String> names = Lists.newArrayList();
+      try (ResultSet tables = getTables(connection)) {
+        while (tables.next()) {
+          if (Objects.equals(tables.getString("TABLE_SCHEM"), schemaName)) {
+            names.add(tables.getString("TABLE_NAME"));
+          }
+        }
+      }
+      LOG.info("Finished listing tables size {} for schema name {} ", 
names.size(), schemaName);
+      return names;
+    } catch (final SQLException se) {
+      throw this.exceptionMapper.toGravitinoException(se);
+    }
+  }
+
+  @Override
+  protected JdbcTable.Builder getTableBuilder(
+      ResultSet tablesResult, String databaseName, String tableName) throws 
SQLException {
+    boolean found = false;
+    JdbcTable.Builder builder = null;
+    while (tablesResult.next() && !found) {
+      String tableNameInResult = tablesResult.getString("TABLE_NAME");
+      String tableSchemaInResultLowerCase = 
tablesResult.getString("TABLE_SCHEM");
+      if (Objects.equals(tableNameInResult, tableName)
+          && Objects.equals(tableSchemaInResultLowerCase, databaseName)) {
+        builder = getBasicJdbcTableInfo(tablesResult);
+        found = true;
+      }
+    }
+
+    if (!found) {
+      throw new NoSuchTableException("Table %s does not exist in %s.", 
tableName, databaseName);
+    }
+
+    return builder;
+  }
+
+  @Override
+  protected JdbcColumn.Builder getColumnBuilder(
+      ResultSet columnsResult, String databaseName, String tableName) throws 
SQLException {
+    JdbcColumn.Builder builder = null;
+    if (Objects.equals(columnsResult.getString("TABLE_NAME"), tableName)
+        && Objects.equals(columnsResult.getString("TABLE_SCHEM"), 
databaseName)) {
+      builder = getBasicJdbcColumnInfo(columnsResult);
+    }
+    return builder;
+  }
+
+  @Override
+  protected String generateCreateTableSql(
+      String tableName,
+      JdbcColumn[] columns,
+      String comment,
+      Map<String, String> properties,
+      Transform[] partitioning,
+      Distribution distribution,
+      Index[] indexes) {
+    boolean isLogicalPartition =
+        MapUtils.isNotEmpty(properties)
+            && 
"true".equalsIgnoreCase(properties.get("is_logical_partitioned_table"));
+    StringBuilder sqlBuilder = new StringBuilder();
+    sqlBuilder
+        .append("CREATE TABLE ")
+        .append(HOLO_QUOTE)
+        .append(tableName)
+        .append(HOLO_QUOTE)
+        .append(" (")
+        .append(NEW_LINE);
+
+    // Add columns
+    for (int i = 0; i < columns.length; i++) {
+      JdbcColumn column = columns[i];
+      sqlBuilder.append("    
").append(HOLO_QUOTE).append(column.name()).append(HOLO_QUOTE);
+
+      appendColumnDefinition(column, sqlBuilder);
+      // Add a comma for the next column, unless it's the last one
+      if (i < columns.length - 1) {
+        sqlBuilder.append(",").append(NEW_LINE);
+      }
+    }
+    appendIndexesSql(indexes, sqlBuilder);
+    sqlBuilder.append(NEW_LINE).append(")");
+
+    // Append partitioning clause if specified
+    if (ArrayUtils.isNotEmpty(partitioning)) {
+      appendPartitioningSql(partitioning, isLogicalPartition, sqlBuilder);
+    }
+
+    // Build WITH clause combining distribution and Hologres-specific table 
properties
+    // Supported properties: orientation, distribution_key, clustering_key, 
event_time_column,
+    // bitmap_columns, dictionary_encoding_columns, time_to_live_in_seconds, 
table_group, etc.
+    List<String> withEntries = new ArrayList<>();
+
+    // Add distribution_key from Distribution parameter
+    if (!Distributions.NONE.equals(distribution)) {
+      validateDistribution(distribution);
+      String distributionColumns =
+          Arrays.stream(distribution.expressions())
+              .map(Object::toString)
+              .collect(Collectors.joining(","));
+      withEntries.add("distribution_key = '" + distributionColumns + "'");
+    }
+
+    // Add user-specified properties (filter out read-only / 
internally-handled properties)
+    if (MapUtils.isNotEmpty(properties)) {
+      properties.forEach(
+          (key, value) -> {
+            if (!"distribution_key".equals(key)
+                && !"is_logical_partitioned_table".equals(key)
+                && !"primary_key".equals(key)) {
+              withEntries.add(key + " = '" + value + "'");

Review Comment:
   It would be better to use a Set to hold the property keys that need to be excluded.



##########
common/src/main/java/org/apache/gravitino/json/JsonUtils.java:
##########
@@ -1319,7 +1319,9 @@ public DistributionDTO deserialize(JsonParser p, 
DeserializationContext ctxt)
         String strategy = getString(STRATEGY, node);
         builder.withStrategy(Strategy.getByName(strategy));
       }
-      builder.withNumber(getInt(NUMBER, node));
+      if (node.has(NUMBER)) {

Review Comment:
   Is this a bug?



##########
catalogs-contrib/catalog-jdbc-hologres/src/main/java/org/apache/gravitino/catalog/hologres/converter/HologresTypeConverter.java:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.hologres.converter;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.Optional;
+import org.apache.gravitino.catalog.jdbc.converter.JdbcTypeConverter;
+import org.apache.gravitino.rel.types.Type;
+import org.apache.gravitino.rel.types.Types;
+import org.apache.gravitino.rel.types.Types.ListType;
+
+/**
+ * Type converter for Hologres.
+ *
+ * <p>Hologres is PostgreSQL-compatible, so type conversion follows PostgreSQL 
conventions. Hologres

Review Comment:
   So, is the code almost the same as that in the PostgreSQL catalog? If so, can we
try to reuse the PostgreSQL code instead of duplicating it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to