slilichenko commented on code in PR #26975:
URL: https://github.com/apache/beam/pull/26975#discussion_r1222122708


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -477,6 +476,56 @@
  * reviewers mentioned <a
  * href="https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/OWNERS">
  * here</a>.
+ *
+ * <h3>Upserts and deletes</h3>
+ *
+ * The connector also supports streaming row updates to BigQuery, with the following qualifications:
+ *
+ * <ul>
+ *   <li>The CREATE_IF_NEEDED CreateDisposition is not supported. Tables must be precreated with
+ *       primary keys.
+ *   <li>Only the STORAGE_API_AT_LEAST_ONCE method is supported.
+ * </ul>
+ *
+ * <p>Two types of updates are supported. UPSERT replaces the row with the matching primary key or
+ * inserts the row if none exists. DELETE removes the row with the matching primary key. Row inserts
+ * are still allowed as normal using a separate instance of the sink; however, care must be taken not
+ * to violate primary key uniqueness constraints, as those constraints are not enforced by BigQuery.
+ * If a table contains multiple rows with the same primary key, then row updates may not work as
+ * expected. In particular, these inserts should <em>only</em> be done using the exactly-once sink
+ * (STORAGE_WRITE_API), as the at-least-once sink may duplicate inserts, violating the constraint.
+ *
+ * <p>Since PCollections are unordered, a sequence number must be set on each update so that the
+ * updates can be sequenced properly. BigQuery uses this sequence number to ensure that updates are
+ * correctly applied to the table even if they arrive out of order.
+ *
+ * <p>When writing {@link TableRow} objects, the simplest way to apply row updates is to use the
+ * {@link BigQueryIO#applyRowMutations} method. Each {@link RowMutation} element contains a {@link
+ * TableRow}, an update type (UPSERT or DELETE), and a sequence number to order the updates.
+ *
+ * <pre>{@code
+ * PCollection<TableRow> rows = ...;
+ * rows.apply(MapElements
+ *       .into(new TypeDescriptor<RowMutation>(){})
+ *       .via(tableRow -> RowMutation.of(
+ *           tableRow,
+ *           RowMutationInformation.of(getUpdateType(tableRow), getSequenceNumber(tableRow)))))
+ *    .apply(BigQueryIO.applyRowMutations()
+ *           .to("my_project:my_dataset.my_table")
+ *           .withSchema(schema)
+ *           .withMethod(Write.Method.STORAGE_API_AT_LEAST_ONCE)
+ *           .withCreateDisposition(Write.CreateDisposition.CREATE_NEVER));
+ * }</pre>
+ *
+ * <p>If writing a type other than TableRow (e.g. using {@link BigQueryIO#writeGenericRecords} or
+ * writing a custom user type), then the {@link Write#withRowMutationFn} method can be used to set
+ * an update type and sequence number for each record. For example:
+ *
+ * <pre>{@code
+ * PCollection<CdcEvent> cdcEvent = ...;
+ *
+ * cdcEvent.apply(BigQueryIO.write()
+ *          .to("my-project:my_dataset.my_table")
+ *          .withSchema(schema)
+ *          .withFormatFunction(CdcEvent::getTableRow)
+ *          .withRowMutationFn(cdc -> RowMutationInformation.of(cdc.getChangeType(), cdc.getSequenceNumber()))

Review Comment:
   Naming is confusing. withRowMutationFn() implies that it will return 
RowMutation, not RowMutationInformation. Either make it return RowMutation in a 
single step, or rename to withRowMutationInformationFn.
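The Javadoc above relies on sequence numbers to make updates independent of arrival order. The self-contained sketch below illustrates that idea with in-memory maps; it is not the BigQuery or Beam implementation, and every class and method name in it is hypothetical. It assumes a mutation is skipped when its sequence number is not greater than the last one applied for the same key, and that the last sequence number is remembered even after a DELETE, so an older UPSERT cannot resurrect a deleted row.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical simulation of sequence-number-ordered upserts and deletes (not Beam code). */
public class SequenceOrderingSketch {
  enum MutationType { UPSERT, DELETE }

  static final class Mutation {
    final String key;
    final String value; // ignored for DELETE
    final MutationType type;
    final long sequenceNumber;

    Mutation(String key, String value, MutationType type, long sequenceNumber) {
      this.key = key;
      this.value = value;
      this.type = type;
      this.sequenceNumber = sequenceNumber;
    }
  }

  /** Applies mutations in the given arrival order, skipping any that are stale for their key. */
  static Map<String, String> apply(List<Mutation> mutations) {
    Map<String, String> table = new HashMap<>();
    Map<String, Long> lastSequenceNumber = new HashMap<>();
    for (Mutation m : mutations) {
      Long last = lastSequenceNumber.get(m.key);
      if (last != null && m.sequenceNumber <= last) {
        continue; // an equal or newer mutation for this key was already applied
      }
      // Remember the sequence number even for deletes, so older upserts stay ignored.
      lastSequenceNumber.put(m.key, m.sequenceNumber);
      if (m.type == MutationType.UPSERT) {
        table.put(m.key, m.value);
      } else {
        table.remove(m.key);
      }
    }
    return table;
  }

  public static void main(String[] args) {
    List<Mutation> inOrder =
        Arrays.asList(
            new Mutation("k1", "a", MutationType.UPSERT, 1),
            new Mutation("k1", "b", MutationType.UPSERT, 2),
            new Mutation("k2", "x", MutationType.UPSERT, 1),
            new Mutation("k2", null, MutationType.DELETE, 2));
    List<Mutation> reversed = new ArrayList<>(inOrder);
    Collections.reverse(reversed);
    System.out.println(apply(inOrder)); // {k1=b}
    System.out.println(apply(reversed)); // {k1=b}
  }
}
```

Both arrival orders converge on the same table, which is the property the sequence number is meant to guarantee.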



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java:
##########
@@ -92,8 +93,26 @@ public class FakeDatasetService implements DatasetService, Serializable {
   public void close() throws Exception {}
 
   static class Stream {
+    static class Entry {
+      enum UpdateType {
+        INSERT,

Review Comment:
   Is INSERT still needed?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java:
##########
@@ -586,11 +639,35 @@ public ApiFuture<AppendRowsResponse> appendRows(long offset, ProtoRows rows)
             }
             TableRow tableRow =
                 TableRowToStorageApiProto.tableRowFromMessage(
-                    DynamicMessage.parseFrom(protoDescriptor, bytes));
+                    DynamicMessage.parseFrom(protoDescriptor, bytes), false);
             if (shouldFailRow.apply(tableRow)) {
               rowIndexToErrorMessage.put(i, "Failing row " + tableRow.toPrettyString());
             }
-            tableRows.add(tableRow);
+            String insertTypeStr = null;
+            long csn = -1;
+            Descriptors.FieldDescriptor fieldDescriptor =
+                protoDescriptor.findFieldByName("_CHANGE_TYPE");
+            if (fieldDescriptor != null) {
+              insertTypeStr = (String) msg.getField(fieldDescriptor);
+            }
+            fieldDescriptor = protoDescriptor.findFieldByName("_CHANGE_SEQUENCE_NUMBER");
+            if (fieldDescriptor != null) {
+              csn = (long) msg.getField(fieldDescriptor);
+            }
+            Stream.Entry.UpdateType insertType = Stream.Entry.UpdateType.INSERT;

Review Comment:
   Remove references to INSERT



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowMutation.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+
+/**
+ * A convenience class for applying row updates to BigQuery using {@link
+ * BigQueryIO#applyRowMutations}. This class encapsulates a {@link TableRow} payload along with
+ * information on how to update the row. A sequence number must also be supplied to order the
+ * updates. Incorrect sequence numbers will result in unexpected state in the BigQuery table.
+ */
+@AutoValue
+public abstract class RowMutation {
+  public abstract TableRow getTableRow();
+
+  public abstract RowMutationInformation getMutationInformation();
+
+  public static RowMutation of(TableRow tableRow, RowMutationInformation rowMutationInformation) {
+    return new AutoValue_RowMutation(tableRow, rowMutationInformation);
+  }
+
+  public static class RowMutationCoder extends AtomicCoder<RowMutation> {
+    private static final RowMutationCoder INSTANCE = new RowMutationCoder();
+
+    public static RowMutationCoder of() {
+      return INSTANCE;
+    }
+
+    @Override
+    public void encode(RowMutation value, OutputStream outStream) throws IOException {
+      TableRowJsonCoder.of().encode(value.getTableRow(), outStream);
+      VarIntCoder.of()
+          .encode(value.getMutationInformation().getMutationType().ordinal(), outStream);
+      VarLongCoder.of().encode(value.getMutationInformation().getSequenceNumber(), outStream);
+    }
+
+    @Override
+    public RowMutation decode(InputStream inStream) throws IOException {
+      return RowMutation.of(
+          TableRowJsonCoder.of().decode(inStream),
+          RowMutationInformation.of(

Review Comment:
   Not a bug, but this relies on parameter ordering: if the parameters of RowMutationInformation.of() were ever reversed, the coder would break. Perhaps explicitly decode each part into a local variable before constructing the RowMutationInformation object.
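A minimal sketch of the suggested shape, under simplifying assumptions: MutationType and MutationInfo below are hypothetical stand-ins for RowMutationInformation, and DataOutputStream/DataInputStream replace Beam's VarIntCoder and VarLongCoder so the example runs without Beam. Each field is read into a named local in the same order encode() wrote it, so the wire format no longer depends on the order of the constructor's parameters.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/** Hypothetical, Beam-free illustration of decoding each field explicitly before construction. */
public class ExplicitDecodeSketch {
  enum MutationType { UPSERT, DELETE }

  /** Stand-in for RowMutationInformation. */
  static final class MutationInfo {
    final MutationType mutationType;
    final long sequenceNumber;

    MutationInfo(MutationType mutationType, long sequenceNumber) {
      this.mutationType = mutationType;
      this.sequenceNumber = sequenceNumber;
    }
  }

  static void encode(MutationInfo info, OutputStream outStream) throws IOException {
    DataOutputStream out = new DataOutputStream(outStream);
    out.writeInt(info.mutationType.ordinal()); // field 1: mutation type
    out.writeLong(info.sequenceNumber); // field 2: sequence number
    out.flush();
  }

  static MutationInfo decode(InputStream inStream) throws IOException {
    DataInputStream in = new DataInputStream(inStream);
    // Read each field into a named local, in exactly the order encode() wrote them.
    MutationType mutationType = MutationType.values()[in.readInt()];
    long sequenceNumber = in.readLong();
    // Construction no longer depends on argument evaluation order matching the wire order.
    return new MutationInfo(mutationType, sequenceNumber);
  }
}
```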



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java:
##########
@@ -160,7 +177,20 @@ void flush(long position) {
         throw new RuntimeException("");
       }
       for (; nextFlushPosition <= position; ++nextFlushPosition) {
-        tableContainer.addRow(stream.get((int) nextFlushPosition), "");
+        applyEntry(stream.get((int) nextFlushPosition));
+      }
+    }
+
+    void applyEntry(Entry entry) {
+      switch (entry.updateType) {
+        case INSERT:

Review Comment:
   Remove?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -477,6 +476,56 @@
  * reviewers mentioned <a
  * href="https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/OWNERS">
  * here</a>.
+ *
+ * <h3>Upserts and deletes</h3>
+ *
+ * The connector also supports streaming row updates to BigQuery, with the following qualifications:
+ *
+ * <ul>
+ *   <li>The CREATE_IF_NEEDED CreateDisposition is not supported. Tables must be precreated with
+ *       primary keys.
+ *   <li>Only the STORAGE_API_AT_LEAST_ONCE method is supported.
+ * </ul>

Review Comment:
   Can applyRowMutations() set the method explicitly, making it less verbose?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java:
##########
@@ -75,6 +77,11 @@
  * with the Storage write API.
  */
 public class TableRowToStorageApiProto {
+  private static final String CDC_CHANGE_SQN_COLUMN = "_CHANGE_SEQUENCE_NUMBER";
+  private static final String CDC_CHANGE_TYPE_COLUMN = "_CHANGE_TYPE";
+  private static final Set<String> CDC_COLUMNS =

Review Comment:
   These constants now seem to be defined in three different places. Perhaps consolidate them.
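One way to consolidate them, sketched under assumptions: the class name CdcColumnsSketch and its public visibility are hypothetical, while the two column names are the ones used throughout this PR. TableRowToStorageApiProto, FakeDatasetService, and the tests could then all refer to the same constants instead of repeating the string literals.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical single home for the CDC pseudo-column names used with the Storage Write API. */
public final class CdcColumnsSketch {
  public static final String CHANGE_TYPE_COLUMN = "_CHANGE_TYPE";
  public static final String CHANGE_SEQUENCE_NUMBER_COLUMN = "_CHANGE_SEQUENCE_NUMBER";

  /** Immutable set of all CDC pseudo-columns. */
  public static final Set<String> COLUMNS =
      Collections.unmodifiableSet(
          new HashSet<>(Arrays.asList(CHANGE_TYPE_COLUMN, CHANGE_SEQUENCE_NUMBER_COLUMN)));

  private CdcColumnsSketch() {} // constants only; not instantiable

  public static boolean isCdcColumn(String fieldName) {
    return COLUMNS.contains(fieldName);
  }
}
```

In FakeDatasetService, for example, protoDescriptor.findFieldByName("_CHANGE_TYPE") would then become protoDescriptor.findFieldByName(CdcColumnsSketch.CHANGE_TYPE_COLUMN), which would also address the "Use constants?" comments in the tests.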



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java:
##########
@@ -586,11 +639,35 @@ public ApiFuture<AppendRowsResponse> appendRows(long offset, ProtoRows rows)
             }
             TableRow tableRow =
                 TableRowToStorageApiProto.tableRowFromMessage(
-                    DynamicMessage.parseFrom(protoDescriptor, bytes));
+                    DynamicMessage.parseFrom(protoDescriptor, bytes), false);
             if (shouldFailRow.apply(tableRow)) {
               rowIndexToErrorMessage.put(i, "Failing row " + tableRow.toPrettyString());
             }
-            tableRows.add(tableRow);
+            String insertTypeStr = null;
+            long csn = -1;
+            Descriptors.FieldDescriptor fieldDescriptor =
+                protoDescriptor.findFieldByName("_CHANGE_TYPE");

Review Comment:
   Change to refer to constants?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java:
##########
@@ -160,7 +177,20 @@ void flush(long position) {
         throw new RuntimeException("");
       }
       for (; nextFlushPosition <= position; ++nextFlushPosition) {
-        tableContainer.addRow(stream.get((int) nextFlushPosition), "");
+        applyEntry(stream.get((int) nextFlushPosition));
+      }
+    }
+
+    void applyEntry(Entry entry) {
+      switch (entry.updateType) {
+        case INSERT:
+          tableContainer.addRow(entry.tableRow, "");
+          break;
+        case UPSERT:
+          tableContainer.upsertRow(entry.tableRow, entry.sqn);
+          break;
+        case DELETE:
+          tableContainer.deleteRow(entry.tableRow, entry.sqn);
       }

Review Comment:
   For consistency, add a break to the DELETE case too, plus a default case that throws a RuntimeException?
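A simplified sketch of the suggested switch shape. It is a hypothetical stand-in, not the real FakeDatasetService.Stream: the table is a plain map, sequence numbers are omitted, and INSERT is left out of the enum, following the earlier "Remove references to INSERT" comment. The default branch is unreachable with the current enum values; its purpose is to fail fast if a new UpdateType is added but not handled.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified stand-in for FakeDatasetService.Stream.applyEntry. */
public class ApplyEntrySketch {
  // INSERT omitted, per the earlier review comments; only mutation types remain.
  enum UpdateType { UPSERT, DELETE }

  static final class Entry {
    final String primaryKey;
    final String value;
    final UpdateType updateType;

    Entry(String primaryKey, String value, UpdateType updateType) {
      this.primaryKey = primaryKey;
      this.value = value;
      this.updateType = updateType;
    }
  }

  final Map<String, String> table = new HashMap<>();

  void applyEntry(Entry entry) {
    switch (entry.updateType) {
      case UPSERT:
        table.put(entry.primaryKey, entry.value);
        break;
      case DELETE:
        table.remove(entry.primaryKey);
        break; // explicit, so a case added later cannot be fallen into by accident
      default:
        // Fails fast if a new UpdateType is introduced but not handled here.
        throw new RuntimeException("Unsupported update type: " + entry.updateType);
    }
  }
}
```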



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java:
##########
@@ -446,9 +449,32 @@ private void assertBaseRecord(DynamicMessage msg, Map<String, Object> expectedFi
   public void testMessageFromGenericRecord() throws Exception {
     Descriptors.Descriptor descriptor =
         TableRowToStorageApiProto.getDescriptorFromTableSchema(
-            AvroGenericRecordToStorageApiProto.protoTableSchemaFromAvroSchema(NESTED_SCHEMA), true);
+            AvroGenericRecordToStorageApiProto.protoTableSchemaFromAvroSchema(NESTED_SCHEMA),
+            true,
+            false);
     DynamicMessage msg =
-        AvroGenericRecordToStorageApiProto.messageFromGenericRecord(descriptor, nestedRecord);
+        AvroGenericRecordToStorageApiProto.messageFromGenericRecord(
+            descriptor, nestedRecord, null, -1);
+
+    assertEquals(2, msg.getAllFields().size());
+
+    Map<String, Descriptors.FieldDescriptor> fieldDescriptors =
+        descriptor.getFields().stream()
+            .collect(Collectors.toMap(Descriptors.FieldDescriptor::getName, Functions.identity()));
+    DynamicMessage nestedMsg = (DynamicMessage) msg.getField(fieldDescriptors.get("nested"));
+    assertBaseRecord(nestedMsg, baseProtoExpectedFields);
+  }
+
+  @Test
+  public void testCdcFields() throws Exception {
+    Descriptors.Descriptor descriptor =
+        TableRowToStorageApiProto.getDescriptorFromTableSchema(
+            AvroGenericRecordToStorageApiProto.protoTableSchemaFromAvroSchema(NESTED_SCHEMA),
+            true,
+            false);
+    DynamicMessage msg =
+        AvroGenericRecordToStorageApiProto.messageFromGenericRecord(
+            descriptor, nestedRecord, null, -1);

Review Comment:
   This doesn't look like it actually tests the CDC fields: changeType is null (and the sequence number is -1).



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/TableContainer.java:
##########
@@ -53,12 +113,64 @@ long addRow(TableRow row, String id) {
     }
   }
 
+  void upsertRow(TableRow row, long sequenceNumber) {
+    List<Object> primaryKey = getPrimaryKey(row);
+    if (primaryKey == null) {
+      throw new RuntimeException("Upserts only allowed when using primary keys");
+    }
+    long lastSequenceNumberForKey = lastSequenceNumber.getOrDefault(primaryKey, -1L);

Review Comment:
   This assumes that sequence numbers are non-negative, which is not necessarily the case. Recommend defaulting to negative infinity (i.e. Long.MIN_VALUE) instead.
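A minimal sketch of the suggestion, using a hypothetical class and plain maps rather than the real TableContainer. It assumes an upsert is applied only when its sequence number is strictly greater than the last one recorded for the key; the actual comparison in the PR is not shown in this hunk.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration of the default used when a key has no prior sequence number. */
public class SequenceDefaultSketch {
  private final Map<String, Long> lastSequenceNumber = new HashMap<>();
  final Map<String, String> rows = new HashMap<>();

  /** Returns true if the upsert was applied, false if it was considered stale. */
  boolean upsertRow(String primaryKey, String value, long sequenceNumber) {
    // Long.MIN_VALUE acts as "negative infinity". With a default of -1L instead, a first
    // update carrying a negative sequence number such as -5 would be treated as stale.
    long lastForKey = lastSequenceNumber.getOrDefault(primaryKey, Long.MIN_VALUE);
    if (sequenceNumber <= lastForKey) {
      return false;
    }
    lastSequenceNumber.put(primaryKey, sequenceNumber);
    rows.put(primaryKey, value);
    return true;
  }
}
```

Under this comparison, an update whose sequence number is exactly Long.MIN_VALUE is still dropped; checking lastSequenceNumber.get(primaryKey) for null avoids needing a sentinel at all.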



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java:
##########
@@ -1186,4 +1189,37 @@ public void testIgnoreUnknownNestedField() throws Exception {
         ((TableRow) unknown.get("nestedvalue1")).getF().get(BASE_TABLE_ROW.getF().size()).getV());
     assertEquals("foobar", ((TableRow) unknown.get("nestedvaluenof1")).get("unknown"));
   }
+
+  @Test
+  public void testCdcFields() throws Exception {
+    TableRow tableRow =
+        new TableRow()
+            .set("nestedValue1", BASE_TABLE_ROW)
+            .set("nestedValue2", BASE_TABLE_ROW)
+            .set("nestedValueNoF1", BASE_TABLE_ROW_NO_F)
+            .set("nestedValueNoF2", BASE_TABLE_ROW_NO_F);
+
+    Descriptor descriptor =
+        TableRowToStorageApiProto.getDescriptorFromTableSchema(NESTED_TABLE_SCHEMA, true, true);
+    assertNotNull(descriptor.findFieldByName("_CHANGE_TYPE"));

Review Comment:
   Use constants?



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java:
##########
@@ -392,6 +393,26 @@ public void testMessageFromTableRow() throws Exception {
     assertBaseRecord(nestedMsg);
   }
 
+  @Test
+  public void testCdcFields() throws Exception {
+    Descriptor descriptor =
+        TableRowToStorageApiProto.getDescriptorFromTableSchema(
+            BeamRowToStorageApiProto.protoTableSchemaFromBeamSchema(NESTED_SCHEMA), true, true);
+    assertNotNull(descriptor.findFieldByName("_CHANGE_TYPE"));
+    assertNotNull(descriptor.findFieldByName("_CHANGE_SEQUENCE_NUMBER"));
+    DynamicMessage msg =
+        BeamRowToStorageApiProto.messageFromBeamRow(descriptor, NESTED_ROW, "UPDATE", 42);
+    assertEquals(5, msg.getAllFields().size());
+
+    Map<String, FieldDescriptor> fieldDescriptors =
+        descriptor.getFields().stream()
+            .collect(Collectors.toMap(FieldDescriptor::getName, Functions.identity()));
+    DynamicMessage nestedMsg = (DynamicMessage) msg.getField(fieldDescriptors.get("nested"));
+    assertBaseRecord(nestedMsg);
+    assertEquals("UPDATE", msg.getField(descriptor.findFieldByName("_CHANGE_TYPE")));

Review Comment:
   Use constants?



-- 
This is an automated message from the Apache Git Service.