openinx commented on a change in pull request #1793:
URL: https://github.com/apache/iceberg/pull/1793#discussion_r535886725
##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
##########
@@ -98,115 +112,203 @@ private ScanContext(boolean caseSensitive, Long
snapshotId, Long startSnapshotId
this.splitSize = splitSize;
this.splitLookback = splitLookback;
this.splitOpenFileCost = splitOpenFileCost;
+ this.isStreaming = isStreaming;
+ this.monitorInterval = monitorInterval;
+
this.nameMapping = nameMapping;
this.projectedSchema = projectedSchema;
this.filterExpressions = filterExpressions;
}
- ScanContext fromProperties(Map<String, String> properties) {
- Configuration config = new Configuration();
- properties.forEach(config::setString);
- return new ScanContext(config.get(CASE_SENSITIVE),
config.get(SNAPSHOT_ID), config.get(START_SNAPSHOT_ID),
- config.get(END_SNAPSHOT_ID), config.get(AS_OF_TIMESTAMP),
config.get(SPLIT_SIZE), config.get(SPLIT_LOOKBACK),
- config.get(SPLIT_FILE_OPEN_COST),
properties.get(DEFAULT_NAME_MAPPING), projectedSchema, filterExpressions);
- }
-
boolean caseSensitive() {
return caseSensitive;
}
- ScanContext setCaseSensitive(boolean isCaseSensitive) {
- return new ScanContext(isCaseSensitive, snapshotId, startSnapshotId,
endSnapshotId, asOfTimestamp, splitSize,
- splitLookback, splitOpenFileCost, nameMapping, projectedSchema,
filterExpressions);
- }
-
Long snapshotId() {
return snapshotId;
}
- ScanContext useSnapshotId(Long scanSnapshotId) {
- return new ScanContext(caseSensitive, scanSnapshotId, startSnapshotId,
endSnapshotId, asOfTimestamp, splitSize,
- splitLookback, splitOpenFileCost, nameMapping, projectedSchema,
filterExpressions);
- }
-
Long startSnapshotId() {
return startSnapshotId;
}
- ScanContext startSnapshotId(Long id) {
- return new ScanContext(caseSensitive, snapshotId, id, endSnapshotId,
asOfTimestamp, splitSize, splitLookback,
- splitOpenFileCost, nameMapping, projectedSchema, filterExpressions);
- }
-
Long endSnapshotId() {
return endSnapshotId;
}
- ScanContext endSnapshotId(Long id) {
- return new ScanContext(caseSensitive, snapshotId, startSnapshotId, id,
asOfTimestamp, splitSize, splitLookback,
- splitOpenFileCost, nameMapping, projectedSchema, filterExpressions);
- }
-
Long asOfTimestamp() {
return asOfTimestamp;
}
- ScanContext asOfTimestamp(Long timestamp) {
- return new ScanContext(caseSensitive, snapshotId, startSnapshotId,
endSnapshotId, timestamp, splitSize,
- splitLookback, splitOpenFileCost, nameMapping, projectedSchema,
filterExpressions);
- }
-
Long splitSize() {
return splitSize;
}
- ScanContext splitSize(Long size) {
- return new ScanContext(caseSensitive, snapshotId, startSnapshotId,
endSnapshotId, asOfTimestamp, size,
- splitLookback, splitOpenFileCost, nameMapping, projectedSchema,
filterExpressions);
- }
-
Integer splitLookback() {
return splitLookback;
}
- ScanContext splitLookback(Integer lookback) {
- return new ScanContext(caseSensitive, snapshotId, startSnapshotId,
endSnapshotId, asOfTimestamp, splitSize,
- lookback, splitOpenFileCost, nameMapping, projectedSchema,
filterExpressions);
- }
-
Long splitOpenFileCost() {
return splitOpenFileCost;
}
- ScanContext splitOpenFileCost(Long fileCost) {
- return new ScanContext(caseSensitive, snapshotId, startSnapshotId,
endSnapshotId, asOfTimestamp, splitSize,
- splitLookback, fileCost, nameMapping, projectedSchema,
filterExpressions);
+ boolean isStreaming() {
+ return isStreaming;
}
- String nameMapping() {
- return nameMapping;
+ Duration monitorInterval() {
+ return monitorInterval;
}
- ScanContext nameMapping(String mapping) {
- return new ScanContext(caseSensitive, snapshotId, startSnapshotId,
endSnapshotId, asOfTimestamp, splitSize,
- splitLookback, splitOpenFileCost, mapping, projectedSchema,
filterExpressions);
+ String nameMapping() {
+ return nameMapping;
}
Schema projectedSchema() {
return projectedSchema;
}
- ScanContext project(Schema schema) {
- return new ScanContext(caseSensitive, snapshotId, startSnapshotId,
endSnapshotId, asOfTimestamp, splitSize,
- splitLookback, splitOpenFileCost, nameMapping, schema,
filterExpressions);
- }
-
List<Expression> filterExpressions() {
return filterExpressions;
}
- ScanContext filterRows(List<Expression> filters) {
- return new ScanContext(caseSensitive, snapshotId, startSnapshotId,
endSnapshotId, asOfTimestamp, splitSize,
- splitLookback, splitOpenFileCost, nameMapping, projectedSchema,
filters);
+ ScanContext copyWithAppendsBetween(long newStartSnapshotId, long
newEndSnapshotId) {
+ return ScanContext.builder()
+ .caseSensitive(caseSensitive)
+ .useSnapshotId(null)
+ .startSnapshotId(newStartSnapshotId)
+ .endSnapshotId(newEndSnapshotId)
+ .asOfTimestamp(null)
+ .splitSize(splitSize)
+ .splitOpenFileCost(splitOpenFileCost)
+ .nameMapping(nameMapping)
+ .streaming(isStreaming)
+ .monitorInterval(monitorInterval)
+ .build();
+ }
+
+ ScanContext copyWithSnapshotId(long newSnapshotId) {
+ return ScanContext.builder()
+ .caseSensitive(caseSensitive)
+ .useSnapshotId(newSnapshotId)
+ .startSnapshotId(null)
+ .endSnapshotId(null)
+ .asOfTimestamp(null)
+ .splitSize(splitSize)
+ .splitOpenFileCost(splitOpenFileCost)
+ .nameMapping(nameMapping)
+ .streaming(isStreaming)
+ .monitorInterval(monitorInterval)
+ .build();
+ }
+
+ static Builder builder() {
+ return new Builder();
+ }
+
+ static class Builder {
+ private boolean caseSensitive;
+ private Long snapshotId;
+ private Long startSnapshotId;
+ private Long endSnapshotId;
+ private Long asOfTimestamp;
+ private Long splitSize;
+ private Integer splitLookback;
+ private Long splitOpenFileCost;
+ private boolean isStreaming;
+ private Duration monitorInterval;
+ private String nameMapping;
+ private Schema projectedSchema;
+ private List<Expression> filterExpressions;
+
+ private Builder() {
+ }
+
+ Builder caseSensitive(boolean newCaseSensitive) {
+ this.caseSensitive = newCaseSensitive;
+ return this;
+ }
+
+ Builder useSnapshotId(Long newSnapshotId) {
+ this.snapshotId = newSnapshotId;
+ return this;
+ }
+
+ Builder startSnapshotId(Long newStartSnapshotId) {
+ this.startSnapshotId = newStartSnapshotId;
+ return this;
+ }
+
+ Builder endSnapshotId(Long newEndsnapshotId) {
+ this.endSnapshotId = newEndsnapshotId;
+ return this;
+ }
+
+ Builder asOfTimestamp(Long newAsOfTimestamp) {
+ this.asOfTimestamp = newAsOfTimestamp;
+ return this;
+ }
+
+ Builder splitSize(Long newSplitSize) {
+ this.splitSize = newSplitSize;
+ return this;
+ }
+
+ Builder splitLookback(Integer newSplitLookback) {
+ this.splitLookback = newSplitLookback;
+ return this;
+ }
+
+ Builder splitOpenFileCost(Long newSplitOpenFileCost) {
+ this.splitOpenFileCost = newSplitOpenFileCost;
+ return this;
+ }
+
+ Builder streaming(boolean streaming) {
+ this.isStreaming = streaming;
+ return this;
+ }
+
+ Builder monitorInterval(Duration newMonitorInterval) {
+ this.monitorInterval = newMonitorInterval;
+ return this;
+ }
+
+ Builder nameMapping(String newNameMapping) {
+ this.nameMapping = newNameMapping;
+ return this;
+ }
+
+ Builder projectedSchema(Schema newProjectedSchema) {
+ this.projectedSchema = newProjectedSchema;
+ return this;
+ }
+
+ Builder filterExpression(List<Expression> newFilterExpressions) {
+ this.filterExpressions = newFilterExpressions;
+ return this;
+ }
+
+ Builder fromProperties(Map<String, String> properties) {
+ Configuration config = new Configuration();
+ properties.forEach(config::setString);
+
+ return new Builder()
Review comment:
Yeah, you are right ! If use a new builder, then it should be copying
the builder , maybe named it as `mergeProperties`, here we should use `this` .
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]