ahmedabu98 commented on code in PR #27866:
URL: https://github.com/apache/beam/pull/27866#discussion_r1288726563


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -2170,6 +2174,27 @@ public static Write<GenericRecord> writeGenericRecords() {
         .withAvroFormatFunction(GENERIC_RECORD_IDENTITY_FORMATTER);
   }
 
+  /**
+   * A {@link PTransform} that writes a {@link PCollection} containing protocol buffer objects to a
+   * BigQuery table. If using one of the storage-api write methods, these protocol buffers must
+   * match the schema of the table.
+   *
+   * <p>If a Schema is provided using {@link Write#withSchema}, that schema will be used for
+   * creating the table if necessary. If no schema is provided, one will be inferred from the
+   * protocol buffer's descriptor. Note that inferring a schema from the protocol buffer may not
+   * always provide the intended schema as multiple BigQuery types can map to the same protocol
+   * buffer type. For example, a protocol buffer field of type INT64 may be an INT64 BigQuery type,
+   * but it might also represent a TIME, DATETIME, or a TIMESTAMP type.
+   */
+  public static <T extends Message> Write<T> writeProtos(Class<T> protoMessageClass) {
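A minimal usage sketch of the new `writeProtos` method, assuming a generated protobuf class whose fields line up with the destination table; `MyProtoMessage`, `pipeline`, `protoMessages`, and the table name below are illustrative placeholders, not code from the PR:

```java
// MyProtoMessage stands in for a compiled protobuf Message class whose fields
// match the destination table's schema.
PCollection<MyProtoMessage> messages = pipeline.apply(Create.of(protoMessages));

messages.apply(
    "WriteProtosToBigQuery",
    BigQueryIO.writeProtos(MyProtoMessage.class)
        .to("my-project:my_dataset.my_table")
        // The Storage Write API path requires the protos to match the table schema.
        .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
        // With no explicit withSchema(), a created table uses a schema inferred
        // from the proto descriptor.
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
```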

Review Comment:
   There's a check below that `writeProtoClass.getMethod("getDescriptor")` exists. We could probably perform that check here instead, so the failure is caught earlier, during pipeline construction.
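   A rough sketch of what that earlier check might look like, as a helper inside `BigQueryIO`; the helper name and exception choice are assumptions for illustration, not the PR's actual code:

   ```java
   /** Illustrative helper: eagerly validate the proto class at graph-construction time. */
   private static void validateProtoClass(Class<? extends Message> protoMessageClass) {
     try {
       // Generated protobuf message classes expose a static getDescriptor() method.
       protoMessageClass.getMethod("getDescriptor");
     } catch (NoSuchMethodException e) {
       throw new IllegalArgumentException(
           protoMessageClass.getName()
               + " does not expose a static getDescriptor() method; writeProtos() expects a"
               + " generated protocol buffer class",
           e);
     }
   }
   ```

   `writeProtos()` could call a helper like this before building the `Write<T>`, so an incompatible class fails at construction time rather than at expansion.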



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java:
##########
@@ -559,55 +703,218 @@ public class TableRowToStorageApiProtoTest {
                   .setLabel(Label.LABEL_OPTIONAL)
                   .build())
           .build();
+
+  private static final com.google.cloud.bigquery.storage.v1.TableSchema
+      BASE_TABLE_NO_F_PROTO_SCHEMA =

Review Comment:
   Why do we have two schemas that differ only in this one field (the String field "f")? Why not just test against `BASE_TABLE_PROTO_SCHEMA`, since it includes everything?



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java:
##########
@@ -566,48 +892,71 @@ public class TableRowToStorageApiProtoTest {
                   .add(
                       new TableFieldSchema()
                           .setType("STRUCT")
-                          .setName("nestedValue1")
+                          .setName("nestedvalue1")
+                          .setMode("NULLABLE")
                           .setFields(BASE_TABLE_SCHEMA.getFields()))
                   .add(
                       new TableFieldSchema()
                           .setType("RECORD")
-                          .setName("nestedValue2")
+                          .setName("nestedvalue2")
+                          .setMode("NULLABLE")
                           .setFields(BASE_TABLE_SCHEMA.getFields()))
                   .add(
                       new TableFieldSchema()
                           .setType("STRUCT")
-                          .setName("nestedValueNoF1")
+                          .setName("nestedvaluenof1")
+                          .setMode("NULLABLE")
                           .setFields(BASE_TABLE_SCHEMA_NO_F.getFields()))
                   .add(
                       new TableFieldSchema()
                           .setType("RECORD")
-                          .setName("nestedValueNoF2")
+                          .setName("nestedvaluenof2")
+                          .setMode("NULLABLE")
                           .setFields(BASE_TABLE_SCHEMA_NO_F.getFields()))
                   .build());
 
   @Rule public transient ExpectedException thrown = ExpectedException.none();
 
   @Test
-  public void testDescriptorFromTableSchema() {
+  public void testDescriptorFromTableSchema() throws Exception {
     DescriptorProto descriptor =
         TableRowToStorageApiProto.descriptorSchemaFromTableSchema(BASE_TABLE_SCHEMA, true, false);
     Map<String, Type> types =
         descriptor.getFieldList().stream()
             .collect(
                 Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType));
     Map<String, Type> expectedTypes =
-        BASE_TABLE_SCHEMA_PROTO.getFieldList().stream()
+        BASE_TABLE_SCHEMA_PROTO_DESCRIPTOR.getFieldList().stream()
             .collect(
                 Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType));
     assertEquals(expectedTypes, types);
+
+    com.google.cloud.bigquery.storage.v1.TableSchema roundtripSchema =
+        TableRowToStorageApiProto.tableSchemaFromDescriptor(
+            TableRowToStorageApiProto.wrapDescriptorProto(descriptor));
+    Map<String, com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type> roundTripTypes =
+        roundtripSchema.getFieldsList().stream()
+            .collect(
+                Collectors.toMap(
+                    com.google.cloud.bigquery.storage.v1.TableFieldSchema::getName,
+                    com.google.cloud.bigquery.storage.v1.TableFieldSchema::getType));
+
+    Map<String, com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type> roundTripExpectedTypes =
+        BASE_TABLE_PROTO_SCHEMA.getFieldsList().stream()
+            .collect(
+                Collectors.toMap(
+                    com.google.cloud.bigquery.storage.v1.TableFieldSchema::getName,
+                    com.google.cloud.bigquery.storage.v1.TableFieldSchema::getType));
+
+    assertEquals(roundTripExpectedTypes, roundTripTypes);

Review Comment:
   That would help make the test more complete, but it can also be done as a separate effort; I don't think it's a blocker for this PR.


