reuvenlax commented on code in PR #26975:
URL: https://github.com/apache/beam/pull/26975#discussion_r1224798288


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -477,6 +476,56 @@
  * reviewers mentioned <a
  * 
href="https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/OWNERS";>
  * here</a>.
+ *
+ * <h3>Upserts and deletes</h3>
+ *
+ * The connector also supports streaming row updates to BigQuery, with the 
following qualifications:
+ * - The CREATE_IF_NEEDED CreateDisposition is not supported. Tables must be 
precreated with primary
+ * keys. - Only the STORAGE_WRITE_API_AT_LEAST_ONCE method is supported.
+ *
+ * <p>Two types of updates are supported. UPSERT replaces the row with the 
matching primary key or
+ * inserts the row if non exists. DELETE removes the row with the matching 
primary key. Row inserts
+ * are still allowed as normal using a separate instance of the sink, however 
care must be taken not

Review Comment:
   Removed sentence



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -477,6 +476,56 @@
  * reviewers mentioned <a
  * 
href="https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/OWNERS";>
  * here</a>.
+ *
+ * <h3>Upserts and deletes</h3>
+ *
+ * The connector also supports streaming row updates to BigQuery, with the 
following qualifications:
+ * - The CREATE_IF_NEEDED CreateDisposition is not supported. Tables must be 
precreated with primary
+ * keys. - Only the STORAGE_WRITE_API_AT_LEAST_ONCE method is supported.
+ *
+ * <p>Two types of updates are supported. UPSERT replaces the row with the 
matching primary key or
+ * inserts the row if non exists. DELETE removes the row with the matching 
primary key. Row inserts
+ * are still allowed as normal using a separate instance of the sink, however 
care must be taken not
+ * to violate primary key uniqueness constraints, as those constraints are not 
enforced by BigQuery.
+ * If a table contains multiple rows with the same primary key, then row 
updates may not work as
+ * expected. In particular, these inserts should _only_ be done using the 
exactly-once sink
+ * (STORAGE_WRITE_API), as the at-least once sink may duplicate inserts, 
violating the constraint.
+ *
+ * <p>Since PCollections are unordered, in order to properly sequence updates 
a sequence number must
+ * be set on each update. BigQuery uses this sequence number to ensure that 
updates are correctly
+ * applied to the table even if they arrive out of order.
+ *
+ * <p>The simplest way to apply row updates if applying {@link TableRow} 
object is to use the {@link
+ * Write#applyRowMutations} method. Each {@link RowMutation} element contains 
a {@link TableRow}, an
+ * update type (UPSERT or DELETE), and a sequence number to order the updates.
+ *
+ * <pre>{@code
+ * PCollection<TableRow> rows = ...;
+ * row.apply(MapElements
+ *       .into(new TypeDescriptor<RowMutation>(){})
+ *       .via(tableRow -> RowMutation.of(tableRow, getUpdateType(tableRow), 
getSequenceNumber(tableRow))))

Review Comment:
   ack



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to