yuqi1129 commented on code in PR #5078:
URL: https://github.com/apache/gravitino/pull/5078#discussion_r1793590416


##########
scripts/h2/schema-0.7.0-h2.sql:
##########
@@ -0,0 +1,290 @@
+--

Review Comment:
   Whenever we release a new version, a corresponding schema SQL file is needed, am I right?



##########
scripts/postgresql/schema-0.7.0-postgresql.sql:
##########
@@ -140,6 +140,48 @@ COMMENT ON COLUMN table_meta.current_version IS 'table 
current version';
 COMMENT ON COLUMN table_meta.last_version IS 'table last version';
 COMMENT ON COLUMN table_meta.deleted_at IS 'table deleted at';
 
+CREATE TABLE IF NOT EXISTS table_column_version_info (
+    id BIGINT NOT NULL,
+    metalake_id BIGINT NOT NULL,
+    catalog_id BIGINT NOT NULL,
+    schema_id BIGINT NOT NULL,
+    table_id BIGINT NOT NULL,
+    table_version INT NOT NULL,
+    column_id BIGINT NOT NULL,
+    column_name VARCHAR(128) NOT NULL,
+    column_type VARCHAR(128) NOT NULL,
+    column_comment VARCHAR(256) DEFAULT '',
+    column_nullable SMALLINT NOT NULL DEFAULT 1,
+    column_auto_increment SMALLINT NOT NULL DEFAULT 0,
+    column_default_value VARCHAR(256) DEFAULT NULL,
+    column_op_type SMALLINT NOT NULL,
+    deleted_at BIGINT NOT NULL DEFAULT 0,
+    audit_info TEXT NOT NULL,
+    PRIMARY KEY (id),
+    UNIQUE (table_id, table_version, column_id, deleted_at)
+);
+CREATE INDEX idx_mid ON table_column_version_info (metalake_id);
+CREATE INDEX idx_cid ON table_column_version_info (catalog_id);
+CREATE INDEX idx_sid ON table_column_version_info (schema_id);
+COMMENT ON TABLE table_column_version_info IS 'table column version 
information';
+
+COMMENT ON COLUMN table_column_version_info.id IS 'auto increment id';
+COMMENT ON COLUMN table_column_version_info.metalake_id IS 'metalake id';
+COMMENT ON COLUMN table_column_version_info.catalog_id IS 'catalog id';
+COMMENT ON COLUMN table_column_version_info.schema_id IS 'schema id';
+COMMENT ON COLUMN table_column_version_info.table_id IS 'table id';
+COMMENT ON COLUMN table_column_version_info.table_version IS 'table version';
+COMMENT ON COLUMN table_column_version_info.column_id IS 'column id';
+COMMENT ON COLUMN table_column_version_info.column_name IS 'column name';
+COMMENT ON COLUMN table_column_version_info.column_type IS 'column type';
+COMMENT ON COLUMN table_column_version_info.column_comment IS 'column comment';
+COMMENT ON COLUMN table_column_version_info.column_nullable IS 'column 
nullable, 0 is not nullable, 1 is nullable';
+COMMENT ON COLUMN table_column_version_info.column_auto_increment IS 'column 
auto increment, 0 is not auto increment, 1 is auto increment';
+COMMENT ON COLUMN table_column_version_info.column_default_value IS 'column 
default value';
+COMMENT ON COLUMN table_column_version_info.column_op_type IS 'column op type, 
1 is add, 2 is modify, 3 is delete';

Review Comment:
   Please use the full word 'operation' instead of the abbreviation 'op'.



##########
core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableColumnBaseSQLProvider.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper.provider.base;
+
+import java.util.List;
+import org.apache.gravitino.storage.relational.mapper.TableColumnMapper;
+import org.apache.gravitino.storage.relational.po.ColumnPO;
+import org.apache.ibatis.annotations.Param;
+
+public class TableColumnBaseSQLProvider {
+
+  public String listColumnPOsByTableIdAndVersion(
+      @Param("tableId") Long tableId, @Param("tableVersion") Long 
tableVersion) {
+    return "SELECT t1.column_id as columnId, t1.column_name as columnName,"
+        + " t1.metalake_id as metalakeId, t1.catalog_id as catalogId,"

Review Comment:
   as -> AS (please write SQL keywords in uppercase).



##########
core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java:
##########
@@ -424,6 +447,73 @@ public static TableEntity fromTablePO(TablePO tablePO, 
Namespace namespace) {
     }
   }
 
+  public static ColumnEntity fromColumnPO(ColumnPO columnPO) {
+    try {
+      return ColumnEntity.builder()
+          .withId(columnPO.getColumnId())
+          .withName(columnPO.getColumnName())
+          
.withDataType(JsonUtils.anyFieldMapper().readValue(columnPO.getColumnType(), 
Type.class))
+          .withComment(columnPO.getColumnComment())
+          .withAutoIncrement(
+              
ColumnPO.AutoIncrement.fromValue(columnPO.getAutoIncrement()).autoIncrement())
+          
.withNullable(ColumnPO.Nullable.fromValue(columnPO.getNullable()).nullable())
+          .withDefaultValue(
+              columnPO.getDefaultValue() == null
+                  ? Column.DEFAULT_VALUE_NOT_SET
+                  : DTOConverters.fromFunctionArg(
+                      (FunctionArg)
+                          JsonUtils.anyFieldMapper()
+                              .readValue(columnPO.getDefaultValue(), 
Expression.class)))
+          .withAuditInfo(
+              JsonUtils.anyFieldMapper().readValue(columnPO.getAuditInfo(), 
AuditInfo.class))
+          .build();
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException("Failed to deserialize json object:", e);
+    }
+  }
+
+  public static List<ColumnEntity> fromColumnPOs(List<ColumnPO> columnPOs) {
+    return 
columnPOs.stream().map(POConverters::fromColumnPO).collect(Collectors.toList());
+  }
+
+  public static ColumnPO initializeColumnPO(
+      TablePO tablePO, ColumnEntity columnEntity, ColumnPO.ColumnOpType 
opType) {
+    try {
+      return ColumnPO.builder()
+          .withColumnId(columnEntity.id())
+          .withColumnName(columnEntity.name())
+          .withMetalakeId(tablePO.getMetalakeId())
+          .withCatalogId(tablePO.getCatalogId())
+          .withSchemaId(tablePO.getSchemaId())
+          .withTableId(tablePO.getTableId())
+          .withTableVersion(tablePO.getCurrentVersion())
+          
.withColumnType(JsonUtils.anyFieldMapper().writeValueAsString(columnEntity.dataType()))
+          .withColumnComment(columnEntity.comment())
+          
.withNullable(ColumnPO.Nullable.fromBoolean(columnEntity.nullable()).value())
+          .withAutoIncrement(
+              
ColumnPO.AutoIncrement.fromBoolean(columnEntity.autoIncrement()).value())
+          .withDefaultValue(
+              columnEntity.defaultValue() == null
+                      || 
columnEntity.defaultValue().equals(Column.DEFAULT_VALUE_NOT_SET)
+                  ? null

Review Comment:
   If `nullable` is `false`, can we still set the default value to null?



##########
core/src/main/java/org/apache/gravitino/storage/relational/po/ColumnPO.java:
##########
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.po;
+
+import com.google.common.base.Preconditions;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import org.apache.commons.lang3.StringUtils;
+
+@EqualsAndHashCode
+@Getter
+public class ColumnPO {
+
+  public enum ColumnOpType {
+    CREATE((byte) 1),
+    UPDATE((byte) 2),
+    DELETE((byte) 3);
+
+    private final Byte value;
+
+    ColumnOpType(Byte value) {
+      this.value = value;
+    }
+
+    public Byte value() {
+      return value;
+    }
+  }
+
+  public enum Nullable {
+    TRUE((byte) 0, true),
+    FALSE((byte) 1, false);
+
+    private final Byte value;
+
+    private final boolean nullable;
+
+    Nullable(Byte value, boolean nullable) {
+      this.value = value;
+      this.nullable = nullable;
+    }
+
+    public Byte value() {
+      return value;
+    }
+
+    public boolean nullable() {
+      return nullable;
+    }
+
+    public static Nullable fromValue(Byte value) {
+      for (Nullable nullable : values()) {
+        if (nullable.value.equals(value)) {
+          return nullable;
+        }
+      }
+      throw new IllegalArgumentException("Invalid nullable value: " + value);
+    }
+
+    public static Nullable fromBoolean(boolean nullable) {
+      for (Nullable nullableEnum : values()) {
+        if (nullableEnum.nullable == nullable) {
+          return nullableEnum;
+        }
+      }
+      throw new IllegalArgumentException("Invalid nullable boolean value: " + 
nullable);
+    }
+  }
+
+  public enum AutoIncrement {
+    TRUE((byte) 0, true),
+    FALSE((byte) 1, false);
+
+    private final Byte value;

Review Comment:
   The same applies here.



##########
core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/TableColumnPostgreSQLProvider.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper.provider.postgresql;
+
+import org.apache.gravitino.storage.relational.mapper.TableColumnMapper;
+import 
org.apache.gravitino.storage.relational.mapper.provider.base.TableColumnBaseSQLProvider;
+import org.apache.ibatis.annotations.Param;
+
+public class TableColumnPostgreSQLProvider extends TableColumnBaseSQLProvider {
+
+  @Override
+  public String softDeleteColumnsByTableId(@Param("tableId") Long tableId) {
+    return "UPDATE "
+        + TableColumnMapper.COLUMN_TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000)))"
+        + " WHERE table_id = #{tableId} AND deleted_at = 0";
+  }
+
+  @Override
+  public String softDeleteColumnsByMetalakeId(@Param("metalakeId") Long 
metalakeId) {
+    return "UPDATE "
+        + TableColumnMapper.COLUMN_TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000))"

Review Comment:
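   Missing a closing `)` here: `softDeleteColumnsByTableId` above ends with `*1000)))`, but this one ends with `*1000))`.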



##########
core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java:
##########
@@ -117,16 +108,34 @@ public void insertTable(TableEntity tableEntity, boolean 
overwrite) throws IOExc
       TablePO.Builder builder = TablePO.builder();
       fillTablePOBuilderParentEntityId(builder, tableEntity.namespace());
 
-      SessionUtils.doWithCommit(
-          TableMetaMapper.class,
-          mapper -> {
-            TablePO po = 
POConverters.initializeTablePOWithVersion(tableEntity, builder);
+      AtomicReference<TablePO> tablePORef = new AtomicReference<>();
+      SessionUtils.doMultipleWithCommit(
+          () ->
+              SessionUtils.doWithoutCommit(
+                  TableMetaMapper.class,
+                  mapper -> {
+                    TablePO po = 
POConverters.initializeTablePOWithVersion(tableEntity, builder);
+                    tablePORef.set(po);
+                    if (overwrite) {
+                      mapper.insertTableMetaOnDuplicateKeyUpdate(po);
+                    } else {
+                      mapper.insertTableMeta(po);
+                    }
+                  }),
+          () -> {
+            // We need to delete the columns first if we want to overwrite the 
table.
             if (overwrite) {
-              mapper.insertTableMetaOnDuplicateKeyUpdate(po);
-            } else {
-              mapper.insertTableMeta(po);
+              TableColumnMetaService.getInstance()

Review Comment:
   Could `TableColumnMetaService.getInstance().isColumnUpdated` be used here?



##########
core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java:
##########
@@ -117,16 +108,34 @@ public void insertTable(TableEntity tableEntity, boolean 
overwrite) throws IOExc
       TablePO.Builder builder = TablePO.builder();
       fillTablePOBuilderParentEntityId(builder, tableEntity.namespace());
 
-      SessionUtils.doWithCommit(
-          TableMetaMapper.class,
-          mapper -> {
-            TablePO po = 
POConverters.initializeTablePOWithVersion(tableEntity, builder);
+      AtomicReference<TablePO> tablePORef = new AtomicReference<>();
+      SessionUtils.doMultipleWithCommit(
+          () ->
+              SessionUtils.doWithoutCommit(
+                  TableMetaMapper.class,
+                  mapper -> {
+                    TablePO po = 
POConverters.initializeTablePOWithVersion(tableEntity, builder);
+                    tablePORef.set(po);
+                    if (overwrite) {
+                      mapper.insertTableMetaOnDuplicateKeyUpdate(po);
+                    } else {
+                      mapper.insertTableMeta(po);
+                    }
+                  }),
+          () -> {
+            // We need to delete the columns first if we want to overwrite the 
table.
             if (overwrite) {
-              mapper.insertTableMetaOnDuplicateKeyUpdate(po);
-            } else {
-              mapper.insertTableMeta(po);
+              TableColumnMetaService.getInstance()

Review Comment:
   Is there a good way to determine whether the columns have actually changed? Then we would not need to drop and re-create all columns when only other table properties change, for example when the table is renamed or its comment is altered.



##########
core/src/main/java/org/apache/gravitino/storage/relational/po/ColumnPO.java:
##########
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.po;
+
+import com.google.common.base.Preconditions;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import org.apache.commons.lang3.StringUtils;
+
+@EqualsAndHashCode
+@Getter
+public class ColumnPO {
+
+  public enum ColumnOpType {
+    CREATE((byte) 1),
+    UPDATE((byte) 2),
+    DELETE((byte) 3);
+
+    private final Byte value;
+
+    ColumnOpType(Byte value) {
+      this.value = value;
+    }
+
+    public Byte value() {
+      return value;
+    }
+  }
+
+  public enum Nullable {
+    TRUE((byte) 0, true),
+    FALSE((byte) 1, false);
+
+    private final Byte value;
+
+    private final boolean nullable;
+
+    Nullable(Byte value, boolean nullable) {
+      this.value = value;
+      this.nullable = nullable;
+    }
+
+    public Byte value() {
+      return value;
+    }
+
+    public boolean nullable() {
+      return nullable;
+    }
+
+    public static Nullable fromValue(Byte value) {
+      for (Nullable nullable : values()) {
+        if (nullable.value.equals(value)) {
+          return nullable;
+        }
+      }
+      throw new IllegalArgumentException("Invalid nullable value: " + value);
+    }
+
+    public static Nullable fromBoolean(boolean nullable) {
+      for (Nullable nullableEnum : values()) {
+        if (nullableEnum.nullable == nullable) {
+          return nullableEnum;
+        }
+      }
+      throw new IllegalArgumentException("Invalid nullable boolean value: " + 
nullable);
+    }
+  }
+
+  public enum AutoIncrement {
+    TRUE((byte) 0, true),
+    FALSE((byte) 1, false);
+
+    private final Byte value;
+
+    private final boolean autoIncrement;
+
+    AutoIncrement(Byte value, boolean autoIncrement) {
+      this.value = value;
+      this.autoIncrement = autoIncrement;
+    }
+
+    public Byte value() {
+      return value;
+    }
+
+    public boolean autoIncrement() {
+      return autoIncrement;
+    }
+
+    public static AutoIncrement fromValue(Byte value) {

Review Comment:
   Why do we use `Byte` not `byte` here?



##########
core/src/main/java/org/apache/gravitino/storage/relational/service/TableColumnMetaService.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.service;
+
+import com.google.common.collect.Lists;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.gravitino.meta.ColumnEntity;
+import org.apache.gravitino.meta.TableEntity;
+import org.apache.gravitino.storage.relational.mapper.TableColumnMapper;
+import org.apache.gravitino.storage.relational.po.ColumnPO;
+import org.apache.gravitino.storage.relational.po.TablePO;
+import org.apache.gravitino.storage.relational.utils.POConverters;
+import org.apache.gravitino.storage.relational.utils.SessionUtils;
+
+public class TableColumnMetaService {
+
+  private static final TableColumnMetaService INSTANCE = new 
TableColumnMetaService();
+
+  private TableColumnMetaService() {}
+
+  public static TableColumnMetaService getInstance() {
+    return INSTANCE;
+  }
+
+  public List<ColumnPO> getColumnsByTableIdAndVersion(Long tableId, Long 
version) {
+    List<ColumnPO> columnPOs =
+        SessionUtils.getWithoutCommit(
+            TableColumnMapper.class,
+            mapper -> mapper.listColumnPOsByTableIdAndVersion(tableId, 
version));
+
+    // Filter out the deleted columns
+    return columnPOs.stream()
+        .filter(c -> !Objects.equals(c.getColumnOpType(), 
ColumnPO.ColumnOpType.DELETE.value()))
+        .collect(Collectors.toList());
+  }
+
+  public void insertColumnPOs(TablePO tablePO, List<ColumnEntity> 
columnEntities) {
+    List<ColumnPO> columnPOs =
+        POConverters.initializeColumnPOs(tablePO, columnEntities, 
ColumnPO.ColumnOpType.CREATE);
+    SessionUtils.doWithoutCommit(
+        TableColumnMapper.class, mapper -> mapper.insertColumnPOs(columnPOs));
+  }
+
+  public boolean deleteColumnsByTableId(Long tableId) {
+    Integer result =
+        SessionUtils.doWithCommitAndFetchResult(
+            TableColumnMapper.class, mapper -> 
mapper.softDeleteColumnsByTableId(tableId));
+    return result > 0;
+  }
+
+  public int deleteColumnsByLegacyTimeline(Long legacyTimeline, int limit) {
+    return SessionUtils.doWithoutCommitAndFetchResult(
+        TableColumnMapper.class,
+        mapper -> mapper.deleteColumnPOsByLegacyTimeline(legacyTimeline, 
limit));
+  }
+
+  public boolean isColumnUpdated(TableEntity oldTable, TableEntity newTable) {
+    Map<Long, ColumnEntity> oldColumns =
+        oldTable.columns() == null
+            ? Collections.emptyMap()
+            : oldTable.columns().stream()
+                .collect(Collectors.toMap(ColumnEntity::id, 
Function.identity()));
+
+    Map<Long, ColumnEntity> newColumns =
+        newTable.columns() == null
+            ? Collections.emptyMap()
+            : newTable.columns().stream()
+                .collect(Collectors.toMap(ColumnEntity::id, 
Function.identity()));
+
+    return oldColumns.size() != newColumns.size() || 
!oldColumns.equals(newColumns);
+  }
+
+  public void updateColumnPOsFromTableDiff(
+      TableEntity oldTable, TableEntity newTable, TablePO newTablePO) {
+    Map<Long, ColumnEntity> oldColumns =
+        oldTable.columns() == null
+            ? Collections.emptyMap()
+            : oldTable.columns().stream()
+                .collect(Collectors.toMap(ColumnEntity::id, 
Function.identity()));
+    Map<Long, ColumnEntity> newColumns =
+        newTable.columns() == null
+            ? Collections.emptyMap()
+            : newTable.columns().stream()
+                .collect(Collectors.toMap(ColumnEntity::id, 
Function.identity()));
+
+    List<ColumnPO> columnPOsToInsert = Lists.newArrayList();
+    for (ColumnEntity newColumn : newColumns.values()) {
+      ColumnEntity oldColumn = oldColumns.get(newColumn.id());
+      if (oldColumn == null || !oldColumn.equals(newColumn)) {

Review Comment:
   If `oldColumn == null`, `newColumn` is a newly created column, so its op type should be `CREATE`, not `UPDATE`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to