openinx commented on a change in pull request #1793:
URL: https://github.com/apache/iceberg/pull/1793#discussion_r553768680
##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
##########
@@ -100,126 +96,224 @@ private ScanContext(boolean caseSensitive, Long
snapshotId, Long startSnapshotId
this.splitSize = splitSize;
this.splitLookback = splitLookback;
this.splitOpenFileCost = splitOpenFileCost;
+ this.isStreaming = isStreaming;
+ this.monitorInterval = monitorInterval;
+
this.nameMapping = nameMapping;
- this.projectedSchema = projectedSchema;
- this.filterExpressions = filterExpressions;
+ this.schema = schema;
+ this.filters = filters;
this.limit = limit;
}
- ScanContext fromProperties(Map<String, String> properties) {
- Configuration config = new Configuration();
- properties.forEach(config::setString);
- return new ScanContext(config.get(CASE_SENSITIVE),
config.get(SNAPSHOT_ID), config.get(START_SNAPSHOT_ID),
- config.get(END_SNAPSHOT_ID), config.get(AS_OF_TIMESTAMP),
config.get(SPLIT_SIZE), config.get(SPLIT_LOOKBACK),
- config.get(SPLIT_FILE_OPEN_COST),
properties.get(DEFAULT_NAME_MAPPING), projectedSchema, filterExpressions,
- limit);
- }
-
boolean caseSensitive() {
return caseSensitive;
}
- ScanContext setCaseSensitive(boolean isCaseSensitive) {
- return new ScanContext(isCaseSensitive, snapshotId, startSnapshotId,
endSnapshotId, asOfTimestamp, splitSize,
- splitLookback, splitOpenFileCost, nameMapping, projectedSchema,
filterExpressions, limit);
- }
-
Long snapshotId() {
return snapshotId;
}
- ScanContext useSnapshotId(Long scanSnapshotId) {
- return new ScanContext(caseSensitive, scanSnapshotId, startSnapshotId,
endSnapshotId, asOfTimestamp, splitSize,
- splitLookback, splitOpenFileCost, nameMapping, projectedSchema,
filterExpressions, limit);
- }
-
Long startSnapshotId() {
return startSnapshotId;
}
- ScanContext startSnapshotId(Long id) {
- return new ScanContext(caseSensitive, snapshotId, id, endSnapshotId,
asOfTimestamp, splitSize, splitLookback,
- splitOpenFileCost, nameMapping, projectedSchema, filterExpressions,
limit);
- }
-
Long endSnapshotId() {
return endSnapshotId;
}
- ScanContext endSnapshotId(Long id) {
- return new ScanContext(caseSensitive, snapshotId, startSnapshotId, id,
asOfTimestamp, splitSize, splitLookback,
- splitOpenFileCost, nameMapping, projectedSchema, filterExpressions,
limit);
- }
-
Long asOfTimestamp() {
return asOfTimestamp;
}
- ScanContext asOfTimestamp(Long timestamp) {
- return new ScanContext(caseSensitive, snapshotId, startSnapshotId,
endSnapshotId, timestamp, splitSize,
- splitLookback, splitOpenFileCost, nameMapping, projectedSchema,
filterExpressions, limit);
- }
-
Long splitSize() {
return splitSize;
}
- ScanContext splitSize(Long size) {
- return new ScanContext(caseSensitive, snapshotId, startSnapshotId,
endSnapshotId, asOfTimestamp, size,
- splitLookback, splitOpenFileCost, nameMapping, projectedSchema,
filterExpressions, limit);
- }
-
Integer splitLookback() {
return splitLookback;
}
- ScanContext splitLookback(Integer lookback) {
- return new ScanContext(caseSensitive, snapshotId, startSnapshotId,
endSnapshotId, asOfTimestamp, splitSize,
- lookback, splitOpenFileCost, nameMapping, projectedSchema,
filterExpressions, limit);
- }
-
Long splitOpenFileCost() {
return splitOpenFileCost;
}
- ScanContext splitOpenFileCost(Long fileCost) {
- return new ScanContext(caseSensitive, snapshotId, startSnapshotId,
endSnapshotId, asOfTimestamp, splitSize,
- splitLookback, fileCost, nameMapping, projectedSchema,
filterExpressions, limit);
+ boolean isStreaming() {
+ return isStreaming;
+ }
+
+ Duration monitorInterval() {
+ return monitorInterval;
}
String nameMapping() {
return nameMapping;
}
- ScanContext nameMapping(String mapping) {
- return new ScanContext(caseSensitive, snapshotId, startSnapshotId,
endSnapshotId, asOfTimestamp, splitSize,
- splitLookback, splitOpenFileCost, mapping, projectedSchema,
filterExpressions, limit);
+ Schema project() {
+ return schema;
}
- Schema projectedSchema() {
- return projectedSchema;
+ List<Expression> filters() {
+ return filters;
}
- ScanContext project(Schema schema) {
- return new ScanContext(caseSensitive, snapshotId, startSnapshotId,
endSnapshotId, asOfTimestamp, splitSize,
- splitLookback, splitOpenFileCost, nameMapping, schema,
filterExpressions, limit);
+ long limit() {
+ return limit;
}
- List<Expression> filterExpressions() {
- return filterExpressions;
+ ScanContext copyWithAppendsBetween(long newStartSnapshotId, long
newEndSnapshotId) {
+ return ScanContext.builder()
+ .caseSensitive(caseSensitive)
+ .useSnapshotId(null)
+ .startSnapshotId(newStartSnapshotId)
+ .endSnapshotId(newEndSnapshotId)
+ .asOfTimestamp(null)
+ .splitSize(splitSize)
+ .splitLookback(splitLookback)
+ .splitOpenFileCost(splitOpenFileCost)
+ .streaming(isStreaming)
+ .monitorInterval(monitorInterval)
+ .nameMapping(nameMapping)
+ .project(schema)
+ .filters(filters)
+ .limit(limit)
+ .build();
}
- ScanContext filterRows(List<Expression> filters) {
- return new ScanContext(caseSensitive, snapshotId, startSnapshotId,
endSnapshotId, asOfTimestamp, splitSize,
- splitLookback, splitOpenFileCost, nameMapping, projectedSchema,
filters, limit);
+ ScanContext copyWithSnapshotId(long newSnapshotId) {
+ return ScanContext.builder()
+ .caseSensitive(caseSensitive)
+ .useSnapshotId(newSnapshotId)
+ .startSnapshotId(null)
+ .endSnapshotId(null)
+ .asOfTimestamp(null)
+ .splitSize(splitSize)
+ .splitLookback(splitLookback)
+ .splitOpenFileCost(splitOpenFileCost)
+ .streaming(isStreaming)
+ .monitorInterval(monitorInterval)
+ .nameMapping(nameMapping)
+ .project(schema)
+ .filters(filters)
+ .limit(limit)
+ .build();
}
- long limit() {
- return limit;
+ static Builder builder() {
+ return new Builder();
}
- ScanContext limit(Long newLimit) {
- return new ScanContext(caseSensitive, snapshotId, startSnapshotId,
endSnapshotId, asOfTimestamp, splitSize,
- splitLookback, splitOpenFileCost, nameMapping, projectedSchema,
filterExpressions, newLimit);
+ static class Builder {
+ private boolean caseSensitive = CASE_SENSITIVE.defaultValue();
+ private Long snapshotId = SNAPSHOT_ID.defaultValue();
+ private Long startSnapshotId = START_SNAPSHOT_ID.defaultValue();
+ private Long endSnapshotId = END_SNAPSHOT_ID.defaultValue();
+ private Long asOfTimestamp = AS_OF_TIMESTAMP.defaultValue();
+ private Long splitSize = SPLIT_SIZE.defaultValue();
+ private Integer splitLookback = SPLIT_LOOKBACK.defaultValue();
+ private Long splitOpenFileCost = SPLIT_FILE_OPEN_COST.defaultValue();
+ private boolean isStreaming = STREAMING.defaultValue();
+ private Duration monitorInterval = MONITOR_INTERVAL.defaultValue();
+ private String nameMapping;
+ private Schema projectedSchema;
+ private List<Expression> filters;
+ private Long limit = -1L;
+
+ private Builder() {
+ }
+
+ Builder caseSensitive(boolean newCaseSensitive) {
+ this.caseSensitive = newCaseSensitive;
+ return this;
+ }
+
+ Builder useSnapshotId(Long newSnapshotId) {
+ this.snapshotId = newSnapshotId;
+ return this;
+ }
+
+ Builder startSnapshotId(Long newStartSnapshotId) {
+ this.startSnapshotId = newStartSnapshotId;
+ return this;
+ }
+
+ Builder endSnapshotId(Long newEndSnapshotId) {
+ this.endSnapshotId = newEndSnapshotId;
+ return this;
+ }
+
+ Builder asOfTimestamp(Long newAsOfTimestamp) {
+ this.asOfTimestamp = newAsOfTimestamp;
+ return this;
+ }
+
+ Builder splitSize(Long newSplitSize) {
+ this.splitSize = newSplitSize;
+ return this;
+ }
+
+ Builder splitLookback(Integer newSplitLookback) {
+ this.splitLookback = newSplitLookback;
+ return this;
+ }
+
+ Builder splitOpenFileCost(Long newSplitOpenFileCost) {
+ this.splitOpenFileCost = newSplitOpenFileCost;
+ return this;
+ }
+
+ Builder streaming(boolean streaming) {
+ this.isStreaming = streaming;
+ return this;
+ }
+
+ Builder monitorInterval(Duration newMonitorInterval) {
+ this.monitorInterval = newMonitorInterval;
+ return this;
+ }
+
+ Builder nameMapping(String newNameMapping) {
+ this.nameMapping = newNameMapping;
+ return this;
+ }
+
+ Builder project(Schema newProjectedSchema) {
+ this.projectedSchema = newProjectedSchema;
+ return this;
+ }
+
+ Builder filters(List<Expression> newFilters) {
+ this.filters = newFilters;
+ return this;
+ }
+
+ Builder limit(long newLimit) {
+ this.limit = newLimit;
+ return this;
+ }
+
+ Builder fromProperties(Map<String, String> properties) {
+ Configuration config = new Configuration();
+ properties.forEach(config::setString);
+
+ return this.useSnapshotId(config.get(SNAPSHOT_ID))
+ .caseSensitive(config.get(CASE_SENSITIVE))
+ .asOfTimestamp(config.get(AS_OF_TIMESTAMP))
Review comment:
The default value of each configuration key is provided by its own
`ConfigOption` (for example, the `AS_OF_TIMESTAMP`
[ConfigOption](https://github.com/apache/iceberg/pull/1793/files#diff-91c6e1cb47eb9d762b9ce562657f47924c1b46365d981f268989dfffa11ce63fR48)).
For the following keys:
1. snapshot-id
2. as-of-timestamp
3. start-snapshot-id
4. end-snapshot-id
5. split-size
6. split-lookback
7. split-file-open-cost
we provide `null` as the default value, so it's better to use boxed
types (`Long`, `Integer`) here: unboxing a `null` into a primitive would throw a `NullPointerException`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]