This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 77ea3ab for existing tables, no need to set a schema
new 8b81ab6 Merge pull request #15929 from [BEAM-2791] For existing
BigQuery tables, no need to set a schema.
77ea3ab is described below
commit 77ea3abc98fc0098fd1017502e5ce0de7e868dfe
Author: Reuven Lax <[email protected]>
AuthorDate: Tue Nov 9 19:01:02 2021 -0800
for existing tables, no need to set a schema
---
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 9 ++--
.../StorageApiDynamicDestinationsTableRow.java | 49 ++++++++++++++++++----
.../beam/sdk/io/gcp/testing/FakeJobService.java | 6 ++-
.../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java | 31 ++++++++++----
4 files changed, 73 insertions(+), 22 deletions(-)
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 31cc6fd..7f52a13 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -2867,13 +2867,14 @@ public class BigQueryIO {
}
return input.apply(batchLoads);
} else if (method == Method.STORAGE_WRITE_API || method ==
Method.STORAGE_API_AT_LEAST_ONCE) {
+ BigQueryOptions bqOptions =
input.getPipeline().getOptions().as(BigQueryOptions.class);
StorageApiDynamicDestinations<T, DestinationT>
storageApiDynamicDestinations;
if (getUseBeamSchema()) {
// This ensures that the Beam rows are directly translated into
protos for Storage API
// writes, with no
// need to round trip through JSON TableRow objects.
storageApiDynamicDestinations =
- new StorageApiDynamicDestinationsBeamRow<T, DestinationT>(
+ new StorageApiDynamicDestinationsBeamRow<>(
dynamicDestinations, elementSchema, elementToRowFunction);
} else {
RowWriterFactory.TableRowWriterFactory<T, DestinationT>
tableRowWriterFactory =
@@ -2881,10 +2882,12 @@ public class BigQueryIO {
// Fallback behavior: convert to JSON TableRows and convert those
into Beam TableRows.
storageApiDynamicDestinations =
new StorageApiDynamicDestinationsTableRow<>(
- dynamicDestinations, tableRowWriterFactory.getToRowFn());
+ dynamicDestinations,
+ tableRowWriterFactory.getToRowFn(),
+ getBigQueryServices().getDatasetService(bqOptions),
+ getCreateDisposition());
}
- BigQueryOptions bqOptions =
input.getPipeline().getOptions().as(BigQueryOptions.class);
StorageApiLoads<DestinationT, T> storageApiLoads =
new StorageApiLoads<DestinationT, T>(
destinationCoder,
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
index 0204488..de4817b 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
@@ -17,11 +17,16 @@
*/
package org.apache.beam.sdk.io.gcp.bigquery;
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Message;
import java.time.Duration;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
@@ -30,31 +35,57 @@ import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuild
public class StorageApiDynamicDestinationsTableRow<T, DestinationT>
extends StorageApiDynamicDestinations<T, DestinationT> {
private final SerializableFunction<T, TableRow> formatFunction;
+ private final DatasetService datasetService;
+ private final CreateDisposition createDisposition;
- // TODO: Make static! Or at least optimize the constant schema case.
+ // TODO: Is this cache needed? All callers of getMessageConverter are
already caching the result.
private final Cache<DestinationT, Descriptor> destinationDescriptorCache =
CacheBuilder.newBuilder().expireAfterAccess(Duration.ofMinutes(15)).build();
StorageApiDynamicDestinationsTableRow(
DynamicDestinations<T, DestinationT> inner,
- SerializableFunction<T, TableRow> formatFunction) {
+ SerializableFunction<T, TableRow> formatFunction,
+ DatasetService datasetService,
+ CreateDisposition createDisposition) {
super(inner);
this.formatFunction = formatFunction;
+ this.datasetService = datasetService;
+ this.createDisposition = createDisposition;
}
@Override
public MessageConverter<T> getMessageConverter(DestinationT destination)
throws Exception {
- final TableSchema tableSchema = getSchema(destination);
- if (tableSchema == null) {
- throw new RuntimeException(
- "Schema must be set when writing TableRows using Storage API. Use "
- + "BigQueryIO.Write.withSchema to set the schema.");
- }
return new MessageConverter<T>() {
Descriptor descriptor =
destinationDescriptorCache.get(
destination,
- () ->
TableRowToStorageApiProto.getDescriptorFromTableSchema(tableSchema));
+ () -> {
+ @Nullable TableSchema tableSchema = getSchema(destination);
+ if (tableSchema == null) {
+ // If the table already exists, then try and fetch the
schema from the existing
+ // table.
+ TableReference tableReference =
getTable(destination).getTableReference();
+ @Nullable Table table =
datasetService.getTable(tableReference);
+ if (table == null) {
+ if (createDisposition == CreateDisposition.CREATE_NEVER) {
+ throw new RuntimeException(
+ "BigQuery table "
+ + tableReference
+ + " not found. If you wanted to "
+ + "automatically create the table, set the
create disposition to CREATE_IF_NEEDED and specify a "
+ + "schema.");
+ } else {
+ throw new RuntimeException(
+ "Schema must be set for table "
+ + tableReference
+ + " when writing TableRows using Storage API and
"
+ + "using a create disposition of
CREATE_IF_NEEDED.");
+ }
+ }
+ tableSchema = table.getSchema();
+ }
+ return
TableRowToStorageApiProto.getDescriptorFromTableSchema(tableSchema);
+ });
@Override
public Descriptor getSchemaDescriptor() {
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeJobService.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeJobService.java
index a891c48..768ecb1 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeJobService.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeJobService.java
@@ -364,12 +364,16 @@ public class FakeJobService implements JobService,
Serializable {
throws InterruptedException, IOException {
TableReference destination = load.getDestinationTable();
TableSchema schema = load.getSchema();
- checkArgument(schema != null, "No schema specified");
List<ResourceId> sourceFiles = filesForLoadJobs.get(jobRef.getProjectId(),
jobRef.getJobId());
WriteDisposition writeDisposition =
WriteDisposition.valueOf(load.getWriteDisposition());
CreateDisposition createDisposition =
CreateDisposition.valueOf(load.getCreateDisposition());
Table existingTable = datasetService.getTable(destination);
+ if (schema == null) {
+ schema = existingTable.getSchema();
+ }
+ checkArgument(schema != null, "No schema specified");
+
if (!validateDispositions(existingTable, createDisposition,
writeDisposition)) {
return new JobStatus().setState("FAILED").setErrorResult(new
ErrorProto());
}
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
index 088a7c4..3d8f0fa 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
@@ -1564,32 +1564,45 @@ public class BigQueryIOWriteTest implements
Serializable {
}
@Test
- public void testCreateNeverWithStreaming() throws Exception {
- if (!useStreaming) {
- return;
- }
+ public void testCreateNever() throws Exception {
+ BigQueryIO.Write.Method method =
+ useStreaming
+ ? (useStorageApi
+ ? (useStorageApiApproximate
+ ? Method.STORAGE_API_AT_LEAST_ONCE
+ : Method.STORAGE_WRITE_API)
+ : Method.STREAMING_INSERTS)
+ : useStorageApi ? Method.STORAGE_WRITE_API : Method.FILE_LOADS;
p.enableAbandonedNodeEnforcement(false);
- TableReference tableRef = new TableReference();
- tableRef.setDatasetId("dataset");
- tableRef.setTableId("sometable");
+ TableReference tableRef =
BigQueryHelpers.parseTableSpec("project-id:dataset-id.table");
+ TableSchema tableSchema =
+ new TableSchema()
+ .setFields(
+ ImmutableList.of(
+ new TableFieldSchema().setName("name").setType("STRING"),
+ new
TableFieldSchema().setName("number").setType("INTEGER")));
+ fakeDatasetService.createTable(new
Table().setTableReference(tableRef).setSchema(tableSchema));
PCollection<TableRow> tableRows =
- p.apply(GenerateSequence.from(0))
+ p.apply(GenerateSequence.from(0).to(10))
.apply(
MapElements.via(
new SimpleFunction<Long, TableRow>() {
@Override
public TableRow apply(Long input) {
- return null;
+ return new TableRow().set("name", "name " +
input).set("number", input);
}
}))
.setCoder(TableRowJsonCoder.of());
tableRows.apply(
BigQueryIO.writeTableRows()
.to(tableRef)
+ .withMethod(method)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
+ .withTestServices(fakeBqServices)
.withoutValidation());
+ p.run();
}
@Test