This is an automated email from the ASF dual-hosted git repository.

jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git

commit 4bdd974dff2d3bf5824a470eadd1cb11a02fcf0e
Author: Yaguang Jia <[email protected]>
AuthorDate: Wed Aug 20 16:01:55 2025 +0800

    fix flinksource test
---
 .../apache/iceberg/flink/source/ScanContext.java   | 182 +++++++++++++--------
 .../flink/read/source/MixedFormatScanContext.java  |  10 +-
 2 files changed, 127 insertions(+), 65 deletions(-)

diff --git 
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common-iceberg-bridge/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
 
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common-iceberg-bridge/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
index c18853647..a5f8b15ec 100644
--- 
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common-iceberg-bridge/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
+++ 
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common-iceberg-bridge/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
@@ -32,8 +32,10 @@ import org.apache.iceberg.flink.FlinkReadOptions;
 
 import java.io.Serializable;
 import java.time.Duration;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Copy from Iceberg {@link ScanContext}. only change line 115 and expand the 
modifier. Context
@@ -173,9 +175,12 @@ public class ScanContext implements Serializable {
   protected final List<Expression> filters;
   protected final long limit;
   protected final boolean includeColumnStats;
+  protected final Collection<String> includeStatsForColumns;
   protected final Integer planParallelism;
   protected final int maxPlanningSnapshotCount;
   protected final int maxAllowedPlanningFailures;
+  protected final String watermarkColumn;
+  protected final TimeUnit watermarkColumnTimeUnit;
 
   protected ScanContext(
       boolean caseSensitive,
@@ -195,10 +200,13 @@ public class ScanContext implements Serializable {
       List<Expression> filters,
       long limit,
       boolean includeColumnStats,
+      Collection<String> includeStatsForColumns,
       boolean exposeLocality,
       Integer planParallelism,
       int maxPlanningSnapshotCount,
       int maxAllowedPlanningFailures,
+      String watermarkColumn,
+      TimeUnit watermarkColumnTimeUnit,
       String branch,
       String tag,
       String startTag,
@@ -225,15 +233,16 @@ public class ScanContext implements Serializable {
     this.filters = filters;
     this.limit = limit;
     this.includeColumnStats = includeColumnStats;
+    this.includeStatsForColumns = includeStatsForColumns;
     this.exposeLocality = exposeLocality;
     this.planParallelism = planParallelism;
     this.maxPlanningSnapshotCount = maxPlanningSnapshotCount;
     this.maxAllowedPlanningFailures = maxAllowedPlanningFailures;
-
-    validate();
+    this.watermarkColumn = watermarkColumn;
+    this.watermarkColumnTimeUnit = watermarkColumnTimeUnit;
   }
 
-  private void validate() {
+  void validate() {
     if (isStreaming) {
       if (startingStrategy == 
StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_ID) {
         Preconditions.checkArgument(
@@ -251,14 +260,17 @@ public class ScanContext implements Serializable {
             startSnapshotId == null,
             "Invalid starting snapshot id for SPECIFIC_START_SNAPSHOT_ID 
strategy: not null");
       }
-      Preconditions.checkArgument(
-          branch == null,
-          String.format(
-              "Cannot scan table using ref %s configured for streaming reader 
yet", branch));
 
       Preconditions.checkArgument(
           tag == null,
           String.format("Cannot scan table using ref %s configured for 
streaming reader", tag));
+      Preconditions.checkArgument(
+              snapshotId == null, "Cannot set snapshot-id option for streaming 
reader");
+      Preconditions.checkArgument(
+              asOfTimestamp == null, "Cannot set as-of-timestamp option for 
streaming reader");
+      Preconditions.checkArgument(
+              endSnapshotId == null, "Cannot set end-snapshot-id option for 
streaming reader");
+      Preconditions.checkArgument(endTag == null, "Cannot set end-tag option 
for streaming reader");
     }
     Preconditions.checkArgument(
         !(startTag != null && startSnapshotId() != null),
@@ -273,107 +285,119 @@ public class ScanContext implements Serializable {
         "Cannot set maxAllowedPlanningFailures to a negative number other than 
-1.");
   }
 
-  boolean caseSensitive() {
+  public boolean caseSensitive() {
     return caseSensitive;
   }
 
-  Long snapshotId() {
+  public Long snapshotId() {
     return snapshotId;
   }
 
-  String branch() {
+  public String branch() {
     return branch;
   }
 
-  String tag() {
+  public String tag() {
     return tag;
   }
 
-  String startTag() {
+  public String startTag() {
     return startTag;
   }
 
-  String endTag() {
+  public String endTag() {
     return endTag;
   }
 
-  StreamingStartingStrategy streamingStartingStrategy() {
+  public StreamingStartingStrategy streamingStartingStrategy() {
     return startingStrategy;
   }
 
-  Long startSnapshotTimestamp() {
+  public Long startSnapshotTimestamp() {
     return startSnapshotTimestamp;
   }
 
-  Long startSnapshotId() {
+  public Long startSnapshotId() {
     return startSnapshotId;
   }
 
-  Long endSnapshotId() {
+  public Long endSnapshotId() {
     return endSnapshotId;
   }
 
-  Long asOfTimestamp() {
+  public Long asOfTimestamp() {
     return asOfTimestamp;
   }
 
-  Long splitSize() {
+  public Long splitSize() {
     return splitSize;
   }
 
-  Integer splitLookback() {
+  public Integer splitLookback() {
     return splitLookback;
   }
 
-  Long splitOpenFileCost() {
+  public Long splitOpenFileCost() {
     return splitOpenFileCost;
   }
 
-  boolean isStreaming() {
+  public boolean isStreaming() {
     return isStreaming;
   }
 
-  Duration monitorInterval() {
+  public Duration monitorInterval() {
     return monitorInterval;
   }
 
-  String nameMapping() {
+  public String nameMapping() {
     return nameMapping;
   }
 
-  Schema project() {
+  public Schema project() {
     return schema;
   }
 
-  List<Expression> filters() {
+  public List<Expression> filters() {
     return filters;
   }
 
-  long limit() {
+  public long limit() {
     return limit;
   }
 
-  boolean includeColumnStats() {
+  public boolean includeColumnStats() {
     return includeColumnStats;
   }
 
-  boolean exposeLocality() {
+  public Collection<String> includeStatsForColumns() {
+    return includeStatsForColumns;
+  }
+
+  public boolean exposeLocality() {
     return exposeLocality;
   }
 
-  Integer planParallelism() {
+  public Integer planParallelism() {
     return planParallelism;
   }
 
-  int maxPlanningSnapshotCount() {
+  public int maxPlanningSnapshotCount() {
     return maxPlanningSnapshotCount;
   }
 
-  int maxAllowedPlanningFailures() {
+  public int maxAllowedPlanningFailures() {
     return maxAllowedPlanningFailures;
   }
 
-  ScanContext copyWithAppendsBetween(Long newStartSnapshotId, long 
newEndSnapshotId) {
+  public String watermarkColumn() {
+    return watermarkColumn;
+  }
+
+  public TimeUnit watermarkColumnTimeUnit() {
+    return watermarkColumnTimeUnit;
+  }
+
+  public ScanContext copyWithAppendsBetween(Long newStartSnapshotId, long 
newEndSnapshotId) {
     return ScanContext.builder()
         .caseSensitive(caseSensitive)
         .useSnapshotId(null)
@@ -394,19 +418,22 @@ public class ScanContext implements Serializable {
         .filters(filters)
         .limit(limit)
         .includeColumnStats(includeColumnStats)
+        .includeColumnStats(includeStatsForColumns)
         .exposeLocality(exposeLocality)
         .planParallelism(planParallelism)
         .maxPlanningSnapshotCount(maxPlanningSnapshotCount)
         .maxAllowedPlanningFailures(maxAllowedPlanningFailures)
+        .watermarkColumn(watermarkColumn)
+        .watermarkColumnTimeUnit(watermarkColumnTimeUnit)
         .build();
   }
 
-  ScanContext copyWithSnapshotId(long newSnapshotId) {
+  public ScanContext copyWithSnapshotId(long newSnapshotId) {
     return ScanContext.builder()
         .caseSensitive(caseSensitive)
         .useSnapshotId(newSnapshotId)
         .useBranch(branch)
-        .useTag(null)
+        .useTag(tag)
         .startSnapshotId(null)
         .endSnapshotId(null)
         .startTag(null)
@@ -422,18 +449,21 @@ public class ScanContext implements Serializable {
         .filters(filters)
         .limit(limit)
         .includeColumnStats(includeColumnStats)
+        .includeColumnStats(includeStatsForColumns)
         .exposeLocality(exposeLocality)
         .planParallelism(planParallelism)
         .maxPlanningSnapshotCount(maxPlanningSnapshotCount)
         .maxAllowedPlanningFailures(maxAllowedPlanningFailures)
+        .watermarkColumn(watermarkColumn)
+        .watermarkColumnTimeUnit(watermarkColumnTimeUnit)
         .build();
   }
 
-  static Builder builder() {
+  public static Builder builder() {
     return new Builder();
   }
 
-  static class Builder {
+  public static class Builder {
     private boolean caseSensitive = 
FlinkReadOptions.CASE_SENSITIVE_OPTION.defaultValue();
     private Long snapshotId = FlinkReadOptions.SNAPSHOT_ID.defaultValue();
     private String branch = FlinkReadOptions.BRANCH.defaultValue();
@@ -458,6 +488,7 @@ public class ScanContext implements Serializable {
     private long limit = FlinkReadOptions.LIMIT_OPTION.defaultValue();
     private boolean includeColumnStats =
         FlinkReadOptions.INCLUDE_COLUMN_STATS_OPTION.defaultValue();
+    private Collection<String> includeStatsForColumns = null;
     private boolean exposeLocality;
     private Integer planParallelism =
         FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE.defaultValue();
@@ -465,135 +496,153 @@ public class ScanContext implements Serializable {
         FlinkReadOptions.MAX_PLANNING_SNAPSHOT_COUNT_OPTION.defaultValue();
     private int maxAllowedPlanningFailures =
         FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.defaultValue();
+    private String watermarkColumn = 
FlinkReadOptions.WATERMARK_COLUMN_OPTION.defaultValue();
+    private TimeUnit watermarkColumnTimeUnit =
+            FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT_OPTION.defaultValue();
 
     private Builder() {}
 
-    Builder caseSensitive(boolean newCaseSensitive) {
+    public Builder caseSensitive(boolean newCaseSensitive) {
       this.caseSensitive = newCaseSensitive;
       return this;
     }
 
-    Builder useSnapshotId(Long newSnapshotId) {
+    public Builder useSnapshotId(Long newSnapshotId) {
       this.snapshotId = newSnapshotId;
       return this;
     }
 
-    Builder useTag(String newTag) {
+    public Builder useTag(String newTag) {
       this.tag = newTag;
       return this;
     }
 
-    Builder useBranch(String newBranch) {
+    public Builder useBranch(String newBranch) {
       this.branch = newBranch;
       return this;
     }
 
-    Builder startingStrategy(StreamingStartingStrategy newStartingStrategy) {
+    public Builder startingStrategy(StreamingStartingStrategy 
newStartingStrategy) {
       this.startingStrategy = newStartingStrategy;
       return this;
     }
 
-    Builder startSnapshotTimestamp(Long newStartSnapshotTimestamp) {
+    public Builder startSnapshotTimestamp(Long newStartSnapshotTimestamp) {
       this.startSnapshotTimestamp = newStartSnapshotTimestamp;
       return this;
     }
 
-    Builder startSnapshotId(Long newStartSnapshotId) {
+    public Builder startSnapshotId(Long newStartSnapshotId) {
       this.startSnapshotId = newStartSnapshotId;
       return this;
     }
 
-    Builder endSnapshotId(Long newEndSnapshotId) {
+    public Builder endSnapshotId(Long newEndSnapshotId) {
       this.endSnapshotId = newEndSnapshotId;
       return this;
     }
 
-    Builder startTag(String newStartTag) {
+    public Builder startTag(String newStartTag) {
       this.startTag = newStartTag;
       return this;
     }
 
-    Builder endTag(String newEndTag) {
+    public Builder endTag(String newEndTag) {
       this.endTag = newEndTag;
       return this;
     }
 
-    Builder asOfTimestamp(Long newAsOfTimestamp) {
+    public Builder asOfTimestamp(Long newAsOfTimestamp) {
       this.asOfTimestamp = newAsOfTimestamp;
       return this;
     }
 
-    Builder splitSize(Long newSplitSize) {
+    public Builder splitSize(Long newSplitSize) {
       this.splitSize = newSplitSize;
       return this;
     }
 
-    Builder splitLookback(Integer newSplitLookback) {
+    public Builder splitLookback(Integer newSplitLookback) {
       this.splitLookback = newSplitLookback;
       return this;
     }
 
-    Builder splitOpenFileCost(Long newSplitOpenFileCost) {
+    public Builder splitOpenFileCost(Long newSplitOpenFileCost) {
       this.splitOpenFileCost = newSplitOpenFileCost;
       return this;
     }
 
-    Builder streaming(boolean streaming) {
+    public Builder streaming(boolean streaming) {
       this.isStreaming = streaming;
       return this;
     }
 
-    Builder monitorInterval(Duration newMonitorInterval) {
+    public Builder monitorInterval(Duration newMonitorInterval) {
       this.monitorInterval = newMonitorInterval;
       return this;
     }
 
-    Builder nameMapping(String newNameMapping) {
+    public Builder nameMapping(String newNameMapping) {
       this.nameMapping = newNameMapping;
       return this;
     }
 
-    Builder project(Schema newProjectedSchema) {
+    public Builder project(Schema newProjectedSchema) {
       this.projectedSchema = newProjectedSchema;
       return this;
     }
 
-    Builder filters(List<Expression> newFilters) {
+    public Builder filters(List<Expression> newFilters) {
       this.filters = newFilters;
       return this;
     }
 
-    Builder limit(long newLimit) {
+    public Builder limit(long newLimit) {
       this.limit = newLimit;
       return this;
     }
 
-    Builder includeColumnStats(boolean newIncludeColumnStats) {
+    public Builder includeColumnStats(boolean newIncludeColumnStats) {
       this.includeColumnStats = newIncludeColumnStats;
       return this;
     }
 
-    Builder exposeLocality(boolean newExposeLocality) {
+    public Builder includeColumnStats(Collection<String> 
newIncludeStatsForColumns) {
+      this.includeStatsForColumns = newIncludeStatsForColumns;
+      return this;
+    }
+
+    public Builder exposeLocality(boolean newExposeLocality) {
       this.exposeLocality = newExposeLocality;
       return this;
     }
 
-    Builder planParallelism(Integer parallelism) {
+    public Builder planParallelism(Integer parallelism) {
       this.planParallelism = parallelism;
       return this;
     }
 
-    Builder maxPlanningSnapshotCount(int newMaxPlanningSnapshotCount) {
+    public Builder maxPlanningSnapshotCount(int newMaxPlanningSnapshotCount) {
       this.maxPlanningSnapshotCount = newMaxPlanningSnapshotCount;
       return this;
     }
 
-    Builder maxAllowedPlanningFailures(int newMaxAllowedPlanningFailures) {
+    public Builder maxAllowedPlanningFailures(int 
newMaxAllowedPlanningFailures) {
       this.maxAllowedPlanningFailures = newMaxAllowedPlanningFailures;
       return this;
     }
 
-    Builder resolveConfig(
+    public Builder watermarkColumn(String newWatermarkColumn) {
+      this.watermarkColumn = newWatermarkColumn;
+      return this;
+    }
+
+    public Builder watermarkColumnTimeUnit(TimeUnit newWatermarkTimeUnit) {
+      this.watermarkColumnTimeUnit = newWatermarkTimeUnit;
+      return this;
+    }
+
+    public Builder resolveConfig(
         Table table, Map<String, String> readOptions, ReadableConfig 
readableConfig) {
       FlinkReadConf flinkReadConf = new FlinkReadConf(table, readOptions, 
readableConfig);
 
@@ -618,7 +667,9 @@ public class ScanContext implements Serializable {
           .planParallelism(flinkReadConf.workerPoolSize())
           .includeColumnStats(flinkReadConf.includeColumnStats())
           .maxPlanningSnapshotCount(flinkReadConf.maxPlanningSnapshotCount())
-          .maxAllowedPlanningFailures(maxAllowedPlanningFailures);
+          .maxAllowedPlanningFailures(maxAllowedPlanningFailures)
+          .watermarkColumn(flinkReadConf.watermarkColumn())
+          .watermarkColumnTimeUnit(flinkReadConf.watermarkColumnTimeUnit());
     }
 
     public ScanContext build() {
@@ -640,10 +691,13 @@ public class ScanContext implements Serializable {
           filters,
           limit,
           includeColumnStats,
+          includeStatsForColumns,
           exposeLocality,
           planParallelism,
           maxPlanningSnapshotCount,
           maxAllowedPlanningFailures,
+          watermarkColumn,
+          watermarkColumnTimeUnit,
           branch,
           tag,
           startTag,
diff --git 
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/read/source/MixedFormatScanContext.java
 
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/read/source/MixedFormatScanContext.java
index 7fc850df0..a0afc9571 100644
--- 
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/read/source/MixedFormatScanContext.java
+++ 
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/read/source/MixedFormatScanContext.java
@@ -33,9 +33,11 @@ import 
org.apache.iceberg.flink.source.StreamingStartingStrategy;
 
 import java.io.Serializable;
 import java.time.Duration;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.TimeUnit;
 
 /** This is an mixed-format source scan context. */
 public class MixedFormatScanContext extends ScanContext implements 
Serializable {
@@ -64,10 +66,13 @@ public class MixedFormatScanContext extends ScanContext 
implements Serializable
         builder.filters,
         builder.limit,
         builder.includeColumnStats,
+        builder.includeStatsForColumns,
         builder.exposeLocality,
         builder.planParallelism,
         builder.maxPlanningSnapshotCount,
         builder.maxAllowedPlanningFailures,
+        builder.watermarkColumn,
+        builder.watermarkColumnTimeUnit,
         builder.branch,
         builder.tag,
         builder.startTag,
@@ -166,6 +171,7 @@ public class MixedFormatScanContext extends ScanContext 
implements Serializable
     private long limit = FlinkReadOptions.LIMIT_OPTION.defaultValue();
     private boolean includeColumnStats =
         FlinkReadOptions.INCLUDE_COLUMN_STATS_OPTION.defaultValue();
+    private Collection<String> includeStatsForColumns = null;
     private boolean exposeLocality;
     private Integer planParallelism =
         FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE.defaultValue();
@@ -173,7 +179,9 @@ public class MixedFormatScanContext extends ScanContext 
implements Serializable
 
     private int maxAllowedPlanningFailures =
         FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.defaultValue();
-
+    private String watermarkColumn = 
FlinkReadOptions.WATERMARK_COLUMN_OPTION.defaultValue();
+    private TimeUnit watermarkColumnTimeUnit =
+            FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT_OPTION.defaultValue();
     private String branch = FlinkReadOptions.BRANCH.defaultValue();
 
     private String tag = FlinkReadOptions.TAG.defaultValue();

Reply via email to