ahmedabu98 commented on code in PR #33504:
URL: https://github.com/apache/beam/pull/33504#discussion_r1974285828


##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java:
##########
@@ -413,29 +438,56 @@ abstract static class Builder {
 
       abstract Builder setTableIdentifier(TableIdentifier identifier);
 
+      abstract Builder setFromSnapshotExclusive(@Nullable Long fromSnapshotExclusive);
+
+      abstract Builder setToSnapshot(@Nullable Long toSnapshot);
+
+      abstract Builder setTriggeringFrequency(Duration triggeringFrequency);
+
       abstract ReadRows build();
     }
 
     public ReadRows from(TableIdentifier tableIdentifier) {
       return toBuilder().setTableIdentifier(tableIdentifier).build();
     }
 
+    public ReadRows fromSnapshotExclusive(@Nullable Long fromSnapshotExclusive) {
+      return toBuilder().setFromSnapshotExclusive(fromSnapshotExclusive).build();
+    }
+
+    public ReadRows toSnapshot(@Nullable Long toSnapshot) {
+      return toBuilder().setToSnapshot(toSnapshot).build();
+    }
+
+    public ReadRows withTriggeringFrequency(Duration triggeringFrequency) {
+      return toBuilder().setTriggeringFrequency(triggeringFrequency).build();
+    }
+
     @Override
     public PCollection<Row> expand(PBegin input) {
       TableIdentifier tableId =
           checkStateNotNull(getTableIdentifier(), "Must set a table to read from.");
 
       Table table = getCatalogConfig().catalog().loadTable(tableId);
 
-      return input.apply(
-          Read.from(
-              new ScanSource(
-                  IcebergScanConfig.builder()
-                      .setCatalogConfig(getCatalogConfig())
-                      .setScanType(IcebergScanConfig.ScanType.TABLE)
-                      .setTableIdentifier(tableId)
-                      .setSchema(IcebergUtils.icebergSchemaToBeamSchema(table.schema()))
-                      .build())));
+      IcebergScanConfig scanConfig =
+          IcebergScanConfig.builder()
+              .setCatalogConfig(getCatalogConfig())
+              .setScanType(IcebergScanConfig.ScanType.TABLE)
+              .setTableIdentifier(tableId)
+              .setSchema(IcebergUtils.icebergSchemaToBeamSchema(table.schema()))
+              .setFromSnapshotExclusive(getFromSnapshotExclusive())
+              .setToSnapshot(getToSnapshot())
+              .build();
+      if (getTriggeringFrequency() != null

Review Comment:
   After offline discussions, we decided to make this a new Managed connector 
(`Managed.ICEBERG_CDC`) that is streaming-only. I'll keep the 
`.streaming(true)` option on IcebergIO itself, but it won't be exposed as a 
Managed configuration option.
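
   To illustrate the split, here is a minimal, self-contained sketch. It is not 
the code in this PR and does not use Beam classes: `ReadSketch`, its `Kind` 
enum, `fromManaged`, and the config keys (`from_snapshot_exclusive`, 
`to_snapshot`, `streaming`) are all hypothetical names chosen for the example. 
The idea it models is that the managed path derives streaming from the chosen 
connector and rejects a `streaming` key, while the direct API still lets the 
caller toggle it.

```java
import java.util.Map;

// Hypothetical stand-in for a read transform; NOT the real IcebergIO.ReadRows.
final class ReadSketch {
  // Hypothetical connector kinds: a batch connector and a streaming-only CDC connector.
  enum Kind { ICEBERG, ICEBERG_CDC }

  private final Long fromSnapshotExclusive; // null means "not set"
  private final Long toSnapshot;            // null means "not set"
  private final boolean streaming;

  private ReadSketch(Long fromSnapshotExclusive, Long toSnapshot, boolean streaming) {
    this.fromSnapshotExclusive = fromSnapshotExclusive;
    this.toSnapshot = toSnapshot;
    this.streaming = streaming;
  }

  static ReadSketch create() {
    return new ReadSketch(null, null, false);
  }

  // Each "setter" returns a new immutable copy, similar in spirit to
  // the toBuilder()...build() pattern shown in the diff.
  ReadSketch fromSnapshotExclusive(Long snapshotId) {
    return new ReadSketch(snapshotId, toSnapshot, streaming);
  }

  ReadSketch toSnapshot(Long snapshotId) {
    return new ReadSketch(fromSnapshotExclusive, snapshotId, streaming);
  }

  // Direct-API toggle: callers of the IO-style API may still opt in to streaming.
  ReadSketch streaming(boolean enabled) {
    return new ReadSketch(fromSnapshotExclusive, toSnapshot, enabled);
  }

  Long getFromSnapshotExclusive() { return fromSnapshotExclusive; }
  Long getToSnapshot() { return toSnapshot; }
  boolean isStreaming() { return streaming; }

  // Managed-style entry point. The config keys are assumptions for this sketch.
  // Streaming is decided by the connector kind, never by a config entry.
  static ReadSketch fromManaged(Kind kind, Map<String, ?> config) {
    if (config.containsKey("streaming")) {
      throw new IllegalArgumentException(
          "'streaming' is not a managed option; pick the connector kind instead");
    }
    return create()
        .fromSnapshotExclusive((Long) config.get("from_snapshot_exclusive"))
        .toSnapshot((Long) config.get("to_snapshot"))
        .streaming(kind == Kind.ICEBERG_CDC);
  }

  public static void main(String[] args) {
    ReadSketch cdc = fromManaged(Kind.ICEBERG_CDC, Map.of("from_snapshot_exclusive", 10L));
    ReadSketch batch = fromManaged(Kind.ICEBERG, Map.of("to_snapshot", 42L));
    ReadSketch direct = create().streaming(true);
    System.out.println(cdc.isStreaming());
    System.out.println(batch.isStreaming());
    System.out.println(direct.isStreaming());
  }
}
```

   In this sketch the streaming flag stays available on the direct API, but a 
managed config that tries to set it fails fast with an 
`IllegalArgumentException`.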



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
