This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 55638de40cd [BQ Python StorageWriteAPI] Enable writing rows with missing nullable fields (#28009)
55638de40cd is described below

commit 55638de40cd42bf4812f6e330e6155e7f2814602
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Tue Aug 22 21:10:15 2023 +0000

    [BQ Python StorageWriteAPI] Enable writing rows with missing nullable fields (#28009)
    
    * remove strict validation
    
    * remove unused log
    
    * add transform description for better debug
    
    * refactor bigquery_tools.beam_row_from_dict and fill missing nullable fields with None
    
    * remove unused
    
    * address comment
---
 ...ueryStorageWriteApiSchemaTransformProvider.java | 46 ---------------
 ...StorageWriteApiSchemaTransformProviderTest.java | 67 ----------------------
 sdks/python/apache_beam/io/gcp/bigquery.py         |  4 +-
 sdks/python/apache_beam/io/gcp/bigquery_tools.py   | 27 ++++++---
 .../apache_beam/io/gcp/bigquery_tools_test.py      |  5 ++
 5 files changed, 25 insertions(+), 124 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
index 63aadb5525b..6ee98eb0ddf 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
@@ -20,8 +20,6 @@ package org.apache.beam.sdk.io.gcp.bigquery.providers;
 import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
 import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
 
-import com.google.api.services.bigquery.model.Table;
-import com.google.api.services.bigquery.model.TableReference;
 import com.google.auto.service.AutoService;
 import com.google.auto.value.AutoValue;
 import java.util.Arrays;
@@ -34,15 +32,12 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
 import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
 import 
org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiSchemaTransformConfiguration;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
-import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.schemas.AutoValueSchema;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.Field;
@@ -64,8 +59,6 @@ import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.Vi
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.joda.time.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * An implementation of {@link TypedSchemaTransformProvider} for BigQuery 
Storage Write API jobs
@@ -81,8 +74,6 @@ import org.slf4j.LoggerFactory;
 @AutoService(SchemaTransformProvider.class)
 public class BigQueryStorageWriteApiSchemaTransformProvider
     extends 
TypedSchemaTransformProvider<BigQueryStorageWriteApiSchemaTransformConfiguration>
 {
-  private static final Logger LOG =
-      
LoggerFactory.getLogger(BigQueryStorageWriteApiSchemaTransformProvider.class);
   private static final Integer DEFAULT_TRIGGER_FREQUENCY_SECS = 5;
   private static final Duration DEFAULT_TRIGGERING_FREQUENCY =
       Duration.standardSeconds(DEFAULT_TRIGGER_FREQUENCY_SECS);
@@ -300,12 +291,6 @@ public class BigQueryStorageWriteApiSchemaTransformProvider
 
       Schema inputSchema = inputRows.getSchema();
 
-      // check if input schema is assignable to the output schema with 
nullability
-      // check disabled for field
-      if (write.getTable() != null) {
-        TableReference tableRef = write.getTable().get();
-        validateSchema(input.getPipeline().getOptions(), inputSchema, 
tableRef);
-      }
       WriteResult result =
           inputRows
               .apply(
@@ -392,36 +377,5 @@ public class BigQueryStorageWriteApiSchemaTransformProvider
 
       return write;
     }
-
-    private void validateSchema(
-        PipelineOptions pipelineOptions, Schema inputSchema, TableReference 
tableRef) {
-      LOG.info("Validating schema ...");
-      BigQueryOptions options = pipelineOptions.as(BigQueryOptions.class);
-      try {
-        Table table = null;
-        if (this.testBigQueryServices != null) {
-          DatasetService datasetService = 
testBigQueryServices.getDatasetService(options);
-          if (datasetService != null) {
-            table = datasetService.getTable(tableRef);
-          }
-        } else {
-          table = BigQueryHelpers.getTable(options, tableRef);
-        }
-        if (table == null) {
-          LOG.info("Table [{}] not found, skipping schema validation.", 
tableRef.getTableId());
-          return;
-        }
-        Schema outputSchema = BigQueryUtils.fromTableSchema(table.getSchema());
-        if (!inputSchema.assignableToIgnoreNullable(outputSchema)) {
-          throw new IllegalArgumentException(
-              "Input schema is not assignable to output schema. Input schema="
-                  + inputSchema
-                  + ", Output schema="
-                  + outputSchema);
-        }
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    }
   }
 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java
index bc481deff9e..df085bcedec 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java
@@ -22,8 +22,6 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
-import com.google.api.services.bigquery.model.Table;
-import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import java.io.Serializable;
 import java.time.LocalDateTime;
@@ -31,10 +29,8 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.function.Function;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
 import 
org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiSchemaTransform;
 import 
org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiSchemaTransformConfiguration;
 import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices;
@@ -45,8 +41,6 @@ import org.apache.beam.sdk.metrics.MetricQueryResults;
 import org.apache.beam.sdk.metrics.MetricResult;
 import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.beam.sdk.metrics.MetricsFilter;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.Field;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
@@ -97,11 +91,6 @@ public class 
BigQueryStorageWriteApiSchemaTransformProviderTest {
               .withFieldValue("dt", 
LocalDateTime.parse("2000-01-03T00:00:00.123456"))
               .build());
 
-  private static final Schema SCHEMA_WRONG =
-      Schema.of(
-          Field.of("name_wrong", FieldType.STRING),
-          Field.of("number", FieldType.INT64),
-          Field.of("dt", FieldType.logicalType(SqlTypes.DATETIME)));
   @Rule public final transient TestPipeline p = TestPipeline.create();
 
   @Before
@@ -188,62 +177,6 @@ public class 
BigQueryStorageWriteApiSchemaTransformProviderTest {
         rowsEquals(ROWS, fakeDatasetService.getAllRows("project", "dataset", 
"simple_write")));
   }
 
-  @Test
-  public void testSchemaValidationSuccess() throws Exception {
-    String tableSpec = "project:dataset.schema_validation_success";
-    Table table = new Table();
-    TableReference tableReference = BigQueryHelpers.parseTableSpec(tableSpec);
-    table.setTableReference(tableReference);
-    table.setSchema(BigQueryUtils.toTableSchema(SCHEMA));
-    fakeDatasetService.createTable(table);
-    BigQueryStorageWriteApiSchemaTransformConfiguration config =
-        BigQueryStorageWriteApiSchemaTransformConfiguration.builder()
-            .setTable(tableSpec)
-            .setCreateDisposition("CREATE_IF_NEEDED")
-            .build();
-
-    runWithConfig(config);
-    p.run().waitUntilFinish();
-
-    
assertNotNull(fakeDatasetService.getTable(BigQueryHelpers.parseTableSpec(tableSpec)));
-    assertEquals(
-        3, fakeDatasetService.getAllRows("project", "dataset", 
"schema_validation_success").size());
-  }
-
-  @Test(expected = RuntimeException.class)
-  public void testSchemaValidationFail() throws Exception {
-    String tableSpec = "project:dataset.schema_validation_fail";
-    Table table = new Table();
-    TableReference tableReference = BigQueryHelpers.parseTableSpec(tableSpec);
-    table.setTableReference(tableReference);
-    table.setSchema(BigQueryUtils.toTableSchema(SCHEMA_WRONG));
-    fakeDatasetService.createTable(table);
-    BigQueryStorageWriteApiSchemaTransformConfiguration config =
-        BigQueryStorageWriteApiSchemaTransformConfiguration.builder()
-            .setTable(tableSpec)
-            .setCreateDisposition("CREATE_IF_NEEDED")
-            .build();
-    BigQueryStorageWriteApiSchemaTransformProvider provider =
-        new BigQueryStorageWriteApiSchemaTransformProvider();
-
-    BigQueryStorageWriteApiSchemaTransform writeTransform =
-        (BigQueryStorageWriteApiSchemaTransform) provider.from(config);
-    writeTransform.setBigQueryServices(fakeBigQueryServices);
-    List<Row> testRows =
-        Arrays.asList(
-            Row.withSchema(SCHEMA)
-                .withFieldValue("name", "a")
-                .withFieldValue("number", 1L)
-                .withFieldValue("dt", 
LocalDateTime.parse("2000-01-01T00:00:00"))
-                .build());
-    String tag = provider.inputCollectionNames().get(0);
-    PipelineOptions options = PipelineOptionsFactory.create();
-    Pipeline pipeline = Pipeline.create(options);
-    PCollection<Row> rows = 
pipeline.apply(Create.of(testRows).withRowSchema(SCHEMA));
-    PCollectionRowTuple input = PCollectionRowTuple.of(tag, rows);
-    writeTransform.expand(input);
-  }
-
   @Test
   public void testInputElementCount() throws Exception {
     String tableSpec = "project:dataset.input_count";
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py 
b/sdks/python/apache_beam/io/gcp/bigquery.py
index e3ae0179067..6bdaeeb1db8 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -2233,12 +2233,12 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` 
that has a JSON string,
       # SchemaTransform expects Beam Rows, so map to Rows first
       output_beam_rows = (
           pcoll
-          |
+          | "Convert dict to Beam Row" >>
           beam.Map(lambda row: bigquery_tools.beam_row_from_dict(row, schema)).
           with_output_types(
               RowTypeConstraint.from_fields(
                   bigquery_tools.get_beam_typehints_from_tableschema(schema)))
-          | StorageWriteToBigQuery(
+          | "StorageWriteToBigQuery" >> StorageWriteToBigQuery(
               table=table,
               create_disposition=self.create_disposition,
               write_disposition=self.write_disposition,
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py 
b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index 6f55879d2a3..07d711f8fc9 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -1558,23 +1558,32 @@ bigquery_v2_messages.TableSchema):
   """
   if not isinstance(schema, (bigquery.TableSchema, bigquery.TableFieldSchema)):
     schema = get_bq_tableschema(schema)
-  schema_fields = {field.name: field for field in schema.fields}
   beam_row = {}
-  for col_name, value in row.items():
-    # get this column's schema field and handle struct types
-    field = schema_fields[col_name]
-    if field.type.upper() in ["RECORD", "STRUCT"]:
+  for field in schema.fields:
+    name = field.name
+    mode = field.mode.upper()
+    type = field.type.upper()
+    # When writing with Storage Write API via xlang, we give the Beam Row
+    # PCollection a hint on the schema using `with_output_types`.
+    # This requires that each row has all the fields in the schema.
+    # However, it's possible that some nullable fields don't appear in the row.
+    # For this case, we create the field with a `None` value
+    if name not in row and mode == "NULLABLE":
+      row[name] = None
+
+    value = row[name]
+    if type in ["RECORD", "STRUCT"]:
       # if this is a list of records, we create a list of Beam Rows
-      if field.mode.upper() == "REPEATED":
+      if mode == "REPEATED":
         list_of_beam_rows = []
         for record in value:
           list_of_beam_rows.append(beam_row_from_dict(record, field))
-        beam_row[col_name] = list_of_beam_rows
+        beam_row[name] = list_of_beam_rows
       # otherwise, create a Beam Row from this record
       else:
-        beam_row[col_name] = beam_row_from_dict(value, field)
+        beam_row[name] = beam_row_from_dict(value, field)
     else:
-      beam_row[col_name] = value
+      beam_row[name] = value
   return apache_beam.pvalue.Row(**beam_row)
 
 
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py 
b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
index a3e39d8e18d..b4c84d589c0 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
@@ -861,6 +861,11 @@ class TestBeamRowFromDict(unittest.TestCase):
     schema = {"fields": self.get_schema_fields_with_mode("nullable")}
     dict_row = {k: None for k in self.DICT_ROW}
 
+    # input dict row with missing nullable fields should still yield a full
+    # Beam Row
+    del dict_row['str']
+    del dict_row['bool']
+
     expected_beam_row = beam.Row(
         str=None,
         bool=None,

Reply via email to