TimothyDing commented on code in PR #10068:
URL: https://github.com/apache/gravitino/pull/10068#discussion_r2890392297
##########
catalogs-contrib/catalog-jdbc-hologres/src/main/java/org/apache/gravitino/catalog/hologres/operation/HologresTableOperations.java:
##########
@@ -56,20 +193,875 @@ protected String generateCreateTableSql(
Transform[] partitioning,
Distribution distribution,
Index[] indexes) {
- throw new UnsupportedOperationException(
- "Hologres table creation will be implemented in a follow-up PR.");
+ boolean isLogicalPartition =
+ MapUtils.isNotEmpty(properties)
+ &&
"true".equalsIgnoreCase(properties.get("is_logical_partitioned_table"));
+ StringBuilder sqlBuilder = new StringBuilder();
+ sqlBuilder.append(
+ String.format("CREATE TABLE %s%s%s (%s", HOLO_QUOTE, tableName,
HOLO_QUOTE, NEW_LINE));
+
+ // Add columns
+ for (int i = 0; i < columns.length; i++) {
+ JdbcColumn column = columns[i];
+ sqlBuilder.append(String.format(" %s%s%s", HOLO_QUOTE, column.name(),
HOLO_QUOTE));
+
+ appendColumnDefinition(column, sqlBuilder);
+ // Add a comma for the next column, unless it's the last one
+ if (i < columns.length - 1) {
+ sqlBuilder.append(String.format(",%s", NEW_LINE));
+ }
+ }
+ appendIndexesSql(indexes, sqlBuilder);
+ sqlBuilder.append(String.format("%s)", NEW_LINE));
+
+ // Append partitioning clause if specified
+ if (ArrayUtils.isNotEmpty(partitioning)) {
+ appendPartitioningSql(partitioning, isLogicalPartition, sqlBuilder);
+ }
+
+ // Build WITH clause combining distribution and Hologres-specific table
properties
+ // Supported properties: orientation, distribution_key, clustering_key,
event_time_column,
+ // bitmap_columns, dictionary_encoding_columns, time_to_live_in_seconds,
table_group, etc.
+ List<String> withEntries = new ArrayList<>();
+
+ // Add distribution_key from Distribution parameter
+ if (!Distributions.NONE.equals(distribution)) {
+ validateDistribution(distribution);
+ String distributionColumns =
+ Arrays.stream(distribution.expressions())
+ .map(Object::toString)
+ .collect(Collectors.joining(","));
+ withEntries.add(String.format("distribution_key = '%s'",
distributionColumns));
+ }
+
+ // Add user-specified properties (filter out read-only /
internally-handled properties)
+ if (MapUtils.isNotEmpty(properties)) {
+ properties.forEach(
+ (key, value) -> {
+ if (!EXCLUDED_TABLE_PROPERTIES.contains(key)) {
+ withEntries.add(String.format("%s = '%s'", key, value));
+ }
+ });
+ }
+
+ // Generate WITH clause
+ if (!withEntries.isEmpty()) {
+ sqlBuilder.append(String.format("%sWITH (%s", NEW_LINE, NEW_LINE));
+ sqlBuilder.append(
+ withEntries.stream()
+ .map(entry -> String.format(" %s", entry))
+ .collect(Collectors.joining(String.format(",%s", NEW_LINE))));
+ sqlBuilder.append(String.format("%s)", NEW_LINE));
+ }
+
+ sqlBuilder.append(";");
+
+ // Add table comment if specified
+ if (StringUtils.isNotEmpty(comment)) {
+ String escapedComment = comment.replace("'", "''");
+ sqlBuilder
+ .append(NEW_LINE)
+ .append(
+ String.format(
+ "COMMENT ON TABLE %s%s%s IS '%s';",
+ HOLO_QUOTE, tableName, HOLO_QUOTE, escapedComment));
+ }
+ Arrays.stream(columns)
+ .filter(jdbcColumn -> StringUtils.isNotEmpty(jdbcColumn.comment()))
+ .forEach(
+ jdbcColumn -> {
+ String escapedColComment = jdbcColumn.comment().replace("'",
"''");
+ sqlBuilder
+ .append(NEW_LINE)
+ .append(
+ String.format(
+ "COMMENT ON COLUMN %s%s%s.%s%s%s IS '%s';",
+ HOLO_QUOTE,
+ tableName,
+ HOLO_QUOTE,
+ HOLO_QUOTE,
+ jdbcColumn.name(),
+ HOLO_QUOTE,
+ escapedColComment));
+ });
+ // Return the generated SQL statement
+ String result = sqlBuilder.toString();
+
+ LOG.info("Generated create table:{} sql: {}", tableName, result);
+ return result;
+ }
+
+ @VisibleForTesting
+ static void appendIndexesSql(Index[] indexes, StringBuilder sqlBuilder) {
+ for (Index index : indexes) {
+ String fieldStr = getIndexFieldStr(index.fieldNames());
+ sqlBuilder.append(String.format(",%s", NEW_LINE));
+ switch (index.type()) {
+ case PRIMARY_KEY:
+ sqlBuilder.append(String.format("PRIMARY KEY (%s)", fieldStr));
+ break;
+ default:
+ throw new IllegalArgumentException(
+ "Hologres only supports PRIMARY_KEY index, but got: " +
index.type());
+ }
+ }
+ }
+
+ protected static String getIndexFieldStr(String[][] fieldNames) {
+ return Arrays.stream(fieldNames)
+ .map(
+ colNames -> {
+ if (colNames.length > 1) {
+ throw new IllegalArgumentException(
+ "Index does not support complex fields in Hologres");
+ }
+ return String.format("%s%s%s", HOLO_QUOTE, colNames[0],
HOLO_QUOTE);
+ })
+ .collect(Collectors.joining(", "));
+ }
+
+ /**
+ * Append the partitioning clause to the CREATE TABLE SQL.
+ *
+ * <p>Hologres supports two types of partition tables:
+ *
+ * <ul>
+ * <li>Physical partition table: uses {@code PARTITION BY LIST(column)}
syntax
+ * <li>Logical partition table (V3.1+): uses {@code LOGICAL PARTITION BY
LIST(column1[,
+ * column2])} syntax
+ * </ul>
+ *
+ * @param partitioning the partition transforms (only LIST partitioning is
supported)
+ * @param isLogicalPartition whether to create a logical partition table
+ * @param sqlBuilder the SQL builder to append to
+ */
+ @VisibleForTesting
+ static void appendPartitioningSql(
+ Transform[] partitioning, boolean isLogicalPartition, StringBuilder
sqlBuilder) {
+ Preconditions.checkArgument(
+ partitioning.length == 1,
+ "Hologres only supports single partition transform, but got %s",
+ partitioning.length);
+ Preconditions.checkArgument(
+ partitioning[0] instanceof Transforms.ListTransform,
+ "Hologres only supports LIST partitioning, but got %s",
+ partitioning[0].getClass().getSimpleName());
+
+ Transforms.ListTransform listTransform = (Transforms.ListTransform)
partitioning[0];
+ String[][] fieldNames = listTransform.fieldNames();
+
+ Preconditions.checkArgument(fieldNames.length > 0, "Partition columns must
not be empty");
+
+ if (isLogicalPartition) {
+ Preconditions.checkArgument(
+ fieldNames.length <= 2,
+ "Logical partition table supports at most 2 partition columns, but
got: %s",
+ fieldNames.length);
+ } else {
+ Preconditions.checkArgument(
+ fieldNames.length == 1,
+ "Physical partition table supports exactly 1 partition column, but
got: %s",
+ fieldNames.length);
+ }
+
+ String partitionColumns =
+ Arrays.stream(fieldNames)
+ .map(
+ colNames -> {
+ Preconditions.checkArgument(
+ colNames.length == 1,
+ "Hologres partition does not support nested field
names");
+ return String.format("%s%s%s", HOLO_QUOTE, colNames[0],
HOLO_QUOTE);
+ })
+ .collect(Collectors.joining(", "));
+
+ sqlBuilder.append(NEW_LINE);
+ if (isLogicalPartition) {
+ sqlBuilder.append(String.format("LOGICAL PARTITION BY LIST(%s)",
partitionColumns));
+ } else {
+ sqlBuilder.append(String.format("PARTITION BY LIST(%s)",
partitionColumns));
+ }
+ }
+
+ private void appendColumnDefinition(JdbcColumn column, StringBuilder
sqlBuilder) {
+ // Add data type
+
sqlBuilder.append(SPACE).append(typeConverter.fromGravitino(column.dataType())).append(SPACE);
+
+ // Hologres does not support auto-increment columns via Gravitino
+ if (column.autoIncrement()) {
+ throw new IllegalArgumentException(
+ "Hologres does not support creating auto-increment columns via
Gravitino, column: "
+ + column.name());
+ }
+
+ // Handle generated (stored computed) columns:
+ // GENERATED ALWAYS AS (expr) STORED must come before nullable constraints.
+ if (column.defaultValue() instanceof UnparsedExpression) {
+ String expr = ((UnparsedExpression)
column.defaultValue()).unparsedExpression();
+ sqlBuilder.append(String.format("GENERATED ALWAYS AS (%s) STORED ",
expr));
+ if (column.nullable()) {
+ sqlBuilder.append("NULL ");
+ } else {
+ sqlBuilder.append("NOT NULL ");
+ }
+ return;
+ }
+
+ // Add NOT NULL if the column is marked as such
+ if (column.nullable()) {
+ sqlBuilder.append("NULL ");
+ } else {
+ sqlBuilder.append("NOT NULL ");
+ }
+ // Add DEFAULT value if specified
+ appendDefaultValue(column, sqlBuilder);
}
@Override
- protected String generateAlterTableSql(
- String schemaName, String tableName, TableChange... changes) {
- throw new UnsupportedOperationException(
- "Hologres table alteration will be implemented in a follow-up PR.");
+ protected String generateRenameTableSql(String oldTableName, String
newTableName) {
+ return String.format(
+ "%s%s%s%s RENAME TO %s%s%s",
+ ALTER_TABLE, HOLO_QUOTE, oldTableName, HOLO_QUOTE, HOLO_QUOTE,
newTableName, HOLO_QUOTE);
+ }
+
+ @Override
+ protected String generateDropTableSql(String tableName) {
+ return String.format("DROP TABLE %s%s%s", HOLO_QUOTE, tableName,
HOLO_QUOTE);
}
@Override
protected String generatePurgeTableSql(String tableName) {
throw new UnsupportedOperationException(
"Hologres does not support purge table in Gravitino, please use drop
table");
}
+
+ @Override
+ protected String generateAlterTableSql(
+ String schemaName, String tableName, TableChange... changes) {
+ // Not all operations require the original table information, so lazy
loading is used here
+ JdbcTable lazyLoadTable = null;
+ List<String> alterSql = new ArrayList<>();
+ for (TableChange change : changes) {
+ if (change instanceof TableChange.UpdateComment) {
+ lazyLoadTable = getOrCreateTable(schemaName, tableName, lazyLoadTable);
+ alterSql.add(updateCommentDefinition((TableChange.UpdateComment)
change, lazyLoadTable));
+ } else if (change instanceof TableChange.SetProperty) {
+ throw new IllegalArgumentException("Set property is not supported
yet");
+ } else if (change instanceof TableChange.RemoveProperty) {
+ throw new IllegalArgumentException("Remove property is not supported
yet");
+ } else if (change instanceof TableChange.AddColumn) {
Review Comment:
Fixed in commit a5b7ea03ae306fb94427dfdaeca1f98b5bfae46d.
##########
catalogs-contrib/catalog-jdbc-hologres/src/main/java/org/apache/gravitino/catalog/hologres/operation/HologresTableOperations.java:
##########
@@ -56,20 +193,875 @@ protected String generateCreateTableSql(
Transform[] partitioning,
Distribution distribution,
Index[] indexes) {
- throw new UnsupportedOperationException(
- "Hologres table creation will be implemented in a follow-up PR.");
+ boolean isLogicalPartition =
+ MapUtils.isNotEmpty(properties)
+ &&
"true".equalsIgnoreCase(properties.get("is_logical_partitioned_table"));
+ StringBuilder sqlBuilder = new StringBuilder();
+ sqlBuilder.append(
+ String.format("CREATE TABLE %s%s%s (%s", HOLO_QUOTE, tableName,
HOLO_QUOTE, NEW_LINE));
+
+ // Add columns
+ for (int i = 0; i < columns.length; i++) {
+ JdbcColumn column = columns[i];
+ sqlBuilder.append(String.format(" %s%s%s", HOLO_QUOTE, column.name(),
HOLO_QUOTE));
+
+ appendColumnDefinition(column, sqlBuilder);
+ // Add a comma for the next column, unless it's the last one
+ if (i < columns.length - 1) {
+ sqlBuilder.append(String.format(",%s", NEW_LINE));
+ }
+ }
+ appendIndexesSql(indexes, sqlBuilder);
+ sqlBuilder.append(String.format("%s)", NEW_LINE));
+
+ // Append partitioning clause if specified
+ if (ArrayUtils.isNotEmpty(partitioning)) {
+ appendPartitioningSql(partitioning, isLogicalPartition, sqlBuilder);
+ }
+
+ // Build WITH clause combining distribution and Hologres-specific table
properties
+ // Supported properties: orientation, distribution_key, clustering_key,
event_time_column,
+ // bitmap_columns, dictionary_encoding_columns, time_to_live_in_seconds,
table_group, etc.
+ List<String> withEntries = new ArrayList<>();
+
+ // Add distribution_key from Distribution parameter
+ if (!Distributions.NONE.equals(distribution)) {
+ validateDistribution(distribution);
+ String distributionColumns =
+ Arrays.stream(distribution.expressions())
+ .map(Object::toString)
Review Comment:
Fixed in commit a5b7ea03ae306fb94427dfdaeca1f98b5bfae46d.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]