This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ba04f6098f7 Merge pull request #27866: Allow writing protos directly 
to the storage API without conversion
ba04f6098f7 is described below

commit ba04f6098f7472a123c6cd7b8d4a991ce16d1958
Author: Reuven Lax <[email protected]>
AuthorDate: Thu Aug 10 09:15:58 2023 -0700

    Merge pull request #27866: Allow writing protos directly to the storage API 
without conversion
---
 .../beam/sdk/io/gcp/bigquery/AppendClientInfo.java |  32 +-
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java       |  96 +++++
 .../beam/sdk/io/gcp/bigquery/BigQueryServices.java |   5 +-
 .../sdk/io/gcp/bigquery/BigQueryServicesImpl.java  |   8 +-
 .../bigquery/StorageApiDynamicDestinations.java    |   3 +
 .../StorageApiDynamicDestinationsBeamRow.java      |   6 +
 ...StorageApiDynamicDestinationsGenericRecord.java |   7 +
 .../StorageApiDynamicDestinationsProto.java        | 100 +++++
 .../StorageApiDynamicDestinationsTableRow.java     |   7 +
 .../bigquery/StorageApiWriteUnshardedRecords.java  |  66 +++-
 .../bigquery/StorageApiWritesShardedRecords.java   |  29 +-
 .../io/gcp/bigquery/TableRowToStorageApiProto.java |  83 +++-
 .../sdk/io/gcp/testing/FakeDatasetService.java     |   7 +-
 .../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java   | 111 +++++-
 .../bigquery/StorageApiDirectWriteProtosIT.java    | 197 ++++++++++
 .../bigquery/TableRowToStorageApiProtoTest.java    | 418 ++++++++++++++++++++-
 16 files changed, 1109 insertions(+), 66 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
index b8a7fc760bc..46c25d47e7a 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
@@ -22,9 +22,9 @@ import com.google.auto.value.AutoValue;
 import com.google.auto.value.extension.memoized.Memoized;
 import com.google.cloud.bigquery.storage.v1.TableSchema;
 import com.google.protobuf.ByteString;
+import com.google.protobuf.DescriptorProtos;
 import com.google.protobuf.Descriptors;
 import com.google.protobuf.DynamicMessage;
-import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.Message;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
@@ -49,7 +49,7 @@ abstract class AppendClientInfo {
 
   abstract @Nullable String getStreamName();
 
-  abstract Descriptors.Descriptor getDescriptor();
+  abstract DescriptorProtos.DescriptorProto getDescriptor();
 
   @AutoValue.Builder
   abstract static class Builder {
@@ -63,7 +63,7 @@ abstract class AppendClientInfo {
 
     abstract Builder 
setSchemaInformation(TableRowToStorageApiProto.SchemaInformation value);
 
-    abstract Builder setDescriptor(Descriptors.Descriptor value);
+    abstract Builder setDescriptor(DescriptorProtos.DescriptorProto value);
 
     abstract Builder setStreamName(@Nullable String name);
 
@@ -74,8 +74,8 @@ abstract class AppendClientInfo {
 
   static AppendClientInfo of(
       TableSchema tableSchema,
-      Consumer<BigQueryServices.StreamAppendClient> closeAppendClient,
-      boolean includeCdcColumns)
+      DescriptorProtos.DescriptorProto descriptor,
+      Consumer<BigQueryServices.StreamAppendClient> closeAppendClient)
       throws Exception {
     return new AutoValue_AppendClientInfo.Builder()
         .setTableSchema(tableSchema)
@@ -83,12 +83,22 @@ abstract class AppendClientInfo {
         
.setJsonTableSchema(TableRowToStorageApiProto.protoSchemaToTableSchema(tableSchema))
         .setSchemaInformation(
             
TableRowToStorageApiProto.SchemaInformation.fromTableSchema(tableSchema))
-        .setDescriptor(
-            TableRowToStorageApiProto.getDescriptorFromTableSchema(
-                tableSchema, true, includeCdcColumns))
+        .setDescriptor(descriptor)
         .build();
   }
 
+  static AppendClientInfo of(
+      TableSchema tableSchema,
+      Consumer<BigQueryServices.StreamAppendClient> closeAppendClient,
+      boolean includeCdcColumns)
+      throws Exception {
+    return of(
+        tableSchema,
+        TableRowToStorageApiProto.descriptorSchemaFromTableSchema(
+            tableSchema, true, includeCdcColumns),
+        closeAppendClient);
+  }
+
   public AppendClientInfo withNoAppendClient() {
     return toBuilder().setStreamAppendClient(null).build();
   }
@@ -149,8 +159,10 @@ abstract class AppendClientInfo {
   public TableRow toTableRow(ByteString protoBytes) {
     try {
       return TableRowToStorageApiProto.tableRowFromMessage(
-          DynamicMessage.parseFrom(getDescriptor(), protoBytes), true);
-    } catch (InvalidProtocolBufferException e) {
+          DynamicMessage.parseFrom(
+              TableRowToStorageApiProto.wrapDescriptorProto(getDescriptor()), 
protoBytes),
+          true);
+    } catch (Exception e) {
       throw new RuntimeException(e);
     }
   }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 57f77cbcd2f..fd445bcfc39 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -44,8 +44,12 @@ import com.google.gson.JsonArray;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.Message;
 import java.io.IOException;
 import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -2131,6 +2135,7 @@ public class BigQueryIO {
         .setDeterministicRecordIdFn(null)
         .setMaxRetryJobs(1000)
         .setPropagateSuccessfulStorageApiWrites(false)
+        .setDirectWriteProtos(true)
         .build();
   }
 
@@ -2170,6 +2175,27 @@ public class BigQueryIO {
         .withAvroFormatFunction(GENERIC_RECORD_IDENTITY_FORMATTER);
   }
 
+  /**
+   * A {@link PTransform} that writes a {@link PCollection} containing 
protocol buffer objects to a
+   * BigQuery table. If using one of the storage-api write methods, these 
protocol buffers must
+   * match the schema of the table.
+   *
+   * <p>If a Schema is provided using {@link Write#withSchema}, that schema 
will be used for
+   * creating the table if necessary. If no schema is provided, one will be 
inferred from the
+   * protocol buffer's descriptor. Note that inferring a schema from the 
protocol buffer may not
+   * always provide the intended schema as multiple BigQuery types can map to 
the same protocol
+   * buffer type. For example, a protocol buffer field of type INT64 may be an 
INT64 BigQuery type,
+   * but it might also represent a TIME, DATETIME, or a TIMESTAMP type.
+   */
+  public static <T extends Message> Write<T> writeProtos(Class<T> 
protoMessageClass) {
+    if (DynamicMessage.class.equals(protoMessageClass)) {
+      throw new IllegalArgumentException("DynamicMessage is not supported.");
+    }
+    return BigQueryIO.<T>write()
+        .withFormatFunction(m -> 
TableRowToStorageApiProto.tableRowFromMessage(m, false))
+        .withWriteProtosClass(protoMessageClass);
+  }
+
   /** Implementation of {@link #write}. */
   @AutoValue
   public abstract static class Write<T> extends PTransform<PCollection<T>, 
WriteResult> {
@@ -2302,6 +2328,10 @@ public class BigQueryIO {
 
     abstract Boolean getAutoSchemaUpdate();
 
+    abstract @Nullable Class<T> getWriteProtosClass();
+
+    abstract Boolean getDirectWriteProtos();
+
     abstract @Nullable SerializableFunction<T, String> 
getDeterministicRecordIdFn();
 
     abstract @Nullable String getWriteTempDataset();
@@ -2400,6 +2430,10 @@ public class BigQueryIO {
 
       abstract Builder<T> setAutoSchemaUpdate(Boolean autoSchemaUpdate);
 
+      abstract Builder<T> setWriteProtosClass(@Nullable Class<T> clazz);
+
+      abstract Builder<T> setDirectWriteProtos(Boolean direct);
+
       abstract Builder<T> setDeterministicRecordIdFn(
           SerializableFunction<T, String> toUniqueIdFunction);
 
@@ -2780,6 +2814,20 @@ public class BigQueryIO {
       return toBuilder().setRowMutationInformationFn(updateFn).build();
     }
 
+    Write<T> withWriteProtosClass(Class<T> clazz) {
+      return toBuilder().setWriteProtosClass(clazz).build();
+    }
+
+    /*
+    When using {@link Write.Method#STORAGE_API} or {@link 
Write.Method#STORAGE_API_AT_LEAST_ONCE} along with
+    {@link BigQueryIO#writeProtos}, the sink will try to write the protos 
directly to BigQuery without modification.
+    In some cases this is not supported or BigQuery cannot directly interpret 
the proto. In these cases, the direct
+    proto write can be disabled by passing false to this method.
+     */
+    public Write<T> withDirectWriteProtos(boolean directWriteProtos) {
+      return toBuilder().setDirectWriteProtos(directWriteProtos).build();
+    }
+
     /**
      * Set the project the BigQuery load job will be initiated from. This is 
only applicable when
      * the write method is set to {@link Method#FILE_LOADS}. If omitted, the 
project of the
@@ -3281,6 +3329,7 @@ public class BigQueryIO {
               || getDynamicDestinations() != null
               || getSchemaFromView() != null;
 
+      Class<T> writeProtoClass = getWriteProtosClass();
       if (getUseBeamSchema()) {
         checkArgument(input.hasSchema(), "The input doesn't has a schema");
         optimizeWrites = true;
@@ -3296,11 +3345,34 @@ public class BigQueryIO {
           formatFunction = BigQueryUtils.toTableRow(input.getToRowFunction());
         }
         // Infer the TableSchema from the input Beam schema.
+        // TODO: If the user provided a schema, we should use that. There are 
things that can be
+        // specified in a
+        // BQ schema that don't have exact matches in a Beam schema (e.g. 
GEOGRAPHY types).
         TableSchema tableSchema = 
BigQueryUtils.toTableSchema(input.getSchema());
         dynamicDestinations =
             new ConstantSchemaDestinations<>(
                 dynamicDestinations,
                 
StaticValueProvider.of(BigQueryHelpers.toJsonString(tableSchema)));
+      } else if (writeProtoClass != null) {
+        if (!hasSchema) {
+          try {
+            @SuppressWarnings({"unchecked", "nullness"})
+            Descriptors.Descriptor descriptor =
+                (Descriptors.Descriptor)
+                    org.apache.beam.sdk.util.Preconditions.checkStateNotNull(
+                            writeProtoClass.getMethod("getDescriptor"))
+                        .invoke(null);
+            TableSchema tableSchema =
+                TableRowToStorageApiProto.protoSchemaToTableSchema(
+                    
TableRowToStorageApiProto.tableSchemaFromDescriptor(descriptor));
+            dynamicDestinations =
+                new ConstantSchemaDestinations<>(
+                    dynamicDestinations,
+                    
StaticValueProvider.of(BigQueryHelpers.toJsonString(tableSchema)));
+          } catch (IllegalAccessException | InvocationTargetException | 
NoSuchMethodException e) {
+            throw new IllegalArgumentException(e);
+          }
+        }
       } else {
         // Require a schema if creating one or more tables.
         checkArgument(
@@ -3379,6 +3451,7 @@ public class BigQueryIO {
           method);
     }
 
+    @SuppressWarnings("rawtypes")
     private <DestinationT> WriteResult continueExpandTyped(
         PCollection<KV<DestinationT, T>> input,
         Coder<T> elementCoder,
@@ -3501,6 +3574,28 @@ public class BigQueryIO {
                   elementSchema,
                   elementToRowFunction,
                   getRowMutationInformationFn() != null);
+        } else if (getWriteProtosClass() != null && getDirectWriteProtos()) {
+          // We could support both of these by falling back to
+          // StorageApiDynamicDestinationsTableRow. This
+          // would defeat the optimization (we would be forced to create a new 
dynamic proto message
+          // and copy the data over). For now, we simply give the user a way 
to disable the
+          // optimization themselves.
+          checkArgument(
+              getRowMutationInformationFn() == null,
+              "Row upserts and deletes are not for direct proto writes. "
+                  + "Try setting withDirectWriteProtos(false)");
+          checkArgument(
+              !getAutoSchemaUpdate(),
+              "withAutoSchemaUpdate not supported when using writeProtos."
+                  + " Try setting withDirectWriteProtos(false)");
+          checkArgument(
+              !getIgnoreUnknownValues(),
+              "ignoreUnknownValues not supported when using writeProtos."
+                  + " Try setting withDirectWriteProtos(false)");
+          storageApiDynamicDestinations =
+              (StorageApiDynamicDestinations<T, DestinationT>)
+                  new StorageApiDynamicDestinationsProto(
+                      dynamicDestinations, getWriteProtosClass());
         } else if (getAvroRowWriterFactory() != null) {
           // we can configure the avro to storage write api proto converter 
for this
           // assuming the format function returns an Avro GenericRecord
@@ -3537,6 +3632,7 @@ public class BigQueryIO {
                   getIgnoreUnknownValues(),
                   getAutoSchemaUpdate());
         }
+
         int numShards = getStorageApiNumStreams(bqOptions);
         boolean enableAutoSharding = getAutoSharding();
         if (numShards == 0) {
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
index 6f178bd6150..1cc9049a542 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
@@ -43,7 +43,7 @@ import 
com.google.cloud.bigquery.storage.v1.SplitReadStreamRequest;
 import com.google.cloud.bigquery.storage.v1.SplitReadStreamResponse;
 import com.google.cloud.bigquery.storage.v1.TableSchema;
 import com.google.cloud.bigquery.storage.v1.WriteStream;
-import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.DescriptorProtos;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.List;
@@ -213,7 +213,8 @@ public interface BigQueryServices extends Serializable {
      * first.
      */
     StreamAppendClient getStreamAppendClient(
-        String streamName, Descriptor descriptor, boolean useConnectionPool) 
throws Exception;
+        String streamName, DescriptorProtos.DescriptorProto descriptor, 
boolean useConnectionPool)
+        throws Exception;
 
     /** Flush a given stream up to the given offset. The stream must have type 
BUFFERED. */
     ApiFuture<FlushRowsResponse> flush(String streamName, long flushOffset)
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index dd423294f6c..17b5c5ebd99 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -86,7 +86,7 @@ import com.google.cloud.bigquery.storage.v1.TableSchema;
 import com.google.cloud.bigquery.storage.v1.WriteStream;
 import com.google.cloud.hadoop.util.ApiErrorExtractor;
 import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
-import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.DescriptorProtos;
 import com.google.protobuf.Int64Value;
 import com.google.rpc.RetryInfo;
 import io.grpc.Metadata;
@@ -1352,9 +1352,9 @@ class BigQueryServicesImpl implements BigQueryServices {
 
     @Override
     public StreamAppendClient getStreamAppendClient(
-        String streamName, Descriptor descriptor, boolean useConnectionPool) 
throws Exception {
-      ProtoSchema protoSchema =
-          
ProtoSchema.newBuilder().setProtoDescriptor(descriptor.toProto()).build();
+        String streamName, DescriptorProtos.DescriptorProto descriptor, 
boolean useConnectionPool)
+        throws Exception {
+      ProtoSchema protoSchema = 
ProtoSchema.newBuilder().setProtoDescriptor(descriptor).build();
 
       TransportChannelProvider transportChannelProvider =
           BigQueryWriteSettings.defaultGrpcTransportProviderBuilder()
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinations.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinations.java
index b3085b15e95..fdf330d378f 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinations.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinations.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.gcp.bigquery;
 
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
+import com.google.protobuf.DescriptorProtos;
 import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
@@ -33,6 +34,8 @@ abstract class StorageApiDynamicDestinations<T, DestinationT>
   public interface MessageConverter<T> {
     com.google.cloud.bigquery.storage.v1.TableSchema getTableSchema();
 
+    DescriptorProtos.DescriptorProto getDescriptor(boolean includeCdcColumns) 
throws Exception;
+
     StorageApiWritePayload toMessage(
         T element, @Nullable RowMutationInformation rowMutationInformation) 
throws Exception;
 
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsBeamRow.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsBeamRow.java
index e0964135be3..7821a7d9199 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsBeamRow.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsBeamRow.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.gcp.bigquery;
 
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.cloud.bigquery.storage.v1.TableSchema;
+import com.google.protobuf.DescriptorProtos;
 import com.google.protobuf.Descriptors.Descriptor;
 import com.google.protobuf.Message;
 import javax.annotation.Nullable;
@@ -75,6 +76,11 @@ class StorageApiDynamicDestinationsBeamRow<T, DestinationT 
extends @NonNull Obje
       return tableSchema;
     }
 
+    @Override
+    public DescriptorProtos.DescriptorProto getDescriptor(boolean 
includeCdcColumns) {
+      return cdcDescriptor != null ? cdcDescriptor.toProto() : 
descriptor.toProto();
+    }
+
     @Override
     @SuppressWarnings("nullness")
     public StorageApiWritePayload toMessage(
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsGenericRecord.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsGenericRecord.java
index 53f3b137bf3..79141d73a39 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsGenericRecord.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsGenericRecord.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.gcp.bigquery;
 
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
+import com.google.protobuf.DescriptorProtos;
 import com.google.protobuf.Descriptors.Descriptor;
 import com.google.protobuf.Message;
 import org.apache.avro.Schema;
@@ -109,5 +110,11 @@ class StorageApiDynamicDestinationsGenericRecord<T, 
DestinationT extends @NonNul
     public com.google.cloud.bigquery.storage.v1.TableSchema getTableSchema() {
       return protoTableSchema;
     }
+
+    @Override
+    public DescriptorProtos.DescriptorProto getDescriptor(boolean 
includeCdcColumns)
+        throws Exception {
+      return cdcDescriptor != null ? cdcDescriptor.toProto() : 
descriptor.toProto();
+    }
   }
 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsProto.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsProto.java
new file mode 100644
index 00000000000..57dbdc9d1e7
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsProto.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.cloud.bigquery.storage.v1.ProtoSchemaConverter;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
+import com.google.protobuf.DescriptorProtos;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import java.lang.reflect.InvocationTargetException;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
+import org.apache.beam.sdk.util.Preconditions;
+import org.checkerframework.checker.nullness.qual.NonNull;
+
+/** Storage API DynamicDestinations used when the input is a compiled protocol 
buffer. */
+class StorageApiDynamicDestinationsProto<T extends Message, DestinationT 
extends @NonNull Object>
+    extends StorageApiDynamicDestinations<T, DestinationT> {
+  DescriptorProtos.DescriptorProto descriptorProto;
+
+  @SuppressWarnings({"unchecked", "nullness"})
+  StorageApiDynamicDestinationsProto(
+      DynamicDestinations<T, DestinationT> inner, Class<T> protoClass) {
+    super(inner);
+    try {
+      this.descriptorProto =
+          fixNestedTypes(
+              (Descriptors.Descriptor)
+                  
Preconditions.checkStateNotNull(protoClass.getMethod("getDescriptor"))
+                      .invoke(null));
+    } catch (IllegalAccessException | InvocationTargetException | 
NoSuchMethodException e) {
+      throw new IllegalArgumentException(e);
+    }
+  }
+
+  @Override
+  public MessageConverter<T> getMessageConverter(
+      DestinationT destination, DatasetService datasetService) throws 
Exception {
+    return new Converter(
+        TableRowToStorageApiProto.schemaToProtoTableSchema(
+            Preconditions.checkStateNotNull(getSchema(destination))));
+  }
+
+  class Converter implements MessageConverter<T> {
+    TableSchema tableSchema;
+
+    Converter(TableSchema tableSchema) {
+      this.tableSchema = tableSchema;
+    }
+
+    @Override
+    public TableSchema getTableSchema() {
+      return tableSchema;
+    }
+
+    @Override
+    public DescriptorProtos.DescriptorProto getDescriptor(boolean 
includeCdcColumns)
+        throws Exception {
+      
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument(
+          !includeCdcColumns);
+      return descriptorProto;
+    }
+
+    @Override
+    public StorageApiWritePayload toMessage(
+        T element, @Nullable RowMutationInformation rowMutationInformation) 
throws Exception {
+      // NB: What makes this path efficient is that the storage API directly 
understands protos, so
+      // we can forward
+      // them through directly. This means that we don't currently support 
ignoreUnknownValues or
+      // autoUpdateSchema.
+      return StorageApiWritePayload.of(element.toByteArray(), null);
+    }
+
+    @Override
+    public TableRow toTableRow(T element) {
+      throw new RuntimeException("Not implemented!");
+    }
+  };
+
+  private static DescriptorProtos.DescriptorProto fixNestedTypes(
+      Descriptors.Descriptor descriptor) {
+    return ProtoSchemaConverter.convert(descriptor).getProtoDescriptor();
+  }
+}
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
index 19569fe068a..eb93d7c398f 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.gcp.bigquery;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
+import com.google.protobuf.DescriptorProtos;
 import com.google.protobuf.Descriptors.Descriptor;
 import com.google.protobuf.Message;
 import java.util.concurrent.ExecutionException;
@@ -143,6 +144,12 @@ public class StorageApiDynamicDestinationsTableRow<T, 
DestinationT extends @NonN
       return protoTableSchema;
     }
 
+    @Override
+    public DescriptorProtos.DescriptorProto getDescriptor(boolean 
includeCdcColumns)
+        throws Exception {
+      return cdcDescriptor != null ? cdcDescriptor.toProto() : 
descriptor.toProto();
+    }
+
     @Override
     public TableRow toTableRow(T element) {
       return formatFunction.apply(element);
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index 5c6584fa32c..27a5b30c156 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -30,8 +30,8 @@ import com.google.cloud.bigquery.storage.v1.TableSchema;
 import com.google.cloud.bigquery.storage.v1.WriteStream;
 import com.google.cloud.bigquery.storage.v1.WriteStream.Type;
 import com.google.protobuf.ByteString;
+import com.google.protobuf.DescriptorProtos;
 import com.google.protobuf.DynamicMessage;
-import com.google.protobuf.InvalidProtocolBufferException;
 import io.grpc.Status;
 import java.io.IOException;
 import java.time.Instant;
@@ -46,6 +46,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
@@ -278,6 +279,7 @@ public class StorageApiWriteUnshardedRecords<DestinationT, 
ElementT>
 
       private final boolean useDefaultStream;
       private TableSchema initialTableSchema;
+      private DescriptorProtos.DescriptorProto initialDescriptor;
       private Instant nextCacheTickle = Instant.MAX;
       private final int clientNumber;
       private final boolean usingMultiplexing;
@@ -302,6 +304,7 @@ public class StorageApiWriteUnshardedRecords<DestinationT, 
ElementT>
         this.maybeDatasetService = datasetService;
         this.useDefaultStream = useDefaultStream;
         this.initialTableSchema = messageConverter.getTableSchema();
+        this.initialDescriptor = 
messageConverter.getDescriptor(includeCdcColumns);
         this.clientNumber = new Random().nextInt(streamAppendClientCount);
         this.usingMultiplexing = usingMultiplexing;
         this.maxRequestSize = maxRequestSize;
@@ -359,12 +362,13 @@ public class 
StorageApiWriteUnshardedRecords<DestinationT, ElementT>
       }
 
       AppendClientInfo generateClient(@Nullable TableSchema updatedSchema) 
throws Exception {
-        TableSchema tableSchema =
-            (updatedSchema != null) ? updatedSchema : 
getCurrentTableSchema(streamName);
+        SchemaAndDescriptor schemaAndDescriptor = 
getCurrentTableSchema(streamName, updatedSchema);
+
         AtomicReference<AppendClientInfo> appendClientInfo =
             new AtomicReference<>(
                 AppendClientInfo.of(
-                    tableSchema,
+                    schemaAndDescriptor.tableSchema,
+                    schemaAndDescriptor.descriptor,
                     // Make sure that the client is always closed in a 
different thread to avoid
                     // blocking.
                     client ->
@@ -376,8 +380,7 @@ public class StorageApiWriteUnshardedRecords<DestinationT, 
ElementT>
                                 client.unpin();
                                 client.close();
                               }
-                            }),
-                    includeCdcColumns));
+                            })));
 
         CreateTableHelpers.createTableWrapper(
             () -> {
@@ -398,8 +401,28 @@ public class StorageApiWriteUnshardedRecords<DestinationT, 
ElementT>
         return appendClientInfo.get();
       }
 
-      TableSchema getCurrentTableSchema(String stream) throws Exception {
+      private class SchemaAndDescriptor {
+        private final TableSchema tableSchema;
+        private final DescriptorProtos.DescriptorProto descriptor;
+
+        private SchemaAndDescriptor(
+            TableSchema tableSchema, DescriptorProtos.DescriptorProto 
descriptor) {
+          this.tableSchema = tableSchema;
+          this.descriptor = descriptor;
+        }
+      }
+
+      SchemaAndDescriptor getCurrentTableSchema(String stream, @Nullable 
TableSchema updatedSchema)
+          throws Exception {
+        if (updatedSchema != null) {
+          return new SchemaAndDescriptor(
+              updatedSchema,
+              TableRowToStorageApiProto.descriptorSchemaFromTableSchema(
+                  updatedSchema, true, includeCdcColumns));
+        }
+
         AtomicReference<TableSchema> currentSchema = new 
AtomicReference<>(initialTableSchema);
+        AtomicBoolean updated = new AtomicBoolean();
         CreateTableHelpers.createTableWrapper(
             () -> {
               if (autoUpdateSchema) {
@@ -408,12 +431,23 @@ public class 
StorageApiWriteUnshardedRecords<DestinationT, ElementT>
                     
Preconditions.checkStateNotNull(maybeDatasetService).getWriteStream(streamName);
                 if (writeStream != null && writeStream.hasTableSchema()) {
                   currentSchema.set(writeStream.getTableSchema());
+                  updated.set(true);
                 }
               }
               return null;
             },
             tryCreateTable);
-        return currentSchema.get();
+        // Note: While it may appear that these two branches are the same,
+        // it's important to return the actual initial descriptor if the
+        // schema has not changed. Simply converting the schema back into
+        // a descriptor isn't the same, and would break the
+        // direct-from-proto ingestion path.
+        DescriptorProtos.DescriptorProto descriptor =
+            updated.get()
+                ? TableRowToStorageApiProto.descriptorSchemaFromTableSchema(
+                    currentSchema.get(), true, includeCdcColumns)
+                : initialDescriptor;
+        return new SchemaAndDescriptor(currentSchema.get(), descriptor);
       }
 
       AppendClientInfo getAppendClientInfo(
@@ -555,7 +589,9 @@ public class StorageApiWriteUnshardedRecords<DestinationT, 
ElementT>
             TableRow failedRow =
                 TableRowToStorageApiProto.tableRowFromMessage(
                     DynamicMessage.parseFrom(
-                        getAppendClientInfo(true, null).getDescriptor(), 
rowBytes),
+                        TableRowToStorageApiProto.wrapDescriptorProto(
+                            getAppendClientInfo(true, null).getDescriptor()),
+                        rowBytes),
                     true);
             failedRowsReceiver.outputWithTimestamp(
                 new BigQueryStorageApiInsertError(
@@ -618,14 +654,16 @@ public class 
StorageApiWriteUnshardedRecords<DestinationT, ElementT>
                     TableRow failedRow =
                         TableRowToStorageApiProto.tableRowFromMessage(
                             DynamicMessage.parseFrom(
-                                
Preconditions.checkStateNotNull(appendClientInfo).getDescriptor(),
+                                TableRowToStorageApiProto.wrapDescriptorProto(
+                                    
Preconditions.checkStateNotNull(appendClientInfo)
+                                        .getDescriptor()),
                                 protoBytes),
                             true);
                     failedRowsReceiver.outputWithTimestamp(
                         new BigQueryStorageApiInsertError(
                             failedRow, 
error.getRowIndexToErrorMessage().get(failedIndex)),
                         timestamp);
-                  } catch (InvalidProtocolBufferException e) {
+                  } catch (Exception e) {
                     LOG.error("Failed to insert row and could not parse the 
result!", e);
                   }
                 }
@@ -716,12 +754,14 @@ public class 
StorageApiWriteUnshardedRecords<DestinationT, ElementT>
                     TableRow row =
                         TableRowToStorageApiProto.tableRowFromMessage(
                             DynamicMessage.parseFrom(
-                                
Preconditions.checkStateNotNull(appendClientInfo).getDescriptor(),
+                                TableRowToStorageApiProto.wrapDescriptorProto(
+                                    
Preconditions.checkStateNotNull(appendClientInfo)
+                                        .getDescriptor()),
                                 rowBytes),
                             true);
                     org.joda.time.Instant timestamp = c.timestamps.get(i);
                     successfulRowsReceiver.outputWithTimestamp(row, timestamp);
-                  } catch (InvalidProtocolBufferException e) {
+                  } catch (Exception e) {
                     LOG.warn("Failure parsing TableRow", e);
                   }
                 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
index d5bd27f5efe..cc7f221e32e 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
@@ -29,6 +29,7 @@ import com.google.cloud.bigquery.storage.v1.ProtoRows;
 import com.google.cloud.bigquery.storage.v1.TableSchema;
 import com.google.cloud.bigquery.storage.v1.WriteStream.Type;
 import com.google.protobuf.ByteString;
+import com.google.protobuf.DescriptorProtos;
 import io.grpc.Status;
 import io.grpc.Status.Code;
 import java.io.IOException;
@@ -458,21 +459,28 @@ public class StorageApiWritesShardedRecords<DestinationT 
extends @NonNull Object
       Callable<AppendClientInfo> getAppendClientInfo =
           () -> {
             @Nullable TableSchema tableSchema;
-            if (autoUpdateSchema && updatedSchema.read() != null) {
-              // We've seen an updated schema, so we use that.
-              tableSchema = updatedSchema.read();
+            DescriptorProtos.DescriptorProto descriptor;
+            TableSchema updatedSchemaValue = updatedSchema.read();
+            if (autoUpdateSchema && updatedSchemaValue != null) {
+              // We've seen an updated schema, so we use that instead of 
querying the
+              // MessageConverter.
+              tableSchema = updatedSchemaValue;
+              descriptor =
+                  TableRowToStorageApiProto.descriptorSchemaFromTableSchema(
+                      tableSchema, true, false);
             } else {
               // Start off with the base schema. As we get notified of schema 
updates, we
-              // will update the
-              // descriptor.
-              tableSchema =
-                  messageConverters
-                      .get(element.getKey().getKey(), dynamicDestinations, 
datasetService)
-                      .getTableSchema();
+              // will update the descriptor.
+              StorageApiDynamicDestinations.MessageConverter<?> converter =
+                  messageConverters.get(
+                      element.getKey().getKey(), dynamicDestinations, 
datasetService);
+              tableSchema = converter.getTableSchema();
+              descriptor = converter.getDescriptor(false);
             }
             AppendClientInfo info =
                 AppendClientInfo.of(
                         Preconditions.checkStateNotNull(tableSchema),
+                        descriptor,
                         // Make sure that the client is always closed in a 
different thread
                         // to
                         // avoid blocking.
@@ -483,8 +491,7 @@ public class StorageApiWritesShardedRecords<DestinationT 
extends @NonNull Object
                                   // Remove the pin that is "owned" by the 
cache.
                                   client.unpin();
                                   client.close();
-                                }),
-                        false)
+                                }))
                     .withAppendClient(datasetService, getOrCreateStream, 
false);
             // This pin is "owned" by the cache.
             
Preconditions.checkStateNotNull(info.getStreamAppendClient()).pin();
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
index 3a20bc764d5..c31886da614 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
@@ -30,6 +30,7 @@ import 
com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
 import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Label;
 import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Type;
 import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
+import com.google.protobuf.Descriptors;
 import com.google.protobuf.Descriptors.Descriptor;
 import com.google.protobuf.Descriptors.DescriptorValidationException;
 import com.google.protobuf.Descriptors.FieldDescriptor;
@@ -216,7 +217,7 @@ public class TableRowToStorageApiProto {
     return Optional.ofNullable(mode)
         .map(Mode::valueOf)
         .map(m -> MODE_MAP_JSON_PROTO.get(m))
-        .orElse(TableFieldSchema.Mode.REQUIRED);
+        .orElse(TableFieldSchema.Mode.NULLABLE);
   }
 
   public static String protoModeToJsonMode(TableFieldSchema.Mode protoMode) {
@@ -302,16 +303,14 @@ public class TableRowToStorageApiProto {
   public static TableFieldSchema tableFieldToProtoTableField(
       com.google.api.services.bigquery.model.TableFieldSchema field) {
     TableFieldSchema.Builder builder = TableFieldSchema.newBuilder();
-    builder.setName(field.getName());
+    builder.setName(field.getName().toLowerCase());
     if (field.getDescription() != null) {
       builder.setDescription(field.getDescription());
     }
     if (field.getMaxLength() != null) {
       builder.setMaxLength(field.getMaxLength());
     }
-    if (field.getMode() != null) {
-      builder.setMode(modeToProtoMode(field.getMode()));
-    }
+    builder.setMode(modeToProtoMode(field.getMode()));
     if (field.getPrecision() != null) {
       builder.setPrecision(field.getPrecision());
     }
@@ -395,7 +394,7 @@ public class TableRowToStorageApiProto {
     }
   }
 
-  static final Map<TableFieldSchema.Type, Type> PRIMITIVE_TYPES =
+  static final Map<TableFieldSchema.Type, Type> PRIMITIVE_TYPES_BQ_TO_PROTO =
       ImmutableMap.<TableFieldSchema.Type, Type>builder()
           .put(TableFieldSchema.Type.INT64, Type.TYPE_INT64)
           .put(TableFieldSchema.Type.DOUBLE, Type.TYPE_DOUBLE)
@@ -412,6 +411,26 @@ public class TableRowToStorageApiProto {
           .put(TableFieldSchema.Type.JSON, Type.TYPE_STRING)
           .build();
 
+  static final Map<Descriptors.FieldDescriptor.Type, TableFieldSchema.Type>
+      PRIMITIVE_TYPES_PROTO_TO_BQ =
+          ImmutableMap.<Descriptors.FieldDescriptor.Type, 
TableFieldSchema.Type>builder()
+              .put(Descriptors.FieldDescriptor.Type.INT32, 
TableFieldSchema.Type.INT64)
+              .put(FieldDescriptor.Type.FIXED32, TableFieldSchema.Type.INT64)
+              .put(FieldDescriptor.Type.UINT32, TableFieldSchema.Type.INT64)
+              .put(FieldDescriptor.Type.SFIXED32, TableFieldSchema.Type.INT64)
+              .put(FieldDescriptor.Type.SINT32, TableFieldSchema.Type.INT64)
+              .put(FieldDescriptor.Type.INT64, TableFieldSchema.Type.INT64)
+              .put(FieldDescriptor.Type.FIXED64, TableFieldSchema.Type.NUMERIC)
+              .put(FieldDescriptor.Type.UINT64, TableFieldSchema.Type.NUMERIC)
+              .put(FieldDescriptor.Type.SFIXED64, TableFieldSchema.Type.INT64)
+              .put(FieldDescriptor.Type.SINT64, TableFieldSchema.Type.INT64)
+              .put(FieldDescriptor.Type.DOUBLE, TableFieldSchema.Type.DOUBLE)
+              .put(FieldDescriptor.Type.FLOAT, TableFieldSchema.Type.DOUBLE)
+              .put(FieldDescriptor.Type.STRING, TableFieldSchema.Type.STRING)
+              .put(FieldDescriptor.Type.BOOL, TableFieldSchema.Type.BOOL)
+              .put(FieldDescriptor.Type.BYTES, TableFieldSchema.Type.BYTES)
+              .build();
+
   public static Descriptor getDescriptorFromTableSchema(
       com.google.api.services.bigquery.model.TableSchema jsonSchema,
       boolean respectRequired,
@@ -428,8 +447,12 @@ public class TableRowToStorageApiProto {
   public static Descriptor getDescriptorFromTableSchema(
       TableSchema tableSchema, boolean respectRequired, boolean 
includeCdcColumns)
       throws DescriptorValidationException {
-    DescriptorProto descriptorProto =
-        descriptorSchemaFromTableSchema(tableSchema, respectRequired, 
includeCdcColumns);
+    return wrapDescriptorProto(
+        descriptorSchemaFromTableSchema(tableSchema, respectRequired, 
includeCdcColumns));
+  }
+
+  public static Descriptor wrapDescriptorProto(DescriptorProto descriptorProto)
+      throws DescriptorValidationException {
     FileDescriptorProto fileDescriptorProto =
         
FileDescriptorProto.newBuilder().addMessageType(descriptorProto).build();
     FileDescriptor fileDescriptor =
@@ -633,6 +656,44 @@ public class TableRowToStorageApiProto {
     }
   }
 
+  static TableSchema tableSchemaFromDescriptor(Descriptor descriptor) {
+    List<TableFieldSchema> tableFields =
+        descriptor.getFields().stream()
+            .map(f -> tableFieldSchemaFromDescriptorField(f))
+            .collect(toList());
+    return TableSchema.newBuilder().addAllFields(tableFields).build();
+  }
+
+  static TableFieldSchema tableFieldSchemaFromDescriptorField(FieldDescriptor 
fieldDescriptor) {
+    TableFieldSchema.Builder tableFieldSchemaBuilder = 
TableFieldSchema.newBuilder();
+    tableFieldSchemaBuilder = 
tableFieldSchemaBuilder.setName(fieldDescriptor.getName());
+
+    switch (fieldDescriptor.getType()) {
+      case MESSAGE:
+        tableFieldSchemaBuilder = 
tableFieldSchemaBuilder.setType(TableFieldSchema.Type.STRUCT);
+        TableSchema nestedTableField = 
tableSchemaFromDescriptor(fieldDescriptor.getMessageType());
+        tableFieldSchemaBuilder =
+            
tableFieldSchemaBuilder.addAllFields(nestedTableField.getFieldsList());
+        break;
+      default:
+        TableFieldSchema.Type type = 
PRIMITIVE_TYPES_PROTO_TO_BQ.get(fieldDescriptor.getType());
+        if (type == null) {
+          throw new UnsupportedOperationException(
+              "proto type " + fieldDescriptor.getType() + " is unsupported.");
+        }
+        tableFieldSchemaBuilder = tableFieldSchemaBuilder.setType(type);
+    }
+
+    if (fieldDescriptor.isRepeated()) {
+      tableFieldSchemaBuilder = 
tableFieldSchemaBuilder.setMode(TableFieldSchema.Mode.REPEATED);
+    } else if (fieldDescriptor.isRequired()) {
+      tableFieldSchemaBuilder = 
tableFieldSchemaBuilder.setMode(TableFieldSchema.Mode.REQUIRED);
+    } else {
+      tableFieldSchemaBuilder = 
tableFieldSchemaBuilder.setMode(TableFieldSchema.Mode.NULLABLE);
+    }
+    return tableFieldSchemaBuilder.build();
+  }
+
   @VisibleForTesting
   static DescriptorProto descriptorSchemaFromTableSchema(
       com.google.api.services.bigquery.model.TableSchema tableSchema,
@@ -700,7 +761,7 @@ public class TableRowToStorageApiProto {
             
fieldDescriptorBuilder.setType(Type.TYPE_MESSAGE).setTypeName(nested.getName());
         break;
       default:
-        @Nullable Type type = PRIMITIVE_TYPES.get(fieldSchema.getType());
+        @Nullable Type type = 
PRIMITIVE_TYPES_BQ_TO_PROTO.get(fieldSchema.getType());
         if (type == null) {
           throw new UnsupportedOperationException(
               "Converting BigQuery type " + fieldSchema.getType() + " to Beam 
type is unsupported");
@@ -710,9 +771,9 @@ public class TableRowToStorageApiProto {
 
     if (fieldSchema.getMode() == TableFieldSchema.Mode.REPEATED) {
       fieldDescriptorBuilder = 
fieldDescriptorBuilder.setLabel(Label.LABEL_REPEATED);
-    } else if (!respectRequired || fieldSchema.getMode() == 
TableFieldSchema.Mode.NULLABLE) {
+    } else if (!respectRequired || fieldSchema.getMode() != 
TableFieldSchema.Mode.REQUIRED) {
       fieldDescriptorBuilder = 
fieldDescriptorBuilder.setLabel(Label.LABEL_OPTIONAL);
-    } else if (fieldSchema.getMode() == TableFieldSchema.Mode.REQUIRED) {
+    } else {
       fieldDescriptorBuilder = 
fieldDescriptorBuilder.setLabel(Label.LABEL_REQUIRED);
     }
     descriptorBuilder.addField(fieldDescriptorBuilder.build());
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
index 64d828899ea..347a3513d89 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
@@ -43,6 +43,7 @@ import com.google.cloud.bigquery.storage.v1.WriteStream.Type;
 import com.google.errorprone.annotations.FormatMethod;
 import com.google.errorprone.annotations.FormatString;
 import com.google.protobuf.ByteString;
+import com.google.protobuf.DescriptorProtos;
 import com.google.protobuf.Descriptors;
 import com.google.protobuf.Descriptors.Descriptor;
 import com.google.protobuf.DynamicMessage;
@@ -599,7 +600,8 @@ public class FakeDatasetService implements DatasetService, 
Serializable {
 
   @Override
   public StreamAppendClient getStreamAppendClient(
-      String streamName, Descriptor descriptor, boolean useConnectionPool) {
+      String streamName, DescriptorProtos.DescriptorProto descriptor, boolean 
useConnectionPool)
+      throws Exception {
     return new StreamAppendClient() {
       private Descriptor protoDescriptor;
       private TableSchema currentSchema;
@@ -609,7 +611,8 @@ public class FakeDatasetService implements DatasetService, 
Serializable {
       private boolean usedForUpdate = false;
 
       {
-        this.protoDescriptor = descriptor;
+        this.protoDescriptor = 
TableRowToStorageApiProto.wrapDescriptorProto(descriptor);
+
         synchronized (FakeDatasetService.class) {
           Stream stream = writeStreams.get(streamName);
           if (stream == null) {
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
index a0e6dafdb38..10abb738f95 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
@@ -48,11 +48,13 @@ import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.api.services.bigquery.model.TimePartitioning;
 import com.google.auto.value.AutoValue;
+import com.google.protobuf.ByteString;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.ArrayList;
@@ -88,6 +90,7 @@ import org.apache.beam.sdk.coders.ShardedKeyCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.extensions.avro.coders.AvroGenericCoder;
+import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
@@ -133,6 +136,7 @@ import 
org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
+import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
@@ -150,6 +154,7 @@ import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterab
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Multimap;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.BaseEncoding;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.hamcrest.Matchers;
 import org.joda.time.Duration;
@@ -1105,7 +1110,7 @@ public class BigQueryIOWriteTest implements Serializable {
             new TableRow()
                 .set("strval", "test2")
                 .set("longval", "2")
-                .set("doubleval", 2.0D)
+                .set("doubleval", 2.0)
                 .set(
                     "instantval",
                     useStorageApi || useStorageApiApproximate
@@ -1182,12 +1187,12 @@ public class BigQueryIOWriteTest implements 
Serializable {
             new TableRow()
                 .set("strVal", "test_custom")
                 .set("longVal", "1")
-                .set("doubleVal", 1.0D)
+                .set("doubleVal", 1.0)
                 .set("instantVal", "2019-01-01 00:00:00 UTC"),
             new TableRow()
                 .set("strVal", "test2_custom")
                 .set("longVal", "2")
-                .set("doubleVal", 2.0D)
+                .set("doubleVal", 2.0)
                 .set("instantVal", "2019-02-01 00:00:00 UTC")));
   }
 
@@ -3004,6 +3009,106 @@ public class BigQueryIOWriteTest implements 
Serializable {
         containsInAnyOrder(Iterables.toArray(rows, TableRow.class)));
   }
 
+  @Test
+  public void testWriteProtos() throws Exception {
+    BigQueryIO.Write.Method method =
+        useStreaming
+            ? (useStorageApi
+                ? (useStorageApiApproximate
+                    ? Method.STORAGE_API_AT_LEAST_ONCE
+                    : Method.STORAGE_WRITE_API)
+                : Method.STREAMING_INSERTS)
+            : useStorageApi ? Method.STORAGE_WRITE_API : Method.FILE_LOADS;
+    Function<Integer, Proto3SchemaMessages.Primitive> getPrimitive =
+        (Integer i) ->
+            Proto3SchemaMessages.Primitive.newBuilder()
+                .setPrimitiveDouble(i)
+                .setPrimitiveFloat(i)
+                .setPrimitiveInt32(i)
+                .setPrimitiveInt64(i)
+                .setPrimitiveUint32(i)
+                .setPrimitiveUint64(i)
+                .setPrimitiveSint32(i)
+                .setPrimitiveSint64(i)
+                .setPrimitiveFixed32(i)
+                .setPrimitiveFixed64(i)
+                .setPrimitiveBool(true)
+                .setPrimitiveString(Integer.toString(i))
+                .setPrimitiveBytes(
+                    
ByteString.copyFrom(Integer.toString(i).getBytes(StandardCharsets.UTF_8)))
+                .build();
+    Function<Integer, TableRow> getPrimitiveRow =
+        (Integer i) ->
+            new TableRow()
+                .set("primitive_double", Double.valueOf(i))
+                .set("primitive_float", Float.valueOf(i).doubleValue())
+                .set("primitive_int32", i.intValue())
+                .set("primitive_int64", i.toString())
+                .set("primitive_uint32", i.toString())
+                .set("primitive_uint64", i.toString())
+                .set("primitive_sint32", i.toString())
+                .set("primitive_sint64", i.toString())
+                .set("primitive_fixed32", i.toString())
+                .set("primitive_fixed64", i.toString())
+                .set("primitive_bool", true)
+                .set("primitive_string", i.toString())
+                .set(
+                    "primitive_bytes",
+                    BaseEncoding.base64()
+                        .encode(
+                            
ByteString.copyFrom(i.toString().getBytes(StandardCharsets.UTF_8))
+                                .toByteArray()));
+
+    List<Proto3SchemaMessages.Primitive> nestedItems =
+        Lists.newArrayList(getPrimitive.apply(1), getPrimitive.apply(2), 
getPrimitive.apply(3));
+
+    Iterable<Proto3SchemaMessages.Nested> items =
+        nestedItems.stream()
+            .map(
+                p ->
+                    Proto3SchemaMessages.Nested.newBuilder()
+                        .setNested(p)
+                        .addAllNestedList(Lists.newArrayList(p, p, p))
+                        .build())
+            .collect(Collectors.toList());
+
+    List<TableRow> expectedNestedTableRows =
+        Lists.newArrayList(
+            getPrimitiveRow.apply(1), getPrimitiveRow.apply(2), 
getPrimitiveRow.apply(3));
+    Iterable<TableRow> expectedItems =
+        expectedNestedTableRows.stream()
+            .map(
+                p ->
+                    new TableRow().set("nested", p).set("nested_list", 
Lists.newArrayList(p, p, p)))
+            .collect(Collectors.toList());
+
+    BigQueryIO.Write<Proto3SchemaMessages.Nested> write =
+        BigQueryIO.writeProtos(Proto3SchemaMessages.Nested.class)
+            .to("dataset-id.table-id")
+            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+            .withMethod(method)
+            .withoutValidation()
+            .withTestServices(fakeBqServices);
+
+    p.apply(Create.of(items)).apply("WriteToBQ", write);
+    p.run();
+
+    // Round trip through the coder to make sure the types match our expected 
types.
+    List<TableRow> allRows =
+        fakeDatasetService.getAllRows("project-id", "dataset-id", 
"table-id").stream()
+            .map(
+                tr -> {
+                  try {
+                    byte[] bytes = 
CoderUtils.encodeToByteArray(TableRowJsonCoder.of(), tr);
+                    return 
CoderUtils.decodeFromByteArray(TableRowJsonCoder.of(), bytes);
+                  } catch (Exception e) {
+                    throw new RuntimeException(e);
+                  }
+                })
+            .collect(Collectors.toList());
+    assertThat(allRows, containsInAnyOrder(Iterables.toArray(expectedItems, 
TableRow.class)));
+  }
+
   @Test
   public void testUpsertAndDeleteTableRows() throws Exception {
     assumeTrue(useStorageApi);
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDirectWriteProtosIT.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDirectWriteProtosIT.java
new file mode 100644
index 00000000000..93bc4162409
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDirectWriteProtosIT.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages;
+import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.BaseEncoding;
+import org.joda.time.Duration;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/** Integration test for direct proto writes via the Storage Write API. */
+@RunWith(Parameterized.class)
+public class StorageApiDirectWriteProtosIT {
+  @Parameterized.Parameters
+  public static Iterable<Object[]> data() {
+    return ImmutableList.of(
+        new Object[] {true, false, false},
+        new Object[] {false, true, false},
+        new Object[] {false, false, true},
+        new Object[] {true, false, true});
+  }
+
+  @Parameterized.Parameter(0)
+  public boolean useStreamingExactlyOnce;
+
+  @Parameterized.Parameter(1)
+  public boolean useAtLeastOnce;
+
+  @Parameterized.Parameter(2)
+  public boolean useBatch;
+
+  private static final BigqueryClient BQ_CLIENT =
+      new BigqueryClient("StorageApiDirectWriteProtosIT");
+  private static final String PROJECT =
+      TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
+  private static final String BIG_QUERY_DATASET_ID =
+      "storage_api_sink_direct_write_protos" + System.nanoTime();
+
+  private BigQueryIO.Write.Method getMethod() {
+    return useAtLeastOnce
+        ? BigQueryIO.Write.Method.STORAGE_API_AT_LEAST_ONCE
+        : BigQueryIO.Write.Method.STORAGE_WRITE_API;
+  }
+
+  @BeforeClass
+  public static void setUpTestEnvironment() throws IOException, 
InterruptedException {
+    // Create one BQ dataset for all test cases.
+    BQ_CLIENT.createNewDataset(PROJECT, BIG_QUERY_DATASET_ID);
+  }
+
+  @AfterClass
+  public static void cleanup() {
+    BQ_CLIENT.deleteDataset(PROJECT, BIG_QUERY_DATASET_ID);
+  }
+
+  @Test
+  public void testDirectWriteProtos() throws Exception {
+    Function<Integer, Proto3SchemaMessages.Primitive> getPrimitiveProto =
+        (Integer i) ->
+            Proto3SchemaMessages.Primitive.newBuilder()
+                .setPrimitiveDouble(i)
+                .setPrimitiveFloat(i)
+                .setPrimitiveInt32(i)
+                .setPrimitiveInt64(i)
+                .setPrimitiveUint32(i)
+                .setPrimitiveUint64(i)
+                .setPrimitiveSint32(i)
+                .setPrimitiveSint64(i)
+                .setPrimitiveFixed32(i)
+                .setPrimitiveFixed64(i)
+                .setPrimitiveBool(true)
+                .setPrimitiveString(Integer.toString(i))
+                .setPrimitiveBytes(
+                    
ByteString.copyFrom(Integer.toString(i).getBytes(StandardCharsets.UTF_8)))
+                .build();
+    Function<Integer, TableRow> getPrimitiveRow =
+        (Integer i) ->
+            new TableRow()
+                .set("primitive_double", Double.valueOf(i))
+                .set("primitive_float", Float.valueOf(i).doubleValue())
+                .set("primitive_int32", i.toString())
+                .set("primitive_int64", i.toString())
+                .set("primitive_uint32", i.toString())
+                .set("primitive_uint64", i.toString())
+                .set("primitive_sint32", i.toString())
+                .set("primitive_sint64", i.toString())
+                .set("primitive_fixed32", i.toString())
+                .set("primitive_fixed64", i.toString())
+                .set("primitive_bool", true)
+                .set("primitive_string", i.toString())
+                .set(
+                    "primitive_bytes",
+                    BaseEncoding.base64()
+                        .encode(
+                            
ByteString.copyFrom(i.toString().getBytes(StandardCharsets.UTF_8))
+                                .toByteArray()));
+
+    List<Proto3SchemaMessages.Primitive> nestedItems =
+        IntStream.range(1, 2)
+            .mapToObj(i -> getPrimitiveProto.apply(i))
+            .collect(Collectors.toList());
+
+    Iterable<Proto3SchemaMessages.Nested> items =
+        nestedItems.stream()
+            .map(
+                p ->
+                    Proto3SchemaMessages.Nested.newBuilder()
+                        .setNested(p)
+                        .addAllNestedList(Lists.newArrayList(p, p, p))
+                        .build())
+            .collect(Collectors.toList());
+
+    List<TableRow> expectedNestedItems =
+        IntStream.range(1, 
2).mapToObj(getPrimitiveRow::apply).collect(Collectors.toList());
+
+    Iterable<TableRow> expectedItems =
+        expectedNestedItems.stream()
+            .map(
+                p ->
+                    new TableRow()
+                        .set("nested_map", Lists.newArrayList())
+                        .set("nested", p)
+                        .set("nested_list", Lists.newArrayList(p, p, p)))
+            .collect(Collectors.toList());
+
+    String table = "table" + System.nanoTime();
+    String tableSpec = PROJECT + "." + BIG_QUERY_DATASET_ID + "." + table;
+
+    BigQueryIO.Write.Method method = getMethod();
+    BigQueryIO.Write<Proto3SchemaMessages.Nested> write =
+        BigQueryIO.writeProtos(Proto3SchemaMessages.Nested.class)
+            .to(tableSpec)
+            
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
+            .withMethod(method);
+    if (method == BigQueryIO.Write.Method.STORAGE_WRITE_API) {
+      write = write.withNumStorageWriteApiStreams(1);
+      if (useStreamingExactlyOnce) {
+        write = write.withTriggeringFrequency(Duration.standardSeconds(1));
+      }
+    }
+
+    Pipeline p = Pipeline.create();
+
+    PCollection<Proto3SchemaMessages.Nested> input = p.apply("Create test 
cases", Create.of(items));
+    if (useStreamingExactlyOnce) {
+      input = input.setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED);
+    }
+    input.apply("Write using Storage Write API", write);
+    p.run().waitUntilFinish();
+    assertRowsWritten(tableSpec, expectedItems);
+  }
+
+  void assertRowsWritten(String tableSpec, Iterable<TableRow> expectedItems) 
throws Exception {
+    List<TableRow> rows =
+        BQ_CLIENT.queryUnflattened(
+            String.format("SELECT * FROM %s", tableSpec), PROJECT, true, true);
+    assertThat(rows, containsInAnyOrder(Iterables.toArray(expectedItems, 
TableRow.class)));
+  }
+}
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
index dc5b4b82936..aae1c2096cc 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
@@ -63,7 +63,7 @@ import org.junit.runners.JUnit4;
 /** Unit tests for {@link org.apache.beam.sdk.io.gcp.bigquery.TableRowToStorageApiProto}. */
 public class TableRowToStorageApiProtoTest {
   // Schemas we test.
-  // The TableRow class has special semantics for fields named "f". To ensure we handel them
+  // The TableRow class has special semantics for fields named "f". To ensure we handle them
   // properly, we test schemas
   // both with and without a field named "f".
   private static final TableSchema BASE_TABLE_SCHEMA =
@@ -167,7 +167,7 @@ public class TableRowToStorageApiProtoTest {
                   .add(new 
TableFieldSchema().setType("TIMESTAMP").setName("timestampValueMaximum"))
                   .build());
 
-  private static final DescriptorProto BASE_TABLE_SCHEMA_PROTO =
+  private static final DescriptorProto BASE_TABLE_SCHEMA_PROTO_DESCRIPTOR =
       DescriptorProto.newBuilder()
           .addField(
               FieldDescriptorProto.newBuilder()
@@ -367,6 +367,150 @@ public class TableRowToStorageApiProtoTest {
                   .build())
           .build();
 
+  private static final com.google.cloud.bigquery.storage.v1.TableSchema 
BASE_TABLE_PROTO_SCHEMA =
+      com.google.cloud.bigquery.storage.v1.TableSchema.newBuilder()
+          .addFields(
+              
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+                  .setName("stringvalue")
+                  
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.STRING)
+                  .build())
+          .addFields(
+              
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+                  .setName("f")
+                  
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.STRING)
+                  .build())
+          .addFields(
+              
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+                  .setName("bytesvalue")
+                  
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.BYTES)
+                  .build())
+          .addFields(
+              
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+                  .setName("int64value")
+                  
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+                  .build())
+          .addFields(
+              
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+                  .setName("intvalue")
+                  
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+                  .build())
+          .addFields(
+              
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+                  .setName("float64value")
+                  
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.DOUBLE)
+                  .build())
+          .addFields(
+              
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+                  .setName("floatvalue")
+                  
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.DOUBLE)
+                  .build())
+          .addFields(
+              
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+                  .setName("boolvalue")
+                  
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.BOOL)
+                  .build())
+          .addFields(
+              
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+                  .setName("booleanvalue")
+                  
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.BOOL)
+                  .build())
+          .addFields(
+              
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+                  .setName("timestampvalue")
+                  
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+                  .build())
+          .addFields(
+              
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+                  .setName("timevalue")
+                  
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+                  .build())
+          .addFields(
+              
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+                  .setName("datetimevalue")
+                  
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+                  .build())
+          .addFields(
+              
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+                  .setName("datevalue")
+                  
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+                  .build())
+          .addFields(
+              
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+                  .setName("numericvalue")
+                  
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.BYTES)
+                  .build())
+          .addFields(
+              
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+                  .setName("bignumericvalue")
+                  
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.BYTES)
+                  .build())
+          .addFields(
+              
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+                  .setName("numericvalue2")
+                  
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.BYTES)
+                  .build())
+          .addFields(
+              
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+                  .setName("bignumericvalue2")
+                  
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.BYTES)
+                  .build())
+          .addFields(
+              
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+                  .setName("arrayvalue")
+                  
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.BYTES)
+                  .build())
+          .addFields(
+              
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+                  .setName("timestampisovalue")
+                  
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+                  .build())
+          .addFields(
+              
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+                  .setName("timestampisovalueoffsethh")
+                  
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+                  .build())
+          .addFields(
+              
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+                  .setName("timestampvaluelong")
+                  
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+                  .build())
+          .addFields(
+              
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+                  .setName("timestampvaluespace")
+                  
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+                  .build())
+          .addFields(
+              
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+                  .setName("timestampvaluespaceutc")
+                  
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+                  .build())
+          .addFields(
+              
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+                  .setName("timestampvaluezoneregion")
+                  
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+                  .build())
+          .addFields(
+              
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+                  .setName("timestampvaluespacemilli")
+                  
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+                  .build())
+          .addFields(
+              
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+                  .setName("timestampvaluespacetrailingzero")
+                  
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+                  .build())
+          .addFields(
+              
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+                  .setName("datetimevaluespace")
+                  
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+                  .build())
+          .addFields(
+              
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+                  .setName("timestampvaluemaximum")
+                  
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+                  .build())
+          .build();
+
   private static final DescriptorProto BASE_TABLE_SCHEMA_NO_F_PROTO =
       DescriptorProto.newBuilder()
           .addField(
@@ -559,6 +703,146 @@ public class TableRowToStorageApiProtoTest {
                   .setLabel(Label.LABEL_OPTIONAL)
                   .build())
           .build();
+
+  private static final com.google.cloud.bigquery.storage.v1.TableSchema
+      BASE_TABLE_NO_F_PROTO_SCHEMA =
+          com.google.cloud.bigquery.storage.v1.TableSchema.newBuilder()
+              .addFields(
+                  
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+                      .setName("stringvalue")
+                      
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.STRING)
+                      .build())
+              .addFields(
+                  
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+                      .setName("bytesvalue")
+                      
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.BYTES)
+                      .build())
+              .addFields(
+                  
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+                      .setName("int64value")
+                      
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+                      .build())
+              .addFields(
+                  
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+                      .setName("intvalue")
+                      
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+                      .build())
+              .addFields(
+                  
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+                      .setName("float64value")
+                      
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.DOUBLE)
+                      .build())
+              .addFields(
+                  
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+                      .setName("floatvalue")
+                      
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.DOUBLE)
+                      .build())
+              .addFields(
+                  
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+                      .setName("boolvalue")
+                      
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.BOOL)
+                      .build())
+              .addFields(
+                  
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+                      .setName("booleanvalue")
+                      
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.BOOL)
+                      .build())
+              .addFields(
+                  
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+                      .setName("timestampvalue")
+                      
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+                      .build())
+              .addFields(
+                  
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+                      .setName("timevalue")
+                      
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+                      .build())
+              .addFields(
+                  
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+                      .setName("datetimevalue")
+                      
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+                      .build())
+              .addFields(
+                  
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+                      .setName("datevalue")
+                      
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+                      .build())
+              .addFields(
+                  
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+                      .setName("numericvalue")
+                      
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.BYTES)
+                      .build())
+              .addFields(
+                  
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+                      .setName("bignumericvalue")
+                      
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.BYTES)
+                      .build())
+              .addFields(
+                  
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+                      .setName("numericvalue2")
+                      
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.BYTES)
+                      .build())
+              .addFields(
+                  
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+                      .setName("bignumericvalue2")
+                      
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.BYTES)
+                      .build())
+              .addFields(
+                  
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+                      .setName("arrayvalue")
+                      
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.BYTES)
+                      .build())
+              .addFields(
+                  
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+                      .setName("timestampisovalue")
+                      
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+                      .build())
+              .addFields(
+                  
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+                      .setName("timestampisovalueoffsethh")
+                      
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+                      .build())
+              .addFields(
+                  
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+                      .setName("timestampvaluelong")
+                      
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+                      .build())
+              .addFields(
+                  
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+                      .setName("timestampvaluespace")
+                      
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+                      .build())
+              .addFields(
+                  
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+                      .setName("timestampvaluespaceutc")
+                      
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+                      .build())
+              .addFields(
+                  
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+                      .setName("timestampvaluezoneregion")
+                      
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+                      .build())
+              .addFields(
+                  
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+                      .setName("timestampvaluespacemilli")
+                      
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+                      .build())
+              .addFields(
+                  
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+                      .setName("timestampvaluespacetrailingzero")
+                      
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+                      .build())
+              .addFields(
+                  
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+                      .setName("datetimevaluespace")
+                      
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+                      .build())
+              .addFields(
+                  
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+                      .setName("timestampvaluemaximum")
+                      
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+                      .build())
+              .build();
   private static final TableSchema NESTED_TABLE_SCHEMA =
       new TableSchema()
           .setFields(
@@ -566,29 +850,33 @@ public class TableRowToStorageApiProtoTest {
                   .add(
                       new TableFieldSchema()
                           .setType("STRUCT")
-                          .setName("nestedValue1")
+                          .setName("nestedvalue1")
+                          .setMode("NULLABLE")
                           .setFields(BASE_TABLE_SCHEMA.getFields()))
                   .add(
                       new TableFieldSchema()
                           .setType("RECORD")
-                          .setName("nestedValue2")
+                          .setName("nestedvalue2")
+                          .setMode("NULLABLE")
                           .setFields(BASE_TABLE_SCHEMA.getFields()))
                   .add(
                       new TableFieldSchema()
                           .setType("STRUCT")
-                          .setName("nestedValueNoF1")
+                          .setName("nestedvaluenof1")
+                          .setMode("NULLABLE")
                           .setFields(BASE_TABLE_SCHEMA_NO_F.getFields()))
                   .add(
                       new TableFieldSchema()
                           .setType("RECORD")
-                          .setName("nestedValueNoF2")
+                          .setName("nestedvaluenof2")
+                          .setMode("NULLABLE")
                           .setFields(BASE_TABLE_SCHEMA_NO_F.getFields()))
                   .build());
 
   @Rule public transient ExpectedException thrown = ExpectedException.none();
 
   @Test
-  public void testDescriptorFromTableSchema() {
+  public void testDescriptorFromTableSchema() throws Exception {
     DescriptorProto descriptor =
         
TableRowToStorageApiProto.descriptorSchemaFromTableSchema(BASE_TABLE_SCHEMA, 
true, false);
     Map<String, Type> types =
@@ -596,18 +884,37 @@ public class TableRowToStorageApiProtoTest {
             .collect(
                 Collectors.toMap(FieldDescriptorProto::getName, 
FieldDescriptorProto::getType));
     Map<String, Type> expectedTypes =
-        BASE_TABLE_SCHEMA_PROTO.getFieldList().stream()
+        BASE_TABLE_SCHEMA_PROTO_DESCRIPTOR.getFieldList().stream()
             .collect(
                 Collectors.toMap(FieldDescriptorProto::getName, 
FieldDescriptorProto::getType));
     assertEquals(expectedTypes, types);
+
+    com.google.cloud.bigquery.storage.v1.TableSchema roundtripSchema =
+        TableRowToStorageApiProto.tableSchemaFromDescriptor(
+            TableRowToStorageApiProto.wrapDescriptorProto(descriptor));
+    Map<String, com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type> 
roundTripTypes =
+        roundtripSchema.getFieldsList().stream()
+            .collect(
+                Collectors.toMap(
+                    
com.google.cloud.bigquery.storage.v1.TableFieldSchema::getName,
+                    
com.google.cloud.bigquery.storage.v1.TableFieldSchema::getType));
+
+    Map<String, com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type> 
roundTripExpectedTypes =
+        BASE_TABLE_PROTO_SCHEMA.getFieldsList().stream()
+            .collect(
+                Collectors.toMap(
+                    
com.google.cloud.bigquery.storage.v1.TableFieldSchema::getName,
+                    
com.google.cloud.bigquery.storage.v1.TableFieldSchema::getType));
+
+    assertEquals(roundTripExpectedTypes, roundTripTypes);
   }
 
   @Test
-  public void testNestedFromTableSchema() {
+  public void testNestedFromTableSchema() throws Exception {
     DescriptorProto descriptor =
         
TableRowToStorageApiProto.descriptorSchemaFromTableSchema(NESTED_TABLE_SCHEMA, 
true, false);
     Map<String, Type> expectedBaseTypes =
-        BASE_TABLE_SCHEMA_PROTO.getFieldList().stream()
+        BASE_TABLE_SCHEMA_PROTO_DESCRIPTOR.getFieldList().stream()
             .collect(
                 Collectors.toMap(FieldDescriptorProto::getName, 
FieldDescriptorProto::getType));
     Map<String, Type> expectedBaseTypesNoF =
@@ -659,6 +966,97 @@ public class TableRowToStorageApiProtoTest {
             .collect(
                 Collectors.toMap(FieldDescriptorProto::getName, 
FieldDescriptorProto::getType));
     assertEquals(expectedBaseTypesNoF, nestedTypesNoF2);
+
+    Map<String, com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type>
+        roundTripExpectedBaseTypes =
+            BASE_TABLE_PROTO_SCHEMA.getFieldsList().stream()
+                .collect(
+                    Collectors.toMap(
+                        
com.google.cloud.bigquery.storage.v1.TableFieldSchema::getName,
+                        
com.google.cloud.bigquery.storage.v1.TableFieldSchema::getType));
+    Map<String, com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type>
+        roundTripExpectedBaseTypesNoF =
+            BASE_TABLE_NO_F_PROTO_SCHEMA.getFieldsList().stream()
+                .collect(
+                    Collectors.toMap(
+                        
com.google.cloud.bigquery.storage.v1.TableFieldSchema::getName,
+                        
com.google.cloud.bigquery.storage.v1.TableFieldSchema::getType));
+
+    com.google.cloud.bigquery.storage.v1.TableSchema roundtripSchema =
+        TableRowToStorageApiProto.tableSchemaFromDescriptor(
+            TableRowToStorageApiProto.wrapDescriptorProto(descriptor));
+
+    Map<String, com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type> 
roundTripTypes =
+        roundtripSchema.getFieldsList().stream()
+            .collect(
+                Collectors.toMap(
+                    
com.google.cloud.bigquery.storage.v1.TableFieldSchema::getName,
+                    
com.google.cloud.bigquery.storage.v1.TableFieldSchema::getType));
+    assertEquals(4, roundTripTypes.size());
+
+    assertEquals(
+        com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.STRUCT,
+        roundTripTypes.get("nestedvalue1"));
+    com.google.cloud.bigquery.storage.v1.TableFieldSchema nestedType =
+        roundtripSchema.getFieldsList().stream()
+            .filter(f -> f.getName().equals("nestedvalue1"))
+            .findFirst()
+            .get();
+    Map<String, com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type> 
nestedRoundTripTypes =
+        nestedType.getFieldsList().stream()
+            .collect(
+                Collectors.toMap(
+                    
com.google.cloud.bigquery.storage.v1.TableFieldSchema::getName,
+                    
com.google.cloud.bigquery.storage.v1.TableFieldSchema::getType));
+    assertEquals(roundTripExpectedBaseTypes, nestedRoundTripTypes);
+
+    assertEquals(
+        com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.STRUCT,
+        roundTripTypes.get("nestedvalue2"));
+    nestedType =
+        roundtripSchema.getFieldsList().stream()
+            .filter(f -> f.getName().equals("nestedvalue2"))
+            .findFirst()
+            .get();
+    nestedRoundTripTypes =
+        nestedType.getFieldsList().stream()
+            .collect(
+                Collectors.toMap(
+                    
com.google.cloud.bigquery.storage.v1.TableFieldSchema::getName,
+                    
com.google.cloud.bigquery.storage.v1.TableFieldSchema::getType));
+    assertEquals(roundTripExpectedBaseTypes, nestedRoundTripTypes);
+
+    assertEquals(
+        com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.STRUCT,
+        roundTripTypes.get("nestedvaluenof1"));
+    nestedType =
+        roundtripSchema.getFieldsList().stream()
+            .filter(f -> f.getName().equals("nestedvaluenof1"))
+            .findFirst()
+            .get();
+    nestedRoundTripTypes =
+        nestedType.getFieldsList().stream()
+            .collect(
+                Collectors.toMap(
+                    
com.google.cloud.bigquery.storage.v1.TableFieldSchema::getName,
+                    
com.google.cloud.bigquery.storage.v1.TableFieldSchema::getType));
+    assertEquals(roundTripExpectedBaseTypesNoF, nestedRoundTripTypes);
+
+    assertEquals(
+        com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.STRUCT,
+        roundTripTypes.get("nestedvaluenof2"));
+    nestedType =
+        roundtripSchema.getFieldsList().stream()
+            .filter(f -> f.getName().equals("nestedvaluenof2"))
+            .findFirst()
+            .get();
+    nestedRoundTripTypes =
+        nestedType.getFieldsList().stream()
+            .collect(
+                Collectors.toMap(
+                    
com.google.cloud.bigquery.storage.v1.TableFieldSchema::getName,
+                    
com.google.cloud.bigquery.storage.v1.TableFieldSchema::getType));
+    assertEquals(roundTripExpectedBaseTypesNoF, nestedRoundTripTypes);
   }
 
   private static final List<Object> REPEATED_BYTES =


Reply via email to