xxubai commented on code in PR #4244:
URL: https://github.com/apache/amoro/pull/4244#discussion_r3419512207


##########
amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/FlinkSource.java:
##########
@@ -243,37 +244,55 @@ public DataStream<RowData> buildUnkeyedTableSource(String scanStartupMode) {
               .properties(properties)
               .flinkConf(flinkConf)
               .limit(limit);
+      Long startSnapshotId = null;
       if (MixedFormatValidator.SCAN_STARTUP_MODE_LATEST.equalsIgnoreCase(scanStartupMode)) {
         Optional<Snapshot> startSnapshotOptional =
             Optional.ofNullable(tableLoader.loadTable().currentSnapshot());
         if (startSnapshotOptional.isPresent()) {
           Snapshot snapshot = startSnapshotOptional.get();
+          startSnapshotId = snapshot.snapshotId();
           LOG.info(
               "Get starting snapshot id {} based on scan startup mode {}",
               snapshot.snapshotId(),
               scanStartupMode);
-          builder.startSnapshotId(snapshot.snapshotId());
+          builder.startSnapshotId(startSnapshotId);
         }
       }
       DataStream<RowData> origin = builder.build();
-      return wrapKrb(origin).assignTimestampsAndWatermarks(watermarkStrategy);
+      return wrapKrb(origin, startSnapshotId).assignTimestampsAndWatermarks(watermarkStrategy);
     }
 
     /** extract op from dataStream, and wrap krb support */
-    private DataStream<RowData> wrapKrb(DataStream<RowData> ds) {
+    private DataStream<RowData> wrapKrb(DataStream<RowData> ds, Long startSnapshotId) {

Review Comment:
   Done. Added `@Nullable` to the `startSnapshotId` parameter of `wrapKrb`, since
it remains null when the scan startup mode is not `SCAN_STARTUP_MODE_LATEST` or
when the table has no current snapshot.
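
   For reference, a minimal standalone sketch of the null contract described above. It does not use the Amoro or Flink classes: `NullableSnapshotSketch`, `resolveStartSnapshotId`, `describeStart`, the local `@Nullable` annotation, and the literal `"latest"` are all hypothetical stand-ins chosen for illustration, not names taken from this PR.

```java
import java.util.Optional;

/** Sketch only: none of these names come from the Amoro code base. */
final class NullableSnapshotSketch {

  /** Hypothetical stand-in for whichever @Nullable annotation the project uses. */
  @interface Nullable {}

  /**
   * Mirrors the shape of the diff: the id stays null unless the mode is
   * "latest" (assumed literal) and a current snapshot id is available.
   */
  @Nullable
  static Long resolveStartSnapshotId(String scanStartupMode, @Nullable Long currentSnapshotId) {
    if ("latest".equalsIgnoreCase(scanStartupMode)) {
      return currentSnapshotId; // may still be null for a table without snapshots
    }
    return null; // any other mode never sets a start snapshot id
  }

  /** A callee that accepts the nullable id has to handle the null case explicitly. */
  static String describeStart(@Nullable Long startSnapshotId) {
    return Optional.ofNullable(startSnapshotId)
        .map(id -> "start snapshot " + id)
        .orElse("no start snapshot");
  }

  public static void main(String[] args) {
    System.out.println(describeStart(resolveStartSnapshotId("latest", 42L)));
    System.out.println(describeStart(resolveStartSnapshotId("latest", null)));
    System.out.println(describeStart(resolveStartSnapshotId("earliest", 42L)));
  }
}
```

   The annotation only documents the contract; the callee still needs its own null handling, as `describeStart` does here.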



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
