This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e1669ce021 [core] Clear cache when deleting the snapshot (#4966)
e1669ce021 is described below
commit e1669ce02173c7a676386770fc2216838ee29500
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Jan 21 10:19:46 2025 +0800
[core] Clear cache when deleting the snapshot (#4966)
---
.../main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java | 2 +-
.../src/main/java/org/apache/paimon/table/RollbackHelper.java | 2 +-
.../src/main/java/org/apache/paimon/utils/SnapshotManager.java | 8 ++++++++
.../java/org/apache/paimon/flink/clone/SnapshotHintOperator.java | 4 +---
.../org/apache/paimon/spark/procedure/RollbackProcedureTest.scala | 8 ++++++++
5 files changed, 19 insertions(+), 5 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
index dc1c2d6bdb..9f67a637ff 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
@@ -252,7 +252,7 @@ public class ExpireSnapshotsImpl implements ExpireSnapshots
{
if (expireConfig.isChangelogDecoupled()) {
commitChangelog(new Changelog(snapshot));
}
-
snapshotManager.fileIO().deleteQuietly(snapshotManager.snapshotPath(id));
+ snapshotManager.deleteSnapshot(id);
}
writeEarliestHint(endExclusiveId);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java
b/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java
index 29fecec113..d5482c6f53 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java
@@ -122,7 +122,7 @@ public class RollbackHelper {
// Ignore the non-existent snapshots
if (snapshotManager.snapshotExists(i)) {
toBeCleaned.add(snapshotManager.snapshot(i));
- fileIO.deleteQuietly(snapshotManager.snapshotPath(i));
+ snapshotManager.deleteSnapshot(i);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index ae70d7aec5..5257cf1c12 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -183,6 +183,14 @@ public class SnapshotManager implements Serializable {
}
}
+ public void deleteSnapshot(long snapshotId) {
+ Path path = snapshotPath(snapshotId);
+ if (cache != null) {
+ cache.invalidate(path);
+ }
+ fileIO().deleteQuietly(path);
+ }
+
public boolean longLivedChangelogExists(long snapshotId) {
Path path = longLivedChangelogPath(snapshotId);
try {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/SnapshotHintOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/SnapshotHintOperator.java
index 938119f947..96a9e363bf 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/SnapshotHintOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/SnapshotHintOperator.java
@@ -84,9 +84,7 @@ public class SnapshotHintOperator extends
AbstractStreamOperator<CloneFileInfo>
targetTableSnapshotManager.commitLatestHint(snapshotId);
for (Snapshot snapshot :
targetTableSnapshotManager.safelyGetAllSnapshots()) {
if (snapshot.id() != snapshotId) {
- targetTableSnapshotManager
- .fileIO()
-
.deleteQuietly(targetTableSnapshotManager.snapshotPath(snapshot.id()));
+ targetTableSnapshotManager.deleteSnapshot(snapshot.id());
}
}
}
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala
index 457c5ba513..5f5facc57a 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala
@@ -151,4 +151,12 @@ class RollbackProcedureTest extends PaimonSparkTestBase
with StreamTest {
}
}
+ test("Paimon Procedure: rollback with cache") {
+ sql("CREATE TABLE T (id INT)")
+ sql("INSERT INTO T VALUES (1), (2), (3), (4)")
+ sql("DELETE FROM T WHERE id = 1")
+ sql("CALL sys.rollback(table => 'T', version => '1')")
+ sql("DELETE FROM T WHERE id = 1")
+ checkAnswer(sql("SELECT * FROM T ORDER BY id"), Seq(Row(2), Row(3),
Row(4)))
+ }
}