Reo-LEI commented on a change in pull request #3103:
URL: https://github.com/apache/iceberg/pull/3103#discussion_r718183168
##########
File path:
flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -283,6 +287,7 @@ private void commitDeltaTxn(NavigableMap<Long, WriteResult>
pendingResults, Stri
// merged one will lead to the incorrect delete semantic.
WriteResult result = e.getValue();
RowDelta rowDelta = table.newRowDelta()
+ .validateFromSnapshot(lastCommittedSnapshotId)
Review comment:
@ayush-san I'm very sorry, this was my mistake. I meant to link to
https://github.com/apache/iceberg/issues/3102#issuecomment-919073751, not
#2867.
Also, I don't think this PR will solve
https://github.com/apache/iceberg/issues/2482. If the validation error is not
fixed, this PR will run into the same problem. I think we should follow
https://github.com/apache/iceberg/pull/2603#issuecomment-861831900 and check
whether the files still exist in order to fix the validation error.
##########
File path:
flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -283,6 +287,7 @@ private void commitDeltaTxn(NavigableMap<Long, WriteResult>
pendingResults, Stri
// merged one will lead to the incorrect delete semantic.
WriteResult result = e.getValue();
RowDelta rowDelta = table.newRowDelta()
+ .validateFromSnapshot(lastCommittedSnapshotId)
Review comment:
> but with your PR we can run the snapshot expire task with the flink
> job running since you are updating the lastCommittedSnapshotId.
I think the snapshot expire task doesn't cause the validation error because
`validationHistory` only walks back as far as the `lastCommittedSnapshot` and
stops there, so it never tries to validate removed snapshots that are older
than the `lastCommittedSnapshot`. But once the `lastCommittedSnapshot` itself is
removed (e.g. the snapshot expire task runs multiple times between two
checkpoints), you will encounter the validation error again.
I don't think storing the `lastCommittedSnapshotId` is a correct way to resolve
the validation error (#2482), because we can't guarantee that the snapshot
whose id we stored will always exist when we restore the Flink job. The
`lastCommittedSnapshot` may already have been removed by the time we restore
the Flink job, and then the validation error will be raised again.
@ayush-san
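To make the concern above concrete, here is a minimal toy model of the behaviour I am describing. It is only a sketch and not Iceberg's actual validation code: `SnapshotHistoryModel`, `commit`, `expire` and `historySince` are hypothetical names invented for illustration, and the model assumes that the walk from the current snapshot fails as soon as it needs a snapshot that has been expired.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of a snapshot log; NOT Iceberg's real classes.
class SnapshotHistoryModel {
  // snapshot id -> parent snapshot id (null parent for the first snapshot)
  private final Map<Long, Long> parents = new HashMap<>();
  private Long current = null;

  // Append a new snapshot on top of the current one.
  void commit(long id) {
    parents.put(id, current);
    current = id;
  }

  // Drop a snapshot from the metadata, as an expire task would.
  void expire(long id) {
    parents.remove(id);
  }

  // Walk back from the current snapshot until startId is reached and return
  // the ids of the snapshots newer than startId. Assumption of this model:
  // the walk fails if it needs a snapshot that is no longer present.
  List<Long> historySince(long startId) {
    List<Long> newer = new ArrayList<>();
    Long cursor = current;
    while (cursor != null) {
      if (!parents.containsKey(cursor)) {
        throw new IllegalStateException(
            "snapshot " + cursor + " was expired; cannot determine history back to " + startId);
      }
      if (cursor == startId) {
        return newer;
      }
      newer.add(cursor);
      cursor = parents.get(cursor);
    }
    throw new IllegalStateException(
        "snapshot " + startId + " is not an ancestor of the current snapshot");
  }

  public static void main(String[] args) {
    SnapshotHistoryModel log = new SnapshotHistoryModel();
    for (long id = 1; id <= 4; id++) {
      log.commit(id);
    }
    long lastCommittedSnapshotId = 2L; // the id the Flink job stored in its state

    log.expire(1L); // older than the stored id: the walk never reaches it
    System.out.println(log.historySince(lastCommittedSnapshotId)); // prints [4, 3]

    log.expire(2L); // the stored snapshot itself is gone
    try {
      log.historySince(lastCommittedSnapshotId);
    } catch (IllegalStateException e) {
      System.out.println("validation failed: " + e.getMessage());
    }
  }
}
```

In this model, expiring snapshots older than the stored id is harmless, but expiring the stored snapshot itself breaks the walk, which is why I think a stored `lastCommittedSnapshotId` alone cannot be relied on after a restore.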