hililiwei commented on code in PR #7362:
URL: https://github.com/apache/iceberg/pull/7362#discussion_r1191006774
##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java:
##########
@@ -137,6 +121,60 @@ static CloseableIterable<CombinedScanTask> planTasks(
}
}
+ private static Long resolveToSnapshotId(Table table, ScanContext context) {
+ if (context.endTag() != null) {
+ Preconditions.checkArgument(
+ table.snapshot(context.endTag()) != null,
+ "Cannot find snapshot with tag %s",
+ context.endTag());
+ return table.snapshot(context.endTag()).snapshotId();
+ }
+
+ if (context.endSnapshotId() != null) {
+ return context.endSnapshotId();
+ }
+
+ if (context.endSnapshotTimestamp() != null) {
+ return SnapshotUtil.snapshotIdAsOfTime(table,
context.endSnapshotTimestamp());
+ }
+ return null;
+ }
+
+ private static Long resolveStartSnapshotId(Table table, ScanContext context)
{
+ if (context.startTag() != null) {
+ Preconditions.checkArgument(
+ table.snapshot(context.startTag()) != null,
+ "Cannot find snapshot with tag %s",
+ context.startTag());
+ return table.snapshot(context.startTag()).snapshotId();
+ }
+
+ if (context.startSnapshotId() != null) {
+ return context.startSnapshotId();
+ }
+
+ if (context.startSnapshotTimestamp() != null) {
+ return getStartSnapshotId(table, context.startSnapshotTimestamp());
+ }
+
+ return null;
+ }
+
+ private static Long getStartSnapshotId(Table table, Long startTimestamp) {
+ Snapshot oldestSnapshotAfter = SnapshotUtil.oldestAncestorAfter(table,
startTimestamp);
+ Preconditions.checkArgument(
+ oldestSnapshotAfter != null,
+ "Cannot find a snapshot older than %s for table %s",
+ startTimestamp,
+ table.name());
+
+ if (oldestSnapshotAfter.timestampMillis() == startTimestamp) {
+ return oldestSnapshotAfter.snapshotId();
+ } else {
+ return oldestSnapshotAfter.parentId();
Review Comment:
`oldestSnapshotAfter.parentId()` may be `null`. When it is `null`, it is simply
not used as the start snapshot; see the null check at line 94:
```
Long startSnapshotId = resolveStartSnapshotId(table, context);
if (startSnapshotId != null) {
scan = scan.fromSnapshotExclusive(startSnapshotId);
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]