ahmedabu98 commented on code in PR #33504:
URL: https://github.com/apache/beam/pull/33504#discussion_r1974285828
##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java:
##########
@@ -413,29 +438,56 @@ abstract static class Builder {
abstract Builder setTableIdentifier(TableIdentifier identifier);
+ abstract Builder setFromSnapshotExclusive(@Nullable Long fromSnapshotExclusive);
+
+ abstract Builder setToSnapshot(@Nullable Long toSnapshot);
+
+ abstract Builder setTriggeringFrequency(Duration triggeringFrequency);
+
abstract ReadRows build();
}
public ReadRows from(TableIdentifier tableIdentifier) {
return toBuilder().setTableIdentifier(tableIdentifier).build();
}
+ public ReadRows fromSnapshotExclusive(@Nullable Long fromSnapshotExclusive) {
+ return toBuilder().setFromSnapshotExclusive(fromSnapshotExclusive).build();
+ }
+
+ public ReadRows toSnapshot(@Nullable Long toSnapshot) {
+ return toBuilder().setToSnapshot(toSnapshot).build();
+ }
+
+ public ReadRows withTriggeringFrequency(Duration triggeringFrequency) {
+ return toBuilder().setTriggeringFrequency(triggeringFrequency).build();
+ }
+
@Override
public PCollection<Row> expand(PBegin input) {
TableIdentifier tableId =
checkStateNotNull(getTableIdentifier(), "Must set a table to read from.");
Table table = getCatalogConfig().catalog().loadTable(tableId);
- return input.apply(
- Read.from(
- new ScanSource(
- IcebergScanConfig.builder()
- .setCatalogConfig(getCatalogConfig())
- .setScanType(IcebergScanConfig.ScanType.TABLE)
- .setTableIdentifier(tableId)
- .setSchema(IcebergUtils.icebergSchemaToBeamSchema(table.schema()))
- .build())));
+ IcebergScanConfig scanConfig =
+ IcebergScanConfig.builder()
+ .setCatalogConfig(getCatalogConfig())
+ .setScanType(IcebergScanConfig.ScanType.TABLE)
+ .setTableIdentifier(tableId)
+ .setSchema(IcebergUtils.icebergSchemaToBeamSchema(table.schema()))
+ .setFromSnapshotExclusive(getFromSnapshotExclusive())
+ .setToSnapshot(getToSnapshot())
+ .build();
+ if (getTriggeringFrequency() != null
Review Comment:
After offline discussions, we decided to make this a new Managed connector
(`Managed.ICEBERG_CDC`) that is streaming only. I'll keep the
`.streaming(true)` option for IcebergIO itself, but it won't be exposed as a
Managed configuration option.
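The `fromSnapshotExclusive(...)` / `toSnapshot(...)` pair in the diff reads a range of table history; assuming it follows Iceberg's incremental-scan convention (start snapshot excluded, end snapshot included), the range is (from, to]. Below is a minimal, self-contained sketch of that selection over a linear snapshot history, plus the polling step a streaming-only reader such as the proposed `Managed.ICEBERG_CDC` would repeat once per triggering interval. `SnapshotRangeSketch`, `snapshotsInRange`, and `poll` are hypothetical names; this is not the Beam or Iceberg implementation, it uses plain `long` IDs instead of Iceberg `Snapshot` objects, and it ignores branching lineages.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of incremental snapshot reads over a linear Iceberg-style history.
 * Snapshot IDs are plain longs ordered oldest to newest; this is not Beam or Iceberg code.
 */
public final class SnapshotRangeSketch {

  private SnapshotRangeSketch() {}

  /**
   * Selects the snapshots in (fromSnapshotExclusive, toSnapshot]. A null lower bound means
   * "from the start of history"; a null upper bound means "up to the latest snapshot".
   */
  public static List<Long> snapshotsInRange(
      List<Long> lineage, Long fromSnapshotExclusive, Long toSnapshot) {
    int start = 0;
    if (fromSnapshotExclusive != null) {
      start = indexOrThrow(lineage, fromSnapshotExclusive) + 1; // skip the "from" snapshot itself
    }
    int end = lineage.size(); // exclusive end index for subList
    if (toSnapshot != null) {
      end = indexOrThrow(lineage, toSnapshot) + 1; // include the "to" snapshot
    }
    if (start > end) {
      throw new IllegalArgumentException("fromSnapshotExclusive comes after toSnapshot");
    }
    return new ArrayList<>(lineage.subList(start, end));
  }

  /**
   * One streaming poll, as a reader might run once per triggering interval: returns every
   * snapshot committed after the last one already processed (null = nothing processed yet).
   */
  public static List<Long> poll(List<Long> currentLineage, Long lastProcessed) {
    return snapshotsInRange(currentLineage, lastProcessed, null);
  }

  private static int indexOrThrow(List<Long> lineage, Long snapshotId) {
    int idx = lineage.indexOf(snapshotId);
    if (idx < 0) {
      throw new IllegalArgumentException("Unknown snapshot: " + snapshotId);
    }
    return idx;
  }

  public static void main(String[] args) {
    // Bounded read of (1, 3] over a four-snapshot history.
    System.out.println(snapshotsInRange(List.of(1L, 2L, 3L, 4L), 1L, 3L)); // [2, 3]

    // Streaming: the table grows between polls; each poll resumes after the last snapshot seen.
    Long lastProcessed = null;
    List<Long> first = poll(List.of(1L, 2L), lastProcessed);
    System.out.println(first); // [1, 2]
    if (!first.isEmpty()) {
      lastProcessed = first.get(first.size() - 1);
    }
    System.out.println(poll(List.of(1L, 2L, 3L, 4L), lastProcessed)); // [3, 4]
  }
}
```

Tracking only the last processed snapshot ID is what lets the streaming side resume without re-reading earlier commits; a real connector would also have to checkpoint that value and decide how to handle non-append snapshots.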