This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 55638de40cd [BQ Python StorageWriteAPI] Enable writing rows with missing nullable fields (#28009)
55638de40cd is described below
commit 55638de40cd42bf4812f6e330e6155e7f2814602
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Tue Aug 22 21:10:15 2023 +0000
[BQ Python StorageWriteAPI] Enable writing rows with missing nullable fields (#28009)
* remove strict validation
* remove unused log
* add transform description for better debug
* refactor bigquery_tools.beam_row_from_dict and fill missing nullable fields with None
* remove unused
* address comment
---
...ueryStorageWriteApiSchemaTransformProvider.java | 46 ---------------
...StorageWriteApiSchemaTransformProviderTest.java | 67 ----------------------
sdks/python/apache_beam/io/gcp/bigquery.py | 4 +-
sdks/python/apache_beam/io/gcp/bigquery_tools.py | 27 ++++++---
.../apache_beam/io/gcp/bigquery_tools_test.py | 5 ++
5 files changed, 25 insertions(+), 124 deletions(-)
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
index 63aadb5525b..6ee98eb0ddf 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
@@ -20,8 +20,6 @@ package org.apache.beam.sdk.io.gcp.bigquery.providers;
import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
-import com.google.api.services.bigquery.model.Table;
-import com.google.api.services.bigquery.model.TableReference;
import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import java.util.Arrays;
@@ -34,15 +32,12 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import
org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiSchemaTransformConfiguration;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
-import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
@@ -64,8 +59,6 @@ import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.Vi
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.joda.time.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* An implementation of {@link TypedSchemaTransformProvider} for BigQuery
Storage Write API jobs
@@ -81,8 +74,6 @@ import org.slf4j.LoggerFactory;
@AutoService(SchemaTransformProvider.class)
public class BigQueryStorageWriteApiSchemaTransformProvider
extends
TypedSchemaTransformProvider<BigQueryStorageWriteApiSchemaTransformConfiguration>
{
- private static final Logger LOG =
-
LoggerFactory.getLogger(BigQueryStorageWriteApiSchemaTransformProvider.class);
private static final Integer DEFAULT_TRIGGER_FREQUENCY_SECS = 5;
private static final Duration DEFAULT_TRIGGERING_FREQUENCY =
Duration.standardSeconds(DEFAULT_TRIGGER_FREQUENCY_SECS);
@@ -300,12 +291,6 @@ public class BigQueryStorageWriteApiSchemaTransformProvider
Schema inputSchema = inputRows.getSchema();
- // check if input schema is assignable to the output schema with
nullability
- // check disabled for field
- if (write.getTable() != null) {
- TableReference tableRef = write.getTable().get();
- validateSchema(input.getPipeline().getOptions(), inputSchema,
tableRef);
- }
WriteResult result =
inputRows
.apply(
@@ -392,36 +377,5 @@ public class BigQueryStorageWriteApiSchemaTransformProvider
return write;
}
-
- private void validateSchema(
- PipelineOptions pipelineOptions, Schema inputSchema, TableReference
tableRef) {
- LOG.info("Validating schema ...");
- BigQueryOptions options = pipelineOptions.as(BigQueryOptions.class);
- try {
- Table table = null;
- if (this.testBigQueryServices != null) {
- DatasetService datasetService =
testBigQueryServices.getDatasetService(options);
- if (datasetService != null) {
- table = datasetService.getTable(tableRef);
- }
- } else {
- table = BigQueryHelpers.getTable(options, tableRef);
- }
- if (table == null) {
- LOG.info("Table [{}] not found, skipping schema validation.",
tableRef.getTableId());
- return;
- }
- Schema outputSchema = BigQueryUtils.fromTableSchema(table.getSchema());
- if (!inputSchema.assignableToIgnoreNullable(outputSchema)) {
- throw new IllegalArgumentException(
- "Input schema is not assignable to output schema. Input schema="
- + inputSchema
- + ", Output schema="
- + outputSchema);
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
}
}
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java
index bc481deff9e..df085bcedec 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java
@@ -22,8 +22,6 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
-import com.google.api.services.bigquery.model.Table;
-import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import java.io.Serializable;
import java.time.LocalDateTime;
@@ -31,10 +29,8 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
-import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
import
org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiSchemaTransform;
import
org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiSchemaTransformConfiguration;
import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices;
@@ -45,8 +41,6 @@ import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.metrics.MetricsFilter;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
@@ -97,11 +91,6 @@ public class
BigQueryStorageWriteApiSchemaTransformProviderTest {
.withFieldValue("dt",
LocalDateTime.parse("2000-01-03T00:00:00.123456"))
.build());
- private static final Schema SCHEMA_WRONG =
- Schema.of(
- Field.of("name_wrong", FieldType.STRING),
- Field.of("number", FieldType.INT64),
- Field.of("dt", FieldType.logicalType(SqlTypes.DATETIME)));
@Rule public final transient TestPipeline p = TestPipeline.create();
@Before
@@ -188,62 +177,6 @@ public class
BigQueryStorageWriteApiSchemaTransformProviderTest {
rowsEquals(ROWS, fakeDatasetService.getAllRows("project", "dataset",
"simple_write")));
}
- @Test
- public void testSchemaValidationSuccess() throws Exception {
- String tableSpec = "project:dataset.schema_validation_success";
- Table table = new Table();
- TableReference tableReference = BigQueryHelpers.parseTableSpec(tableSpec);
- table.setTableReference(tableReference);
- table.setSchema(BigQueryUtils.toTableSchema(SCHEMA));
- fakeDatasetService.createTable(table);
- BigQueryStorageWriteApiSchemaTransformConfiguration config =
- BigQueryStorageWriteApiSchemaTransformConfiguration.builder()
- .setTable(tableSpec)
- .setCreateDisposition("CREATE_IF_NEEDED")
- .build();
-
- runWithConfig(config);
- p.run().waitUntilFinish();
-
-
assertNotNull(fakeDatasetService.getTable(BigQueryHelpers.parseTableSpec(tableSpec)));
- assertEquals(
- 3, fakeDatasetService.getAllRows("project", "dataset",
"schema_validation_success").size());
- }
-
- @Test(expected = RuntimeException.class)
- public void testSchemaValidationFail() throws Exception {
- String tableSpec = "project:dataset.schema_validation_fail";
- Table table = new Table();
- TableReference tableReference = BigQueryHelpers.parseTableSpec(tableSpec);
- table.setTableReference(tableReference);
- table.setSchema(BigQueryUtils.toTableSchema(SCHEMA_WRONG));
- fakeDatasetService.createTable(table);
- BigQueryStorageWriteApiSchemaTransformConfiguration config =
- BigQueryStorageWriteApiSchemaTransformConfiguration.builder()
- .setTable(tableSpec)
- .setCreateDisposition("CREATE_IF_NEEDED")
- .build();
- BigQueryStorageWriteApiSchemaTransformProvider provider =
- new BigQueryStorageWriteApiSchemaTransformProvider();
-
- BigQueryStorageWriteApiSchemaTransform writeTransform =
- (BigQueryStorageWriteApiSchemaTransform) provider.from(config);
- writeTransform.setBigQueryServices(fakeBigQueryServices);
- List<Row> testRows =
- Arrays.asList(
- Row.withSchema(SCHEMA)
- .withFieldValue("name", "a")
- .withFieldValue("number", 1L)
- .withFieldValue("dt",
LocalDateTime.parse("2000-01-01T00:00:00"))
- .build());
- String tag = provider.inputCollectionNames().get(0);
- PipelineOptions options = PipelineOptionsFactory.create();
- Pipeline pipeline = Pipeline.create(options);
- PCollection<Row> rows =
pipeline.apply(Create.of(testRows).withRowSchema(SCHEMA));
- PCollectionRowTuple input = PCollectionRowTuple.of(tag, rows);
- writeTransform.expand(input);
- }
-
@Test
public void testInputElementCount() throws Exception {
String tableSpec = "project:dataset.input_count";
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py
b/sdks/python/apache_beam/io/gcp/bigquery.py
index e3ae0179067..6bdaeeb1db8 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -2233,12 +2233,12 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider`
that has a JSON string,
# SchemaTransform expects Beam Rows, so map to Rows first
output_beam_rows = (
pcoll
- |
+ | "Convert dict to Beam Row" >>
beam.Map(lambda row: bigquery_tools.beam_row_from_dict(row, schema)).
with_output_types(
RowTypeConstraint.from_fields(
bigquery_tools.get_beam_typehints_from_tableschema(schema)))
- | StorageWriteToBigQuery(
+ | "StorageWriteToBigQuery" >> StorageWriteToBigQuery(
table=table,
create_disposition=self.create_disposition,
write_disposition=self.write_disposition,
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index 6f55879d2a3..07d711f8fc9 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -1558,23 +1558,32 @@ bigquery_v2_messages.TableSchema):
"""
if not isinstance(schema, (bigquery.TableSchema, bigquery.TableFieldSchema)):
schema = get_bq_tableschema(schema)
- schema_fields = {field.name: field for field in schema.fields}
beam_row = {}
- for col_name, value in row.items():
- # get this column's schema field and handle struct types
- field = schema_fields[col_name]
- if field.type.upper() in ["RECORD", "STRUCT"]:
+ for field in schema.fields:
+ name = field.name
+ mode = field.mode.upper()
+ type = field.type.upper()
+ # When writing with Storage Write API via xlang, we give the Beam Row
+ # PCollection a hint on the schema using `with_output_types`.
+ # This requires that each row has all the fields in the schema.
+ # However, it's possible that some nullable fields don't appear in the row.
+ # For this case, we create the field with a `None` value
+ if name not in row and mode == "NULLABLE":
+ row[name] = None
+
+ value = row[name]
+ if type in ["RECORD", "STRUCT"]:
# if this is a list of records, we create a list of Beam Rows
- if field.mode.upper() == "REPEATED":
+ if mode == "REPEATED":
list_of_beam_rows = []
for record in value:
list_of_beam_rows.append(beam_row_from_dict(record, field))
- beam_row[col_name] = list_of_beam_rows
+ beam_row[name] = list_of_beam_rows
# otherwise, create a Beam Row from this record
else:
- beam_row[col_name] = beam_row_from_dict(value, field)
+ beam_row[name] = beam_row_from_dict(value, field)
else:
- beam_row[col_name] = value
+ beam_row[name] = value
return apache_beam.pvalue.Row(**beam_row)
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
index a3e39d8e18d..b4c84d589c0 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
@@ -861,6 +861,11 @@ class TestBeamRowFromDict(unittest.TestCase):
schema = {"fields": self.get_schema_fields_with_mode("nullable")}
dict_row = {k: None for k in self.DICT_ROW}
+ # input dict row with missing nullable fields should still yield a full
+ # Beam Row
+ del dict_row['str']
+ del dict_row['bool']
+
expected_beam_row = beam.Row(
str=None,
bool=None,