This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/branch-0.9 by this push:
new 4028fc6818 [#7415] fix(core): Use doWithCommitAndFetchResult in
deleteColumnsByLegacyTimeline (#7433)
4028fc6818 is described below
commit 4028fc68184d0a0cff25237adce25ec1ba998902
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Jun 18 17:51:08 2025 -0700
[#7415] fix(core): Use doWithCommitAndFetchResult in
deleteColumnsByLegacyTimeline (#7433)
### What changes were proposed in this pull request?
This PR modifies the deleteColumnsByLegacyTimeline method in the
TableColumnMetaService class, changing the transaction handling from
doWithoutCommitAndFetchResult to doWithCommitAndFetchResult.
### Why are the changes needed?
This change is necessary because the deleteColumnsByLegacyTimeline
method is a standalone operation that needs to commit its transaction
immediately after execution. Using doWithCommitAndFetchResult ensures:
1. The deletion operation executes and commits within its own
transaction, not depending on external transactions
2. Consistency with garbage collection operations for other entity types
(like MetalakeMeta, CatalogMeta, etc.) which also use
doWithCommitAndFetchResult
3. Prevention of database locking issues caused by long-running
transactions
4. Immediate visibility of deletion results to other transactions
Fix: #7415
### Does this PR introduce _any_ user-facing change?
No, this is an improvement to the internal implementation and does not
affect user interfaces or functionality.
### How was this patch tested?
The modified method was verified through the JUnit test
TestTableColumnMetaService.testDeleteColumnsByLegacyTimeline, which
verifies that:
1. The system correctly identifies and deletes columns with a specific
legacy timeline
2. The batch deletion functionality (with limit parameter) works
properly
3. The deletion operation is idempotent, meaning repeated calls do not
cause errors
Co-authored-by: liuxian <[email protected]>
Co-authored-by: liuxian131 <[email protected]>
---
.../relational/service/TableColumnMetaService.java | 3 +-
.../service/TestTableColumnMetaService.java | 119 +++++++++++++++++++++
2 files changed, 120 insertions(+), 2 deletions(-)
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/service/TableColumnMetaService.java
b/core/src/main/java/org/apache/gravitino/storage/relational/service/TableColumnMetaService.java
index 1527811fa9..0fb78573be 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/service/TableColumnMetaService.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/service/TableColumnMetaService.java
@@ -106,8 +106,7 @@ public class TableColumnMetaService {
}
public int deleteColumnsByLegacyTimeline(Long legacyTimeline, int limit) {
- // deleteColumns will be done in the outside transaction, so we don't do
commit here.
- return SessionUtils.doWithoutCommitAndFetchResult(
+ return SessionUtils.doWithCommitAndFetchResult(
TableColumnMapper.class,
mapper -> mapper.deleteColumnPOsByLegacyTimeline(legacyTimeline,
limit));
}
diff --git
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestTableColumnMetaService.java
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestTableColumnMetaService.java
index a0e9f3427f..b6e1ef2843 100644
---
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestTableColumnMetaService.java
+++
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestTableColumnMetaService.java
@@ -19,7 +19,12 @@
package org.apache.gravitino.storage.relational.service;
import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
import java.time.Instant;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
@@ -34,7 +39,9 @@ import org.apache.gravitino.rel.expressions.literals.Literals;
import org.apache.gravitino.rel.types.Types;
import org.apache.gravitino.storage.RandomIdGenerator;
import org.apache.gravitino.storage.relational.TestJDBCBackend;
+import org.apache.gravitino.storage.relational.mapper.TableColumnMapper;
import org.apache.gravitino.storage.relational.po.ColumnPO;
+import org.apache.gravitino.storage.relational.session.SqlSessions;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.testcontainers.shaded.com.google.common.collect.Lists;
@@ -547,4 +554,116 @@ public class TestTableColumnMetaService extends
TestJDBCBackend {
Assertions.assertEquals(expectedColumn.auditInfo(),
column.auditInfo());
});
}
+
+  /**
+   * Verifies that {@code deleteColumnsByLegacyTimeline} removes soft-deleted column rows in
+   * batches bounded by the {@code limit} argument.
+   *
+   * <p>The test inserts a table with five columns, stamps each column row with a {@code deleted_at}
+   * value in the past via raw JDBC, and then checks the remaining row count after each call.
+   */
+  @Test
+  public void testDeleteColumnsByLegacyTimeline() throws IOException {
+    String catalogName = "catalog1";
+    String schemaName = "schema1";
+    createParentEntities(METALAKE_NAME, catalogName, schemaName, auditInfo);
+
+    List<ColumnEntity> columns = new ArrayList<>();
+    for (int i = 0; i < 5; i++) {
+      columns.add(
+          ColumnEntity.builder()
+              .withId(RandomIdGenerator.INSTANCE.nextId())
+              .withName("column_" + i)
+              .withPosition(i)
+              .withComment("comment_" + i)
+              .withDataType(Types.StringType.get())
+              .withNullable(true)
+              .withAutoIncrement(false)
+              .withAuditInfo(auditInfo)
+              .build());
+    }
+
+    TableEntity createdTable =
+        TableEntity.builder()
+            .withId(RandomIdGenerator.INSTANCE.nextId())
+            .withName("legacy_table")
+            .withNamespace(Namespace.of(METALAKE_NAME, catalogName, schemaName))
+            .withColumns(columns)
+            .withAuditInfo(auditInfo)
+            .build();
+
+    TableMetaService.getInstance().insertTable(createdTable, false);
+
+    long now = System.currentTimeMillis();
+    long legacyTimeline = now - 100000; // Past timestamp
+
+    // Stamp every column row with the past deleted_at value directly through JDBC.
+    String sql =
+        "UPDATE "
+            + TableColumnMapper.COLUMN_TABLE_NAME
+            + " SET deleted_at = ? WHERE column_id = ?";
+    try {
+      Connection connection = SqlSessions.getSqlSession().getConnection();
+      // The SQL text is loop-invariant, so prepare it once and only rebind the parameters.
+      // try-with-resources closes the statement before the session itself is closed.
+      try (PreparedStatement stmt = connection.prepareStatement(sql)) {
+        for (ColumnEntity column : columns) {
+          stmt.setLong(1, legacyTimeline);
+          stmt.setLong(2, column.id());
+          stmt.executeUpdate();
+        }
+      }
+      SqlSessions.commitAndCloseSqlSession();
+    } catch (Exception e) {
+      SqlSessions.rollbackAndCloseSqlSession();
+      throw new IOException("Failed to update column deleted_at timestamp", e);
+    }
+
+    int count = countColumnsByTableId(legacyTimeline);
+    Assertions.assertEquals(5, count, "Should have 5 columns with legacy timeline");
+
+    TableColumnMetaService service = TableColumnMetaService.getInstance();
+
+    // First batch: limit of 3, so 2 of the 5 stamped rows are expected to remain.
+    service.deleteColumnsByLegacyTimeline(now, 3);
+    count = countColumnsByTableId(legacyTimeline);
+    Assertions.assertEquals(2, count, "Should have 2 columns remaining");
+
+    // Second batch: the limit exceeds what is left, so nothing should remain.
+    service.deleteColumnsByLegacyTimeline(now, 10);
+    count = countColumnsByTableId(legacyTimeline);
+    Assertions.assertEquals(0, count, "Should have no columns remaining");
+
+    Assertions.assertTrue(
+        MetalakeMetaService.getInstance().deleteMetalake(NameIdentifier.of(METALAKE_NAME), true));
+  }
+
+  /**
+   * Counts the rows in the column table whose {@code deleted_at} equals the given value.
+   *
+   * <p>Despite its name, this helper filters on {@code deleted_at} only, not on a table id.
+   *
+   * @param legacyTimeline the {@code deleted_at} value to match
+   * @return the number of matching rows
+   * @throws IOException if the query fails; the SQL session is rolled back and closed first
+   */
+  private int countColumnsByTableId(long legacyTimeline) throws IOException {
+    String sql =
+        "SELECT COUNT(*) FROM " + TableColumnMapper.COLUMN_TABLE_NAME + " WHERE deleted_at = ?";
+    int count = 0;
+    try {
+      Connection connection = SqlSessions.getSqlSession().getConnection();
+      // Close the result set and statement before the session is committed and closed.
+      try (PreparedStatement stmt = connection.prepareStatement(sql)) {
+        stmt.setLong(1, legacyTimeline);
+        try (ResultSet rs = stmt.executeQuery()) {
+          if (rs.next()) {
+            count = rs.getInt(1);
+          }
+        }
+      }
+      SqlSessions.commitAndCloseSqlSession();
+    } catch (Exception e) {
+      SqlSessions.rollbackAndCloseSqlSession();
+      throw new IOException("Failed to count columns with legacy timeline", e);
+    }
+    return count;
+  }
}