This is an automated email from the ASF dual-hosted git repository. jinsongzhou pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/amoro.git
commit 4bdd974dff2d3bf5824a470eadd1cb11a02fcf0e Author: Yaguang Jia <[email protected]> AuthorDate: Wed Aug 20 16:01:55 2025 +0800 fix flinksource test --- .../apache/iceberg/flink/source/ScanContext.java | 182 +++++++++++++-------- .../flink/read/source/MixedFormatScanContext.java | 10 +- 2 files changed, 127 insertions(+), 65 deletions(-) diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common-iceberg-bridge/src/main/java/org/apache/iceberg/flink/source/ScanContext.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common-iceberg-bridge/src/main/java/org/apache/iceberg/flink/source/ScanContext.java index c18853647..a5f8b15ec 100644 --- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common-iceberg-bridge/src/main/java/org/apache/iceberg/flink/source/ScanContext.java +++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common-iceberg-bridge/src/main/java/org/apache/iceberg/flink/source/ScanContext.java @@ -32,8 +32,10 @@ import org.apache.iceberg.flink.FlinkReadOptions; import java.io.Serializable; import java.time.Duration; +import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; /** * Copy from Iceberg {@link ScanContext}. only change line 115 and expand the modifier. Context @@ -173,9 +175,12 @@ public class ScanContext implements Serializable { protected final List<Expression> filters; protected final long limit; protected final boolean includeColumnStats; + protected final Collection<String> includeStatsForColumns; protected final Integer planParallelism; protected final int maxPlanningSnapshotCount; protected final int maxAllowedPlanningFailures; + protected final String watermarkColumn; + protected final TimeUnit watermarkColumnTimeUnit; protected ScanContext( boolean caseSensitive, @@ -195,10 +200,13 @@ public class ScanContext implements Serializable { List<Expression> filters, long limit, boolean includeColumnStats, + Collection<String> includeStatsForColumns, boolean exposeLocality, Integer planParallelism, int maxPlanningSnapshotCount, int maxAllowedPlanningFailures, + String watermarkColumn, + TimeUnit watermarkColumnTimeUnit, String branch, String tag, String startTag, @@ -225,15 +233,16 @@ public class ScanContext implements Serializable { this.filters = filters; this.limit = limit; this.includeColumnStats = includeColumnStats; + this.includeStatsForColumns = includeStatsForColumns; this.exposeLocality = exposeLocality; this.planParallelism = planParallelism; this.maxPlanningSnapshotCount = maxPlanningSnapshotCount; this.maxAllowedPlanningFailures = maxAllowedPlanningFailures; - - validate(); + this.watermarkColumn = watermarkColumn; + this.watermarkColumnTimeUnit = watermarkColumnTimeUnit; } - private void validate() { + void validate() { if (isStreaming) { if (startingStrategy == StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_ID) { Preconditions.checkArgument( @@ -251,14 +260,17 @@ public class ScanContext implements Serializable { startSnapshotId == null, "Invalid starting snapshot id for SPECIFIC_START_SNAPSHOT_ID strategy: not null"); } - Preconditions.checkArgument( - branch == null, - String.format( - "Cannot scan table using ref %s configured for streaming reader yet", branch)); Preconditions.checkArgument( tag == null, String.format("Cannot scan table using ref %s configured for streaming reader", tag)); + Preconditions.checkArgument( + snapshotId == null, "Cannot set snapshot-id option for streaming reader"); + Preconditions.checkArgument( + asOfTimestamp == null, "Cannot set as-of-timestamp option for streaming reader"); + Preconditions.checkArgument( + endSnapshotId == null, "Cannot set end-snapshot-id option for streaming reader"); + Preconditions.checkArgument(endTag == null, "Cannot set end-tag option for streaming reader"); } Preconditions.checkArgument( !(startTag != null && startSnapshotId() != null), @@ -273,107 +285,119 @@ public class ScanContext implements Serializable { "Cannot set maxAllowedPlanningFailures to a negative number other than -1."); } - boolean caseSensitive() { + public boolean caseSensitive() { return caseSensitive; } - Long snapshotId() { + public Long snapshotId() { return snapshotId; } - String branch() { + public String branch() { return branch; } - String tag() { + public String tag() { return tag; } - String startTag() { + public String startTag() { return startTag; } - String endTag() { + public String endTag() { return endTag; } - StreamingStartingStrategy streamingStartingStrategy() { + public StreamingStartingStrategy streamingStartingStrategy() { return startingStrategy; } - Long startSnapshotTimestamp() { + public Long startSnapshotTimestamp() { return startSnapshotTimestamp; } - Long startSnapshotId() { + public Long startSnapshotId() { return startSnapshotId; } - Long endSnapshotId() { + public Long endSnapshotId() { return endSnapshotId; } - Long asOfTimestamp() { + public Long asOfTimestamp() { return asOfTimestamp; } - Long splitSize() { + public Long splitSize() { return splitSize; } - Integer splitLookback() { + public Integer splitLookback() { return splitLookback; } - Long splitOpenFileCost() { + public Long splitOpenFileCost() { return splitOpenFileCost; } - boolean isStreaming() { + public boolean isStreaming() { return isStreaming; } - Duration monitorInterval() { + public Duration monitorInterval() { return monitorInterval; } - String nameMapping() { + public String nameMapping() { return nameMapping; } - Schema project() { + public Schema project() { return schema; } - List<Expression> filters() { + public List<Expression> filters() { return filters; } - long limit() { + public long limit() { return limit; } - boolean includeColumnStats() { + public boolean includeColumnStats() { return includeColumnStats; } - boolean exposeLocality() { + public Collection<String> includeStatsForColumns() { + return includeStatsForColumns; + } + + public boolean exposeLocality() { return exposeLocality; } - Integer planParallelism() { + public Integer planParallelism() { return planParallelism; } - int maxPlanningSnapshotCount() { + public int maxPlanningSnapshotCount() { return maxPlanningSnapshotCount; } - int maxAllowedPlanningFailures() { + public int maxAllowedPlanningFailures() { return maxAllowedPlanningFailures; } - ScanContext copyWithAppendsBetween(Long newStartSnapshotId, long newEndSnapshotId) { + public String watermarkColumn() { + return watermarkColumn; + } + + public TimeUnit watermarkColumnTimeUnit() { + return watermarkColumnTimeUnit; + } + + public ScanContext copyWithAppendsBetween(Long newStartSnapshotId, long newEndSnapshotId) { return ScanContext.builder() .caseSensitive(caseSensitive) .useSnapshotId(null) @@ -394,19 +418,22 @@ public class ScanContext implements Serializable { .filters(filters) .limit(limit) .includeColumnStats(includeColumnStats) + .includeColumnStats(includeStatsForColumns) .exposeLocality(exposeLocality) .planParallelism(planParallelism) .maxPlanningSnapshotCount(maxPlanningSnapshotCount) .maxAllowedPlanningFailures(maxAllowedPlanningFailures) + .watermarkColumn(watermarkColumn) + .watermarkColumnTimeUnit(watermarkColumnTimeUnit) .build(); } - ScanContext copyWithSnapshotId(long newSnapshotId) { + public ScanContext copyWithSnapshotId(long newSnapshotId) { return ScanContext.builder() .caseSensitive(caseSensitive) .useSnapshotId(newSnapshotId) .useBranch(branch) - .useTag(null) + .useTag(tag) .startSnapshotId(null) .endSnapshotId(null) .startTag(null) @@ -422,18 +449,21 @@ public class ScanContext implements Serializable { .filters(filters) .limit(limit) .includeColumnStats(includeColumnStats) + .includeColumnStats(includeStatsForColumns) .exposeLocality(exposeLocality) .planParallelism(planParallelism) .maxPlanningSnapshotCount(maxPlanningSnapshotCount) .maxAllowedPlanningFailures(maxAllowedPlanningFailures) + .watermarkColumn(watermarkColumn) + .watermarkColumnTimeUnit(watermarkColumnTimeUnit) .build(); } - static Builder builder() { + public static Builder builder() { return new Builder(); } - static class Builder { + public static class Builder { private boolean caseSensitive = FlinkReadOptions.CASE_SENSITIVE_OPTION.defaultValue(); private Long snapshotId = FlinkReadOptions.SNAPSHOT_ID.defaultValue(); private String branch = FlinkReadOptions.BRANCH.defaultValue(); @@ -458,6 +488,7 @@ public class ScanContext implements Serializable { private long limit = FlinkReadOptions.LIMIT_OPTION.defaultValue(); private boolean includeColumnStats = FlinkReadOptions.INCLUDE_COLUMN_STATS_OPTION.defaultValue(); + private Collection<String> includeStatsForColumns = null; private boolean exposeLocality; private Integer planParallelism = FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE.defaultValue(); @@ -465,135 +496,153 @@ public class ScanContext implements Serializable { FlinkReadOptions.MAX_PLANNING_SNAPSHOT_COUNT_OPTION.defaultValue(); private int maxAllowedPlanningFailures = FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.defaultValue(); + private String watermarkColumn = FlinkReadOptions.WATERMARK_COLUMN_OPTION.defaultValue(); + private TimeUnit watermarkColumnTimeUnit = + FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT_OPTION.defaultValue(); private Builder() {} - Builder caseSensitive(boolean newCaseSensitive) { + public Builder caseSensitive(boolean newCaseSensitive) { this.caseSensitive = newCaseSensitive; return this; } - Builder useSnapshotId(Long newSnapshotId) { + public Builder useSnapshotId(Long newSnapshotId) { this.snapshotId = newSnapshotId; return this; } - Builder useTag(String newTag) { + public Builder useTag(String newTag) { this.tag = newTag; return this; } - Builder useBranch(String newBranch) { + public Builder useBranch(String newBranch) { this.branch = newBranch; return this; } - Builder startingStrategy(StreamingStartingStrategy newStartingStrategy) { + public Builder startingStrategy(StreamingStartingStrategy newStartingStrategy) { this.startingStrategy = newStartingStrategy; return this; } - Builder startSnapshotTimestamp(Long newStartSnapshotTimestamp) { + public Builder startSnapshotTimestamp(Long newStartSnapshotTimestamp) { this.startSnapshotTimestamp = newStartSnapshotTimestamp; return this; } - Builder startSnapshotId(Long newStartSnapshotId) { + public Builder startSnapshotId(Long newStartSnapshotId) { this.startSnapshotId = newStartSnapshotId; return this; } - Builder endSnapshotId(Long newEndSnapshotId) { + public Builder endSnapshotId(Long newEndSnapshotId) { this.endSnapshotId = newEndSnapshotId; return this; } - Builder startTag(String newStartTag) { + public Builder startTag(String newStartTag) { this.startTag = newStartTag; return this; } - Builder endTag(String newEndTag) { + public Builder endTag(String newEndTag) { this.endTag = newEndTag; return this; } - Builder asOfTimestamp(Long newAsOfTimestamp) { + public Builder asOfTimestamp(Long newAsOfTimestamp) { this.asOfTimestamp = newAsOfTimestamp; return this; } - Builder splitSize(Long newSplitSize) { + public Builder splitSize(Long newSplitSize) { this.splitSize = newSplitSize; return this; } - Builder splitLookback(Integer newSplitLookback) { + public Builder splitLookback(Integer newSplitLookback) { this.splitLookback = newSplitLookback; return this; } - Builder splitOpenFileCost(Long newSplitOpenFileCost) { + public Builder splitOpenFileCost(Long newSplitOpenFileCost) { this.splitOpenFileCost = newSplitOpenFileCost; return this; } - Builder streaming(boolean streaming) { + public Builder streaming(boolean streaming) { this.isStreaming = streaming; return this; } - Builder monitorInterval(Duration newMonitorInterval) { + public Builder monitorInterval(Duration newMonitorInterval) { this.monitorInterval = newMonitorInterval; return this; } - Builder nameMapping(String newNameMapping) { + public Builder nameMapping(String newNameMapping) { this.nameMapping = newNameMapping; return this; } - Builder project(Schema newProjectedSchema) { + public Builder project(Schema newProjectedSchema) { this.projectedSchema = newProjectedSchema; return this; } - Builder filters(List<Expression> newFilters) { + public Builder filters(List<Expression> newFilters) { this.filters = newFilters; return this; } - Builder limit(long newLimit) { + public Builder limit(long newLimit) { this.limit = newLimit; return this; } - Builder includeColumnStats(boolean newIncludeColumnStats) { + public Builder includeColumnStats(boolean newIncludeColumnStats) { this.includeColumnStats = newIncludeColumnStats; return this; } - Builder exposeLocality(boolean newExposeLocality) { + public Builder includeColumnStats(Collection<String> newIncludeStatsForColumns) { + this.includeStatsForColumns = newIncludeStatsForColumns; + return this; + } + + public Builder exposeLocality(boolean newExposeLocality) { this.exposeLocality = newExposeLocality; return this; } - Builder planParallelism(Integer parallelism) { + public Builder planParallelism(Integer parallelism) { this.planParallelism = parallelism; return this; } - Builder maxPlanningSnapshotCount(int newMaxPlanningSnapshotCount) { + public Builder maxPlanningSnapshotCount(int newMaxPlanningSnapshotCount) { this.maxPlanningSnapshotCount = newMaxPlanningSnapshotCount; return this; } - Builder maxAllowedPlanningFailures(int newMaxAllowedPlanningFailures) { + public Builder maxAllowedPlanningFailures(int newMaxAllowedPlanningFailures) { this.maxAllowedPlanningFailures = newMaxAllowedPlanningFailures; return this; } - Builder resolveConfig( + public Builder watermarkColumn(String newWatermarkColumn) { + this.watermarkColumn = newWatermarkColumn; + return this; + } + + public Builder watermarkColumnTimeUnit(TimeUnit newWatermarkTimeUnit) { + this.watermarkColumnTimeUnit = newWatermarkTimeUnit; + return this; + } + + public Builder resolveConfig( Table table, Map<String, String> readOptions, ReadableConfig readableConfig) { FlinkReadConf flinkReadConf = new FlinkReadConf(table, readOptions, readableConfig); @@ -618,7 +667,9 @@ public class ScanContext implements Serializable { .planParallelism(flinkReadConf.workerPoolSize()) .includeColumnStats(flinkReadConf.includeColumnStats()) .maxPlanningSnapshotCount(flinkReadConf.maxPlanningSnapshotCount()) - .maxAllowedPlanningFailures(maxAllowedPlanningFailures); + .maxAllowedPlanningFailures(maxAllowedPlanningFailures) + .watermarkColumn(flinkReadConf.watermarkColumn()) + .watermarkColumnTimeUnit(flinkReadConf.watermarkColumnTimeUnit()); } public ScanContext build() { @@ -640,10 +691,13 @@ public class ScanContext implements Serializable { filters, limit, includeColumnStats, + includeStatsForColumns, exposeLocality, planParallelism, maxPlanningSnapshotCount, maxAllowedPlanningFailures, + watermarkColumn, + watermarkColumnTimeUnit, branch, tag, startTag, diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/read/source/MixedFormatScanContext.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/read/source/MixedFormatScanContext.java index 7fc850df0..a0afc9571 100644 --- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/read/source/MixedFormatScanContext.java +++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/read/source/MixedFormatScanContext.java @@ -33,9 +33,11 @@ import org.apache.iceberg.flink.source.StreamingStartingStrategy; import java.io.Serializable; import java.time.Duration; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.TimeUnit; /** This is an mixed-format source scan context. */ public class MixedFormatScanContext extends ScanContext implements Serializable { @@ -64,10 +66,13 @@ public class MixedFormatScanContext extends ScanContext implements Serializable builder.filters, builder.limit, builder.includeColumnStats, + builder.includeStatsForColumns, builder.exposeLocality, builder.planParallelism, builder.maxPlanningSnapshotCount, builder.maxAllowedPlanningFailures, + builder.watermarkColumn, + builder.watermarkColumnTimeUnit, builder.branch, builder.tag, builder.startTag, @@ -166,6 +171,7 @@ public class MixedFormatScanContext extends ScanContext implements Serializable private long limit = FlinkReadOptions.LIMIT_OPTION.defaultValue(); private boolean includeColumnStats = FlinkReadOptions.INCLUDE_COLUMN_STATS_OPTION.defaultValue(); + private Collection<String> includeStatsForColumns = null; private boolean exposeLocality; private Integer planParallelism = FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE.defaultValue(); @@ -173,7 +179,9 @@ public class MixedFormatScanContext extends ScanContext implements Serializable private int maxAllowedPlanningFailures = FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.defaultValue(); - + private String watermarkColumn = FlinkReadOptions.WATERMARK_COLUMN_OPTION.defaultValue(); + private TimeUnit watermarkColumnTimeUnit = + FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT_OPTION.defaultValue(); private String branch = FlinkReadOptions.BRANCH.defaultValue(); private String tag = FlinkReadOptions.TAG.defaultValue();
