ahmedabu98 commented on code in PR #33504:
URL: https://github.com/apache/beam/pull/33504#discussion_r1981684696
##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java:
##########
@@ -103,22 +108,119 @@
* </tr>
* </table>
*
- * <p><b>Additional configuration options are provided in the `Pre-filtering Options` section below,
- * for Iceberg writes.</b>
+ * <h3>Sink-only Options</h3>
*
- * <h3>Creating Tables</h3>
+ * <table border="1" cellspacing="1">
+ * <tr>
+ *     <td> <b>Parameter</b> </td> <td> <b>Type</b> </td> <td> <b>Description</b> </td>
+ * </tr>
+ * <tr>
+ * <td> {@code triggering_frequency_seconds} </td>
+ * <td> {@code int} </td>
+ * <td>Required for streaming writes. Roughly every
+ *     {@code triggering_frequency_seconds} duration, the sink will write records to data files and produce a table snapshot.
+ * Generally, a higher value will produce fewer, larger data files.
+ * </td>
+ * </tr>
+ * <tr>
+ *     <td>{@code drop}</td> <td>{@code list<str>}</td> <td>A list of fields to drop before writing to table(s).</td>
+ * </tr>
+ * <tr>
+ *     <td>{@code keep}</td> <td>{@code list<str>}</td> <td>A list of fields to keep, dropping the rest before writing to table(s).</td>
+ * </tr>
+ * <tr>
+ *     <td>{@code only}</td> <td>{@code str}</td> <td>A nested record field that should be the only thing written to table(s).</td>
+ * </tr>
+ * </table>
*
- * <p>If an Iceberg table does not exist at the time of writing, this connector will automatically
- * create one with the data's schema.
+ * <h3>Source-only Options</h3>
*
- * <p>Note that this is a best-effort operation that depends on the {@link Catalog} implementation.
- * Some implementations may not support creating a table using the Iceberg API.
+ * <h4>CDC Source options</h4>
+ *
+ * <table border="1" cellspacing="1">
+ * <tr>
+ *     <td> <b>Parameter</b> </td> <td> <b>Type</b> </td> <td> <b>Description</b> </td>
+ * </tr>
+ * <tr>
+ * <td> {@code from_snapshot} </td>
+ * <td> {@code long} </td>
+ * <td> Starts reading from this snapshot ID (inclusive).
+ * </td>
+ * </tr>
+ * <tr>
+ * <td> {@code to_snapshot} </td>
+ * <td> {@code long} </td>
+ *     <td> Reads up to this snapshot ID (inclusive). If unset and the source is bounded, it will read
+ *     up to the current snapshot (inclusive). If unset and source is unbounded, it will continue polling for new snapshots forever.
+ * </td>
+ * </tr>
+ * <tr>
+ * <td> {@code from_timestamp} </td>
+ * <td> {@code long} </td>
+ *     <td> Starts reading from the earliest snapshot (inclusive) created after this timestamp (in milliseconds).
+ * </td>
+ * </tr>
+ * <tr>
+ * <td> {@code to_timestamp} </td>
+ * <td> {@code long} </td>
+ *     <td> Reads up to the latest snapshot (inclusive) created before this timestamp (in milliseconds).
Review Comment:
Yes, all the new configuration parameters are optional
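
For context, here is a rough sketch (not part of this PR) of how the sink-only options documented above might be passed in practice. It assumes the connector is used through Beam's Managed API (Managed.write with Managed.ICEBERG and a withConfig map), the usual "table"/"catalog_properties" identification keys, and the conventional "input" tuple tag; the table name, warehouse path, and field names are placeholders.

import java.util.Arrays;
import java.util.Map;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import com.google.common.collect.ImmutableMap;

class IcebergWriteSketch {
  // Writes a (possibly unbounded) PCollection<Row> to Iceberg using the sink options above.
  static void writeToIceberg(PCollection<Row> rows) {
    Map<String, Object> config =
        ImmutableMap.<String, Object>builder()
            .put("table", "db.events")                        // placeholder table identifier
            .put("catalog_properties",
                ImmutableMap.of("type", "hadoop",
                    "warehouse", "gs://my-bucket/warehouse"))  // placeholder catalog config
            // Required for streaming writes: commit a table snapshot roughly every 60 seconds.
            .put("triggering_frequency_seconds", 60)
            // Optional pre-filtering: drop this field before writing.
            .put("drop", Arrays.asList("internal_debug_info"))
            .build();

    // Managed transforms are SchemaTransforms, so the input rows are wrapped in a
    // PCollectionRowTuple; "input" follows the usual SchemaTransform tag convention.
    PCollectionRowTuple.of("input", rows)
        .apply(Managed.write(Managed.ICEBERG).withConfig(config));
  }
}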
##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java:
##########
@@ -103,22 +108,119 @@
* </tr>
* </table>
*
- * <p><b>Additional configuration options are provided in the `Pre-filtering Options` section below,
- * for Iceberg writes.</b>
+ * <h3>Sink-only Options</h3>
*
- * <h3>Creating Tables</h3>
+ * <table border="1" cellspacing="1">
+ * <tr>
+ *     <td> <b>Parameter</b> </td> <td> <b>Type</b> </td> <td> <b>Description</b> </td>
+ * </tr>
+ * <tr>
+ * <td> {@code triggering_frequency_seconds} </td>
+ * <td> {@code int} </td>
+ * <td>Required for streaming writes. Roughly every
+ *     {@code triggering_frequency_seconds} duration, the sink will write records to data files and produce a table snapshot.
+ * Generally, a higher value will produce fewer, larger data files.
+ * </td>
+ * </tr>
+ * <tr>
+ *     <td>{@code drop}</td> <td>{@code list<str>}</td> <td>A list of fields to drop before writing to table(s).</td>
+ * </tr>
+ * <tr>
+ *     <td>{@code keep}</td> <td>{@code list<str>}</td> <td>A list of fields to keep, dropping the rest before writing to table(s).</td>
+ * </tr>
+ * <tr>
+ *     <td>{@code only}</td> <td>{@code str}</td> <td>A nested record field that should be the only thing written to table(s).</td>
+ * </tr>
+ * </table>
*
- * <p>If an Iceberg table does not exist at the time of writing, this connector will automatically
- * create one with the data's schema.
+ * <h3>Source-only Options</h3>
*
- * <p>Note that this is a best-effort operation that depends on the {@link Catalog} implementation.
- * Some implementations may not support creating a table using the Iceberg API.
+ * <h4>CDC Source options</h4>
+ *
+ * <table border="1" cellspacing="1">
+ * <tr>
+ *     <td> <b>Parameter</b> </td> <td> <b>Type</b> </td> <td> <b>Description</b> </td>
+ * </tr>
+ * <tr>
+ * <td> {@code from_snapshot} </td>
+ * <td> {@code long} </td>
+ * <td> Starts reading from this snapshot ID (inclusive).
+ * </td>
+ * </tr>
+ * <tr>
+ * <td> {@code to_snapshot} </td>
+ * <td> {@code long} </td>
+ *     <td> Reads up to this snapshot ID (inclusive). If unset and the source is bounded, it will read
+ *     up to the current snapshot (inclusive). If unset and source is unbounded, it will continue polling for new snapshots forever.
Review Comment:
Yep, snapshot is a core concept in Iceberg. Will use "latest snapshot" instead for clarity.
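
To make the bounded/unbounded behavior described above concrete, here is a hedged sketch (not part of this PR) of a bounded CDC read between two snapshot IDs. It assumes a Managed.ICEBERG_CDC source identifier, the conventional "output" tuple tag, and the usual "table"/"catalog_properties" keys; the table identifier, catalog properties, and snapshot IDs are placeholders.

import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import com.google.common.collect.ImmutableMap;

class IcebergCdcBoundedReadSketch {
  // Reads CDC records between two snapshot IDs; both ends are inclusive per the table above.
  static PCollection<Row> readBetweenSnapshots(
      Pipeline pipeline, long fromSnapshotId, long toSnapshotId) {
    Map<String, Object> config =
        ImmutableMap.<String, Object>builder()
            .put("table", "db.events")                        // placeholder table identifier
            .put("catalog_properties",
                ImmutableMap.of("type", "hadoop",
                    "warehouse", "gs://my-bucket/warehouse"))  // placeholder catalog config
            .put("from_snapshot", fromSnapshotId)              // start snapshot (inclusive)
            .put("to_snapshot", toSnapshotId)                  // end snapshot (inclusive)
            .build();

    // Source SchemaTransforms are applied to an empty tuple; the resulting rows are
    // fetched with the conventional "output" tag.
    return PCollectionRowTuple.empty(pipeline)
        .apply(Managed.read(Managed.ICEBERG_CDC).withConfig(config))
        .get("output");
  }
}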
##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java:
##########
@@ -103,22 +108,119 @@
* </tr>
* </table>
*
- * <p><b>Additional configuration options are provided in the `Pre-filtering Options` section below,
- * for Iceberg writes.</b>
+ * <h3>Sink-only Options</h3>
*
- * <h3>Creating Tables</h3>
+ * <table border="1" cellspacing="1">
+ * <tr>
+ *     <td> <b>Parameter</b> </td> <td> <b>Type</b> </td> <td> <b>Description</b> </td>
+ * </tr>
+ * <tr>
+ * <td> {@code triggering_frequency_seconds} </td>
+ * <td> {@code int} </td>
+ * <td>Required for streaming writes. Roughly every
+ *     {@code triggering_frequency_seconds} duration, the sink will write records to data files and produce a table snapshot.
+ * Generally, a higher value will produce fewer, larger data files.
+ * </td>
+ * </tr>
+ * <tr>
+ *     <td>{@code drop}</td> <td>{@code list<str>}</td> <td>A list of fields to drop before writing to table(s).</td>
+ * </tr>
+ * <tr>
+ *     <td>{@code keep}</td> <td>{@code list<str>}</td> <td>A list of fields to keep, dropping the rest before writing to table(s).</td>
+ * </tr>
+ * <tr>
+ *     <td>{@code only}</td> <td>{@code str}</td> <td>A nested record field that should be the only thing written to table(s).</td>
+ * </tr>
+ * </table>
*
- * <p>If an Iceberg table does not exist at the time of writing, this connector will automatically
- * create one with the data's schema.
+ * <h3>Source-only Options</h3>
*
- * <p>Note that this is a best-effort operation that depends on the {@link Catalog} implementation.
- * Some implementations may not support creating a table using the Iceberg API.
+ * <h4>CDC Source options</h4>
+ *
+ * <table border="1" cellspacing="1">
+ * <tr>
+ *     <td> <b>Parameter</b> </td> <td> <b>Type</b> </td> <td> <b>Description</b> </td>
+ * </tr>
+ * <tr>
+ * <td> {@code from_snapshot} </td>
+ * <td> {@code long} </td>
+ * <td> Starts reading from this snapshot ID (inclusive).
+ * </td>
+ * </tr>
+ * <tr>
+ * <td> {@code to_snapshot} </td>
+ * <td> {@code long} </td>
+ *     <td> Reads up to this snapshot ID (inclusive). If unset and the source is bounded, it will read
+ *     up to the current snapshot (inclusive). If unset and source is unbounded, it will continue polling for new snapshots forever.
+ * </td>
+ * </tr>
+ * <tr>
+ * <td> {@code from_timestamp} </td>
+ * <td> {@code long} </td>
+ *     <td> Starts reading from the earliest snapshot (inclusive) created after this timestamp (in milliseconds).
+ * </td>
+ * </tr>
+ * <tr>
+ * <td> {@code to_timestamp} </td>
+ * <td> {@code long} </td>
+ *     <td> Reads up to the latest snapshot (inclusive) created before this timestamp (in milliseconds).
+ * </td>
+ * </tr>
+ * <tr>
+ * <td> {@code starting_strategy} </td>
+ * <td> {@code str} </td>
+ * <td>
+ * The source's starting strategy. Valid options are:
+ * <ul>
+ *         <li>{@code earliest}: starts reading from the earliest snapshot</li>
+ * <li>{@code latest}: starts reading from the latest snapshot</li>
+ * </ul>
+ *     <p>Defaults to {@code earliest} for batch, and {@code latest} for streaming.
Review Comment:
Streaming in the context of IcebergIO, i.e. the `streaming` config option. I'll move the `streaming` row up so people see it first, and reference it here.
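
As a companion to the comment above, a hedged sketch (not part of this PR) of an unbounded CDC read where `streaming` is set and `starting_strategy` is spelled out explicitly (it would default to {@code latest} here anyway). Same assumptions as the earlier sketches: a Managed.ICEBERG_CDC source identifier, the conventional "output" tag, and placeholder table/catalog values.

import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import com.google.common.collect.ImmutableMap;

class IcebergCdcStreamingReadSketch {
  // Continuously reads new CDC records, starting from the latest snapshot.
  static PCollection<Row> streamFromLatest(Pipeline pipeline) {
    Map<String, Object> config =
        ImmutableMap.<String, Object>builder()
            .put("table", "db.events")                        // placeholder table identifier
            .put("catalog_properties",
                ImmutableMap.of("type", "hadoop",
                    "warehouse", "gs://my-bucket/warehouse"))  // placeholder catalog config
            .put("streaming", true)                            // unbounded: keep polling for new snapshots
            .put("starting_strategy", "latest")                // explicit here; also the streaming default
            .build();

    return PCollectionRowTuple.empty(pipeline)
        .apply(Managed.read(Managed.ICEBERG_CDC).withConfig(config))
        .get("output");
  }
}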
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]