wwj6591812 commented on code in PR #4333:
URL: https://github.com/apache/paimon/pull/4333#discussion_r1805938935
##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToActionFactory.java:
##########
@@ -30,6 +30,8 @@ public class RollbackToActionFactory implements ActionFactory
{
private static final String VERSION = "version";
+ private static final String IS_TIMESTAMP = "is_timestamp";
Review Comment:
You should also modify doc.
##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToAction.java:
##########
@@ -32,14 +33,18 @@ public class RollbackToAction extends TableActionBase {
private final String version;
+ private final Boolean isTimestamp;
Review Comment:
Why not use long timestamp uniformly in flink action && flink procedure &&
spark procedure?
##########
paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala:
##########
@@ -68,17 +68,31 @@ class RollbackProcedureTest extends PaimonSparkTestBase
with StreamTest {
inputData.addData((2, "b"))
stream.processAllAvailable()
checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil)
+ val ts = System.currentTimeMillis
// snapshot-3
inputData.addData((2, "b2"))
stream.processAllAvailable()
checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil)
+
+ // snapshot-4
+ inputData.addData((2, "b3"))
+ stream.processAllAvailable()
+ checkAnswer(query(), Row(1, "a") :: Row(2, "b3") :: Nil)
+
assertThrows[RuntimeException] {
spark.sql("CALL paimon.sys.rollback(table => 'test.T_exception',
version => '2')")
}
// rollback to snapshot
checkAnswer(
- spark.sql("CALL paimon.sys.rollback(table => 'test.T', version
=> '2')"),
+ spark.sql("CALL paimon.sys.rollback(table => 'test.T', version
=> '3')"),
+ Row(true) :: Nil)
+ checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil)
+
+ // rollback to timestamp
Review Comment:
rollback with timestamp
##########
paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala:
##########
@@ -68,17 +68,31 @@ class RollbackProcedureTest extends PaimonSparkTestBase
with StreamTest {
inputData.addData((2, "b"))
stream.processAllAvailable()
checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil)
+ val ts = System.currentTimeMillis
Review Comment:
inline this
##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToAction.java:
##########
@@ -51,7 +56,16 @@ public void run() throws Exception {
}
if (version.chars().allMatch(Character::isDigit)) {
- table.rollbackTo(Long.parseLong(version));
+ if (isTimestamp != null && isTimestamp) {
+ FileStoreTable fileStoreTable = (FileStoreTable) table;
+ fileStoreTable.rollbackTo(
+ fileStoreTable
+ .snapshotManager()
+
.earlierOrEqualTimeMills(Long.parseLong(version))
+ .id());
Review Comment:
Method invocation 'id' may produce 'NullPointerException'
##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToAction.java:
##########
@@ -51,7 +56,16 @@ public void run() throws Exception {
}
if (version.chars().allMatch(Character::isDigit)) {
- table.rollbackTo(Long.parseLong(version));
+ if (isTimestamp != null && isTimestamp) {
+ FileStoreTable fileStoreTable = (FileStoreTable) table;
Review Comment:
Move line 60 from here to line 57 like spark procedure.
##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToAction.java:
##########
@@ -51,7 +56,16 @@ public void run() throws Exception {
}
if (version.chars().allMatch(Character::isDigit)) {
- table.rollbackTo(Long.parseLong(version));
+ if (isTimestamp != null && isTimestamp) {
Review Comment:
boolean
##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java:
##########
@@ -50,16 +51,25 @@ public class RollbackToProcedure extends ProcedureBase {
@ArgumentHint(
name = "snapshot_id",
type = @DataTypeHint("BIGINT"),
- isOptional = true)
+ isOptional = true),
+ @ArgumentHint(name = "timestamp", type =
@DataTypeHint("BIGINT"), isOptional = true)
})
public String[] call(
- ProcedureContext procedureContext, String tableId, String tagName,
Long snapshotId)
+ ProcedureContext procedureContext,
+ String tableId,
+ String tagName,
+ Long snapshotId,
+ Long timestamp)
throws Catalog.TableNotExistException {
Table table = catalog.getTable(Identifier.fromString(tableId));
if (!StringUtils.isNullOrWhitespaceOnly(tagName)) {
table.rollbackTo(tagName);
- } else {
+ } else if (snapshotId != null) {
table.rollbackTo(snapshotId);
+ } else {
+ FileStoreTable fileStoreTable = (FileStoreTable) table;
+ fileStoreTable.rollbackTo(
+
fileStoreTable.snapshotManager().earlierOrEqualTimeMills(timestamp).id());
Review Comment:
Method invocation 'id' may produce 'NullPointerException'
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]