rdblue commented on a change in pull request #1793:
URL: https://github.com/apache/iceberg/pull/1793#discussion_r529932424
##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
##########
@@ -98,115 +112,203 @@ private ScanContext(boolean caseSensitive, Long
snapshotId, Long startSnapshotId
this.splitSize = splitSize;
this.splitLookback = splitLookback;
this.splitOpenFileCost = splitOpenFileCost;
+ this.isStreaming = isStreaming;
+ this.monitorInterval = monitorInterval;
+
this.nameMapping = nameMapping;
this.projectedSchema = projectedSchema;
this.filterExpressions = filterExpressions;
}
- ScanContext fromProperties(Map<String, String> properties) {
- Configuration config = new Configuration();
- properties.forEach(config::setString);
- return new ScanContext(config.get(CASE_SENSITIVE),
config.get(SNAPSHOT_ID), config.get(START_SNAPSHOT_ID),
- config.get(END_SNAPSHOT_ID), config.get(AS_OF_TIMESTAMP),
config.get(SPLIT_SIZE), config.get(SPLIT_LOOKBACK),
- config.get(SPLIT_FILE_OPEN_COST),
properties.get(DEFAULT_NAME_MAPPING), projectedSchema, filterExpressions);
- }
-
boolean caseSensitive() {
return caseSensitive;
}
- ScanContext setCaseSensitive(boolean isCaseSensitive) {
- return new ScanContext(isCaseSensitive, snapshotId, startSnapshotId,
endSnapshotId, asOfTimestamp, splitSize,
- splitLookback, splitOpenFileCost, nameMapping, projectedSchema,
filterExpressions);
- }
-
Long snapshotId() {
return snapshotId;
}
- ScanContext useSnapshotId(Long scanSnapshotId) {
- return new ScanContext(caseSensitive, scanSnapshotId, startSnapshotId,
endSnapshotId, asOfTimestamp, splitSize,
- splitLookback, splitOpenFileCost, nameMapping, projectedSchema,
filterExpressions);
- }
-
Long startSnapshotId() {
return startSnapshotId;
}
- ScanContext startSnapshotId(Long id) {
- return new ScanContext(caseSensitive, snapshotId, id, endSnapshotId,
asOfTimestamp, splitSize, splitLookback,
- splitOpenFileCost, nameMapping, projectedSchema, filterExpressions);
- }
-
Long endSnapshotId() {
return endSnapshotId;
}
- ScanContext endSnapshotId(Long id) {
- return new ScanContext(caseSensitive, snapshotId, startSnapshotId, id,
asOfTimestamp, splitSize, splitLookback,
- splitOpenFileCost, nameMapping, projectedSchema, filterExpressions);
- }
-
Long asOfTimestamp() {
return asOfTimestamp;
}
- ScanContext asOfTimestamp(Long timestamp) {
- return new ScanContext(caseSensitive, snapshotId, startSnapshotId,
endSnapshotId, timestamp, splitSize,
- splitLookback, splitOpenFileCost, nameMapping, projectedSchema,
filterExpressions);
- }
-
Long splitSize() {
return splitSize;
}
- ScanContext splitSize(Long size) {
- return new ScanContext(caseSensitive, snapshotId, startSnapshotId,
endSnapshotId, asOfTimestamp, size,
- splitLookback, splitOpenFileCost, nameMapping, projectedSchema,
filterExpressions);
- }
-
Integer splitLookback() {
return splitLookback;
}
- ScanContext splitLookback(Integer lookback) {
- return new ScanContext(caseSensitive, snapshotId, startSnapshotId,
endSnapshotId, asOfTimestamp, splitSize,
- lookback, splitOpenFileCost, nameMapping, projectedSchema,
filterExpressions);
- }
-
Long splitOpenFileCost() {
return splitOpenFileCost;
}
- ScanContext splitOpenFileCost(Long fileCost) {
- return new ScanContext(caseSensitive, snapshotId, startSnapshotId,
endSnapshotId, asOfTimestamp, splitSize,
- splitLookback, fileCost, nameMapping, projectedSchema,
filterExpressions);
+ boolean isStreaming() {
+ return isStreaming;
}
- String nameMapping() {
- return nameMapping;
+ Duration monitorInterval() {
+ return monitorInterval;
}
- ScanContext nameMapping(String mapping) {
- return new ScanContext(caseSensitive, snapshotId, startSnapshotId,
endSnapshotId, asOfTimestamp, splitSize,
- splitLookback, splitOpenFileCost, mapping, projectedSchema,
filterExpressions);
+ String nameMapping() {
+ return nameMapping;
}
Schema projectedSchema() {
return projectedSchema;
}
- ScanContext project(Schema schema) {
- return new ScanContext(caseSensitive, snapshotId, startSnapshotId,
endSnapshotId, asOfTimestamp, splitSize,
- splitLookback, splitOpenFileCost, nameMapping, schema,
filterExpressions);
- }
-
List<Expression> filterExpressions() {
return filterExpressions;
}
- ScanContext filterRows(List<Expression> filters) {
- return new ScanContext(caseSensitive, snapshotId, startSnapshotId,
endSnapshotId, asOfTimestamp, splitSize,
- splitLookback, splitOpenFileCost, nameMapping, projectedSchema,
filters);
+ ScanContext copyWithAppendsBetween(long newStartSnapshotId, long
newEndSnapshotId) {
+ return ScanContext.builder()
+ .caseSensitive(caseSensitive)
+ .useSnapshotId(null)
+ .startSnapshotId(newStartSnapshotId)
+ .endSnapshotId(newEndSnapshotId)
+ .asOfTimestamp(null)
+ .splitSize(splitSize)
+ .splitOpenFileCost(splitOpenFileCost)
+ .nameMapping(nameMapping)
+ .streaming(isStreaming)
+ .monitorInterval(monitorInterval)
+ .build();
+ }
+
+ ScanContext copyWithSnapshotId(long newSnapshotId) {
+ return ScanContext.builder()
+ .caseSensitive(caseSensitive)
+ .useSnapshotId(newSnapshotId)
+ .startSnapshotId(null)
+ .endSnapshotId(null)
+ .asOfTimestamp(null)
+ .splitSize(splitSize)
+ .splitOpenFileCost(splitOpenFileCost)
+ .nameMapping(nameMapping)
+ .streaming(isStreaming)
+ .monitorInterval(monitorInterval)
+ .build();
+ }
+
+ static Builder builder() {
+ return new Builder();
+ }
+
+ static class Builder {
+ private boolean caseSensitive;
+ private Long snapshotId;
+ private Long startSnapshotId;
+ private Long endSnapshotId;
+ private Long asOfTimestamp;
+ private Long splitSize;
+ private Integer splitLookback;
+ private Long splitOpenFileCost;
+ private boolean isStreaming;
+ private Duration monitorInterval;
+ private String nameMapping;
+ private Schema projectedSchema;
+ private List<Expression> filterExpressions;
Review comment:
Shouldn't a builder supply the defaults for these? Otherwise, all of the
values need to be supplied. Why not use `CASE_SENSITIVE.defaultValue()` and
similar here?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]