ahmedabu98 commented on code in PR #33504:
URL: https://github.com/apache/beam/pull/33504#discussion_r1981684696


##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java:
##########
@@ -103,22 +108,119 @@
  *   </tr>
  * </table>
  *
- * <p><b>Additional configuration options are provided in the `Pre-filtering Options` section below,
- * for Iceberg writes.</b>
+ * <h3>Sink-only Options</h3>
  *
- * <h3>Creating Tables</h3>
+ * <table border="1" cellspacing="1">
+ *   <tr>
+ *     <td> <b>Parameter</b> </td> <td> <b>Type</b> </td> <td> <b>Description</b> </td>
+ *   </tr>
+ *   <tr>
+ *     <td> {@code triggering_frequency_seconds} </td>
+ *     <td> {@code int} </td>
+ *     <td>Required for streaming writes. Roughly every
+ *       {@code triggering_frequency_seconds} duration, the sink will write records to data files and produce a table snapshot.
+ *       Generally, a higher value will produce fewer, larger data files.
+ *     </td>
+ *   </tr>
+ *   <tr>
+ *       <td>{@code drop}</td> <td>{@code list<str>}</td> <td>A list of fields to drop before writing to table(s).</td>
+ *   </tr>
+ *   <tr>
+ *       <td>{@code keep}</td> <td>{@code list<str>}</td> <td>A list of fields to keep, dropping the rest before writing to table(s).</td>
+ *   </tr>
+ *   <tr>
+ *       <td>{@code only}</td> <td>{@code str}</td> <td>A nested record field that should be the only thing written to table(s).</td>
+ *   </tr>
+ * </table>
  *
- * <p>If an Iceberg table does not exist at the time of writing, this connector will automatically
- * create one with the data's schema.
+ * <h3>Source-only Options</h3>
  *
- * <p>Note that this is a best-effort operation that depends on the {@link Catalog} implementation.
- * Some implementations may not support creating a table using the Iceberg API.
+ * <h4>CDC Source options</h4>
+ *
+ * <table border="1" cellspacing="1">
+ *   <tr>
+ *     <td> <b>Parameter</b> </td> <td> <b>Type</b> </td> <td> <b>Description</b> </td>
+ *   </tr>
+ *   <tr>
+ *     <td> {@code from_snapshot} </td>
+ *     <td> {@code long} </td>
+ *     <td> Starts reading from this snapshot ID (inclusive).
+ *     </td>
+ *   </tr>
+ *   <tr>
+ *     <td> {@code to_snapshot} </td>
+ *     <td> {@code long} </td>
+ *     <td> Reads up to this snapshot ID (inclusive). If unset and the source is bounded, it will read
+ *     up to the current snapshot (inclusive). If unset and source is unbounded, it will continue polling for new snapshots forever.
+ *     </td>
+ *   </tr>
+ *   <tr>
+ *     <td> {@code from_timestamp} </td>
+ *     <td> {@code long} </td>
+ *     <td> Starts reading from the earliest snapshot (inclusive) created after this timestamp (in milliseconds).
+ *     </td>
+ *   </tr>
+ *   <tr>
+ *     <td> {@code to_timestamp} </td>
+ *     <td> {@code long} </td>
+ *     <td> Reads up to the latest snapshot (inclusive) created before this timestamp (in milliseconds).

Review Comment:
   Yes, all the new configuration parameters are optional.
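
   For illustration, a minimal sketch of how the optional `from_timestamp` and `to_timestamp` bounds described in the hunk above could map onto snapshots. `SnapshotRange` and `Snap` are hypothetical names, not IcebergIO or Iceberg API. The strict after/before comparison and the "unset means no bound" behavior are assumptions made for this sketch, and it ignores `starting_strategy`.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical illustration only: not IcebergIO code and not the Iceberg API.
final class SnapshotRange {
  // Stand-in for a table snapshot: an ID plus its creation time in epoch milliseconds.
  static final class Snap {
    final long id;
    final long timestampMillis;

    Snap(long id, long timestampMillis) {
      this.id = id;
      this.timestampMillis = timestampMillis;
    }
  }

  // from_timestamp: earliest snapshot created after the timestamp.
  // Assumes `snapshots` is ordered oldest to newest; an unset (empty) bound matches everything.
  static Optional<Snap> resolveStart(List<Snap> snapshots, Optional<Long> fromTimestamp) {
    for (Snap s : snapshots) {
      if (!fromTimestamp.isPresent() || s.timestampMillis > fromTimestamp.get()) {
        return Optional.of(s);
      }
    }
    return Optional.empty();
  }

  // to_timestamp: latest snapshot created before the timestamp.
  static Optional<Snap> resolveEnd(List<Snap> snapshots, Optional<Long> toTimestamp) {
    Optional<Snap> end = Optional.empty();
    for (Snap s : snapshots) {
      if (!toTimestamp.isPresent() || s.timestampMillis < toTimestamp.get()) {
        end = Optional.of(s); // keep overwriting so the last match wins
      }
    }
    return end;
  }
}
```

   Both bounds are passed as `Optional` here to mirror the fact that the parameters are optional; an empty result means no snapshot falls inside the requested bound.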



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java:
##########
@@ -103,22 +108,119 @@
  *   </tr>
  * </table>
  *
- * <p><b>Additional configuration options are provided in the `Pre-filtering Options` section below,
- * for Iceberg writes.</b>
+ * <h3>Sink-only Options</h3>
  *
- * <h3>Creating Tables</h3>
+ * <table border="1" cellspacing="1">
+ *   <tr>
+ *     <td> <b>Parameter</b> </td> <td> <b>Type</b> </td> <td> <b>Description</b> </td>
+ *   </tr>
+ *   <tr>
+ *     <td> {@code triggering_frequency_seconds} </td>
+ *     <td> {@code int} </td>
+ *     <td>Required for streaming writes. Roughly every
+ *       {@code triggering_frequency_seconds} duration, the sink will write records to data files and produce a table snapshot.
+ *       Generally, a higher value will produce fewer, larger data files.
+ *     </td>
+ *   </tr>
+ *   <tr>
+ *       <td>{@code drop}</td> <td>{@code list<str>}</td> <td>A list of fields to drop before writing to table(s).</td>
+ *   </tr>
+ *   <tr>
+ *       <td>{@code keep}</td> <td>{@code list<str>}</td> <td>A list of fields to keep, dropping the rest before writing to table(s).</td>
+ *   </tr>
+ *   <tr>
+ *       <td>{@code only}</td> <td>{@code str}</td> <td>A nested record field that should be the only thing written to table(s).</td>
+ *   </tr>
+ * </table>
  *
- * <p>If an Iceberg table does not exist at the time of writing, this connector will automatically
- * create one with the data's schema.
+ * <h3>Source-only Options</h3>
  *
- * <p>Note that this is a best-effort operation that depends on the {@link Catalog} implementation.
- * Some implementations may not support creating a table using the Iceberg API.
+ * <h4>CDC Source options</h4>
+ *
+ * <table border="1" cellspacing="1">
+ *   <tr>
+ *     <td> <b>Parameter</b> </td> <td> <b>Type</b> </td> <td> <b>Description</b> </td>
+ *   </tr>
+ *   <tr>
+ *     <td> {@code from_snapshot} </td>
+ *     <td> {@code long} </td>
+ *     <td> Starts reading from this snapshot ID (inclusive).
+ *     </td>
+ *   </tr>
+ *   <tr>
+ *     <td> {@code to_snapshot} </td>
+ *     <td> {@code long} </td>
+ *     <td> Reads up to this snapshot ID (inclusive). If unset and the source is bounded, it will read
+ *     up to the current snapshot (inclusive). If unset and source is unbounded, it will continue polling for new snapshots forever.

Review Comment:
   Yep, a snapshot is a core concept in Iceberg. I'll use "latest snapshot" instead for clarity.
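
   To make the `to_snapshot` wording concrete, here is a minimal sketch of the default described in the Javadoc above: an explicit value wins, an unset value on a bounded source stops at the latest snapshot, and an unset value on an unbounded source means "keep polling". `EndSnapshot` and its parameters are hypothetical names used only for illustration, not the connector's actual code.

```java
import java.util.Optional;

// Hypothetical illustration only: not IcebergIO code.
final class EndSnapshot {
  // Returns the snapshot ID to stop at (inclusive), or empty to keep polling for new snapshots.
  static Optional<Long> resolve(
      Optional<Long> toSnapshot, boolean unbounded, long latestSnapshotId) {
    if (toSnapshot.isPresent()) {
      return toSnapshot; // explicit upper bound always wins
    }
    if (unbounded) {
      return Optional.empty(); // unset + unbounded: no end, continue polling
    }
    return Optional.of(latestSnapshotId); // unset + bounded: stop at the latest snapshot
  }
}
```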



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java:
##########
@@ -103,22 +108,119 @@
  *   </tr>
  * </table>
  *
- * <p><b>Additional configuration options are provided in the `Pre-filtering Options` section below,
- * for Iceberg writes.</b>
+ * <h3>Sink-only Options</h3>
  *
- * <h3>Creating Tables</h3>
+ * <table border="1" cellspacing="1">
+ *   <tr>
+ *     <td> <b>Parameter</b> </td> <td> <b>Type</b> </td> <td> <b>Description</b> </td>
+ *   </tr>
+ *   <tr>
+ *     <td> {@code triggering_frequency_seconds} </td>
+ *     <td> {@code int} </td>
+ *     <td>Required for streaming writes. Roughly every
+ *       {@code triggering_frequency_seconds} duration, the sink will write records to data files and produce a table snapshot.
+ *       Generally, a higher value will produce fewer, larger data files.
+ *     </td>
+ *   </tr>
+ *   <tr>
+ *       <td>{@code drop}</td> <td>{@code list<str>}</td> <td>A list of fields to drop before writing to table(s).</td>
+ *   </tr>
+ *   <tr>
+ *       <td>{@code keep}</td> <td>{@code list<str>}</td> <td>A list of fields to keep, dropping the rest before writing to table(s).</td>
+ *   </tr>
+ *   <tr>
+ *       <td>{@code only}</td> <td>{@code str}</td> <td>A nested record field that should be the only thing written to table(s).</td>
+ *   </tr>
+ * </table>
  *
- * <p>If an Iceberg table does not exist at the time of writing, this connector will automatically
- * create one with the data's schema.
+ * <h3>Source-only Options</h3>
  *
- * <p>Note that this is a best-effort operation that depends on the {@link Catalog} implementation.
- * Some implementations may not support creating a table using the Iceberg API.
+ * <h4>CDC Source options</h4>
+ *
+ * <table border="1" cellspacing="1">
+ *   <tr>
+ *     <td> <b>Parameter</b> </td> <td> <b>Type</b> </td> <td> <b>Description</b> </td>
+ *   </tr>
+ *   <tr>
+ *     <td> {@code from_snapshot} </td>
+ *     <td> {@code long} </td>
+ *     <td> Starts reading from this snapshot ID (inclusive).
+ *     </td>
+ *   </tr>
+ *   <tr>
+ *     <td> {@code to_snapshot} </td>
+ *     <td> {@code long} </td>
+ *     <td> Reads up to this snapshot ID (inclusive). If unset and the source is bounded, it will read
+ *     up to the current snapshot (inclusive). If unset and source is unbounded, it will continue polling for new snapshots forever.
+ *     </td>
+ *   </tr>
+ *   <tr>
+ *     <td> {@code from_timestamp} </td>
+ *     <td> {@code long} </td>
+ *     <td> Starts reading from the earliest snapshot (inclusive) created after this timestamp (in milliseconds).
+ *     </td>
+ *   </tr>
+ *   <tr>
+ *     <td> {@code to_timestamp} </td>
+ *     <td> {@code long} </td>
+ *     <td> Reads up to the latest snapshot (inclusive) created before this timestamp (in milliseconds).
+ *     </td>
+ *   </tr>
+ *   <tr>
+ *     <td> {@code starting_strategy} </td>
+ *     <td> {@code str} </td>
+ *     <td>
+ *       The source's starting strategy. Valid options are:
+ *       <ul>
+ *           <li>{@code earliest}: starts reading from the earliest snapshot</li>
+ *           <li>{@code latest}: starts reading from the latest snapshot</li>
+ *       </ul>
+ *       <p>Defaults to {@code earliest} for batch, and {@code latest} for streaming.

Review Comment:
   Streaming in the context of IcebergIO, i.e. the `streaming` config option. I'll move the
   `streaming` row up so people see it first and can refer back to it.
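
   As a rough sketch of the default described in the Javadoc above (`earliest` for batch, `latest` for streaming), assuming "streaming" is just the boolean `streaming` config value. `StartingStrategy` is a hypothetical helper, not the connector's code, and rejecting unknown values with an exception is an assumption of this sketch.

```java
// Hypothetical illustration only: not IcebergIO code.
final class StartingStrategy {
  // Resolves the effective starting strategy from the (possibly unset) config value.
  static String resolve(String startingStrategy, boolean streaming) {
    if (startingStrategy == null) {
      // Unset: default depends on the `streaming` config.
      return streaming ? "latest" : "earliest";
    }
    if (!startingStrategy.equals("earliest") && !startingStrategy.equals("latest")) {
      throw new IllegalArgumentException(
          "Invalid starting_strategy: " + startingStrategy + " (expected earliest or latest)");
    }
    return startingStrategy; // an explicit, valid value overrides the default
  }
}
```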



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.