This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 77ea3ab  for existing tables, no need to set a schema
     new 8b81ab6  Merge pull request #15929 from [BEAM-2791] For existing BigQuery tables, no need to set a schema.
77ea3ab is described below

commit 77ea3abc98fc0098fd1017502e5ce0de7e868dfe
Author: Reuven Lax <[email protected]>
AuthorDate: Tue Nov 9 19:01:02 2021 -0800

    for existing tables, no need to set a schema
---
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java       |  9 ++--
 .../StorageApiDynamicDestinationsTableRow.java     | 49 ++++++++++++++++++----
 .../beam/sdk/io/gcp/testing/FakeJobService.java    |  6 ++-
 .../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java   | 31 ++++++++++----
 4 files changed, 73 insertions(+), 22 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 31cc6fd..7f52a13 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -2867,13 +2867,14 @@ public class BigQueryIO {
         }
         return input.apply(batchLoads);
       } else if (method == Method.STORAGE_WRITE_API || method == 
Method.STORAGE_API_AT_LEAST_ONCE) {
+        BigQueryOptions bqOptions = 
input.getPipeline().getOptions().as(BigQueryOptions.class);
         StorageApiDynamicDestinations<T, DestinationT> 
storageApiDynamicDestinations;
         if (getUseBeamSchema()) {
           // This ensures that the Beam rows are directly translated into 
protos for Sorage API
           // writes, with no
           // need to round trip through JSON TableRow objects.
           storageApiDynamicDestinations =
-              new StorageApiDynamicDestinationsBeamRow<T, DestinationT>(
+              new StorageApiDynamicDestinationsBeamRow<>(
                   dynamicDestinations, elementSchema, elementToRowFunction);
         } else {
           RowWriterFactory.TableRowWriterFactory<T, DestinationT> 
tableRowWriterFactory =
@@ -2881,10 +2882,12 @@ public class BigQueryIO {
           // Fallback behavior: convert to JSON TableRows and convert those 
into Beam TableRows.
           storageApiDynamicDestinations =
               new StorageApiDynamicDestinationsTableRow<>(
-                  dynamicDestinations, tableRowWriterFactory.getToRowFn());
+                  dynamicDestinations,
+                  tableRowWriterFactory.getToRowFn(),
+                  getBigQueryServices().getDatasetService(bqOptions),
+                  getCreateDisposition());
         }
 
-        BigQueryOptions bqOptions = 
input.getPipeline().getOptions().as(BigQueryOptions.class);
         StorageApiLoads<DestinationT, T> storageApiLoads =
             new StorageApiLoads<DestinationT, T>(
                 destinationCoder,
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
index 0204488..de4817b 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
@@ -17,11 +17,16 @@
  */
 package org.apache.beam.sdk.io.gcp.bigquery;
 
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.protobuf.Descriptors.Descriptor;
 import com.google.protobuf.Message;
 import java.time.Duration;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
@@ -30,31 +35,57 @@ import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuild
 public class StorageApiDynamicDestinationsTableRow<T, DestinationT>
     extends StorageApiDynamicDestinations<T, DestinationT> {
   private final SerializableFunction<T, TableRow> formatFunction;
+  private final DatasetService datasetService;
+  private final CreateDisposition createDisposition;
 
-  // TODO: Make static! Or at least optimize the constant schema case.
+  // TODO: Is this cache needed? All callers of getMessageConverter are 
already caching the resullt.
   private final Cache<DestinationT, Descriptor> destinationDescriptorCache =
       
CacheBuilder.newBuilder().expireAfterAccess(Duration.ofMinutes(15)).build();
 
   StorageApiDynamicDestinationsTableRow(
       DynamicDestinations<T, DestinationT> inner,
-      SerializableFunction<T, TableRow> formatFunction) {
+      SerializableFunction<T, TableRow> formatFunction,
+      DatasetService datasetService,
+      CreateDisposition createDisposition) {
     super(inner);
     this.formatFunction = formatFunction;
+    this.datasetService = datasetService;
+    this.createDisposition = createDisposition;
   }
 
   @Override
   public MessageConverter<T> getMessageConverter(DestinationT destination) 
throws Exception {
-    final TableSchema tableSchema = getSchema(destination);
-    if (tableSchema == null) {
-      throw new RuntimeException(
-          "Schema must be set when writing TableRows using Storage API. Use "
-              + "BigQueryIO.Write.withSchema to set the schema.");
-    }
     return new MessageConverter<T>() {
       Descriptor descriptor =
           destinationDescriptorCache.get(
               destination,
-              () -> 
TableRowToStorageApiProto.getDescriptorFromTableSchema(tableSchema));
+              () -> {
+                @Nullable TableSchema tableSchema = getSchema(destination);
+                if (tableSchema == null) {
+                  // If the table already exists, then try and fetch the 
schema from the existing
+                  // table.
+                  TableReference tableReference = 
getTable(destination).getTableReference();
+                  @Nullable Table table = 
datasetService.getTable(tableReference);
+                  if (table == null) {
+                    if (createDisposition == CreateDisposition.CREATE_NEVER) {
+                      throw new RuntimeException(
+                          "BigQuery table "
+                              + tableReference
+                              + " not found. If you wanted to "
+                              + "automatically create the table, set the 
create disposition to CREATE_IF_NEEDED and specify a "
+                              + "schema.");
+                    } else {
+                      throw new RuntimeException(
+                          "Schema must be set for table "
+                              + tableReference
+                              + " when writing TableRows using Storage API and 
"
+                              + "using a create disposition of 
CREATE_IF_NEEDED.");
+                    }
+                  }
+                  tableSchema = table.getSchema();
+                }
+                return 
TableRowToStorageApiProto.getDescriptorFromTableSchema(tableSchema);
+              });
 
       @Override
       public Descriptor getSchemaDescriptor() {
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeJobService.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeJobService.java
index a891c48..768ecb1 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeJobService.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeJobService.java
@@ -364,12 +364,16 @@ public class FakeJobService implements JobService, 
Serializable {
       throws InterruptedException, IOException {
     TableReference destination = load.getDestinationTable();
     TableSchema schema = load.getSchema();
-    checkArgument(schema != null, "No schema specified");
     List<ResourceId> sourceFiles = filesForLoadJobs.get(jobRef.getProjectId(), 
jobRef.getJobId());
     WriteDisposition writeDisposition = 
WriteDisposition.valueOf(load.getWriteDisposition());
     CreateDisposition createDisposition = 
CreateDisposition.valueOf(load.getCreateDisposition());
 
     Table existingTable = datasetService.getTable(destination);
+    if (schema == null) {
+      schema = existingTable.getSchema();
+    }
+    checkArgument(schema != null, "No schema specified");
+
     if (!validateDispositions(existingTable, createDisposition, 
writeDisposition)) {
       return new JobStatus().setState("FAILED").setErrorResult(new 
ErrorProto());
     }
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
index 088a7c4..3d8f0fa 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
@@ -1564,32 +1564,45 @@ public class BigQueryIOWriteTest implements 
Serializable {
   }
 
   @Test
-  public void testCreateNeverWithStreaming() throws Exception {
-    if (!useStreaming) {
-      return;
-    }
+  public void testCreateNever() throws Exception {
+    BigQueryIO.Write.Method method =
+        useStreaming
+            ? (useStorageApi
+                ? (useStorageApiApproximate
+                    ? Method.STORAGE_API_AT_LEAST_ONCE
+                    : Method.STORAGE_WRITE_API)
+                : Method.STREAMING_INSERTS)
+            : useStorageApi ? Method.STORAGE_WRITE_API : Method.FILE_LOADS;
     p.enableAbandonedNodeEnforcement(false);
 
-    TableReference tableRef = new TableReference();
-    tableRef.setDatasetId("dataset");
-    tableRef.setTableId("sometable");
+    TableReference tableRef = 
BigQueryHelpers.parseTableSpec("project-id:dataset-id.table");
+    TableSchema tableSchema =
+        new TableSchema()
+            .setFields(
+                ImmutableList.of(
+                    new TableFieldSchema().setName("name").setType("STRING"),
+                    new 
TableFieldSchema().setName("number").setType("INTEGER")));
+    fakeDatasetService.createTable(new 
Table().setTableReference(tableRef).setSchema(tableSchema));
 
     PCollection<TableRow> tableRows =
-        p.apply(GenerateSequence.from(0))
+        p.apply(GenerateSequence.from(0).to(10))
             .apply(
                 MapElements.via(
                     new SimpleFunction<Long, TableRow>() {
                       @Override
                       public TableRow apply(Long input) {
-                        return null;
+                        return new TableRow().set("name", "name " + 
input).set("number", input);
                       }
                     }))
             .setCoder(TableRowJsonCoder.of());
     tableRows.apply(
         BigQueryIO.writeTableRows()
             .to(tableRef)
+            .withMethod(method)
             
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
+            .withTestServices(fakeBqServices)
             .withoutValidation());
+    p.run();
   }
 
   @Test

Reply via email to