ahmedabu98 commented on code in PR #27866:
URL: https://github.com/apache/beam/pull/27866#discussion_r1288726563
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -2170,6 +2174,27 @@ public static Write<GenericRecord> writeGenericRecords() {
         .withAvroFormatFunction(GENERIC_RECORD_IDENTITY_FORMATTER);
   }
+  /**
+   * A {@link PTransform} that writes a {@link PCollection} containing protocol buffer objects to a
+   * BigQuery table. If using one of the storage-api write methods, these protocol buffers must
+   * match the schema of the table.
+   *
+   * <p>If a Schema is provided using {@link Write#withSchema}, that schema will be used for
+   * creating the table if necessary. If no schema is provided, one will be inferred from the
+   * protocol buffer's descriptor. Note that inferring a schema from the protocol buffer may not
+   * always provide the intended schema as multiple BigQuery types can map to the same protocol
+   * buffer type. For example, a protocol buffer field of type INT64 may be an INT64 BigQuery type,
+   * but it might also represent a TIME, DATETIME, or a TIMESTAMP type.
+   */
+  public static <T extends Message> Write<T> writeProtos(Class<T> protoMessageClass) {
Review Comment:
There's a check below that `writeProtoClass.getMethod("getDescriptor")` succeeds. We could probably perform that check here instead, so that a missing `getDescriptor` method is caught earlier, during pipeline construction.
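
For illustration, here is a minimal sketch of the construction-time check suggested above. The helper class and method names are hypothetical and not the PR's actual code; the point is only that `getMethod("getDescriptor")` can be invoked while the transform is being built, so a missing accessor surfaces at pipeline construction rather than at expansion time:

```java
import com.google.protobuf.Message;

final class WriteProtosValidation {
  // Hypothetical helper that writeProtos(Class<T>) could call when the
  // transform is constructed. Generated protobuf message classes expose a
  // static getDescriptor() accessor; looking it up here fails fast with a
  // clear error instead of failing later during pipeline expansion.
  static <T extends Message> void validateProtoClass(Class<T> protoMessageClass) {
    try {
      protoMessageClass.getMethod("getDescriptor");
    } catch (NoSuchMethodException e) {
      throw new IllegalArgumentException(
          "Expected " + protoMessageClass.getName() + " to declare a static getDescriptor() method",
          e);
    }
  }

  private WriteProtosValidation() {}
}
```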
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java:
##########
@@ -559,55 +703,218 @@ public class TableRowToStorageApiProtoTest {
.setLabel(Label.LABEL_OPTIONAL)
.build())
.build();
+
+ private static final com.google.cloud.bigquery.storage.v1.TableSchema
+ BASE_TABLE_NO_F_PROTO_SCHEMA =
Review Comment:
Why do we have two schemas that differ only in this one field (the `f` field of type String)? Why not test against `BASE_TABLE_PROTO_SCHEMA`, since it includes everything?
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java:
##########
@@ -566,48 +892,71 @@ public class TableRowToStorageApiProtoTest {
.add(
new TableFieldSchema()
.setType("STRUCT")
- .setName("nestedValue1")
+ .setName("nestedvalue1")
+ .setMode("NULLABLE")
.setFields(BASE_TABLE_SCHEMA.getFields()))
.add(
new TableFieldSchema()
.setType("RECORD")
- .setName("nestedValue2")
+ .setName("nestedvalue2")
+ .setMode("NULLABLE")
.setFields(BASE_TABLE_SCHEMA.getFields()))
.add(
new TableFieldSchema()
.setType("STRUCT")
- .setName("nestedValueNoF1")
+ .setName("nestedvaluenof1")
+ .setMode("NULLABLE")
.setFields(BASE_TABLE_SCHEMA_NO_F.getFields()))
.add(
new TableFieldSchema()
.setType("RECORD")
- .setName("nestedValueNoF2")
+ .setName("nestedvaluenof2")
+ .setMode("NULLABLE")
.setFields(BASE_TABLE_SCHEMA_NO_F.getFields()))
.build());
@Rule public transient ExpectedException thrown = ExpectedException.none();
@Test
- public void testDescriptorFromTableSchema() {
+ public void testDescriptorFromTableSchema() throws Exception {
     DescriptorProto descriptor =
         TableRowToStorageApiProto.descriptorSchemaFromTableSchema(BASE_TABLE_SCHEMA, true, false);
     Map<String, Type> types =
         descriptor.getFieldList().stream()
             .collect(
                 Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType));
     Map<String, Type> expectedTypes =
-        BASE_TABLE_SCHEMA_PROTO.getFieldList().stream()
+        BASE_TABLE_SCHEMA_PROTO_DESCRIPTOR.getFieldList().stream()
             .collect(
                 Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType));
     assertEquals(expectedTypes, types);
+
+    com.google.cloud.bigquery.storage.v1.TableSchema roundtripSchema =
+        TableRowToStorageApiProto.tableSchemaFromDescriptor(
+            TableRowToStorageApiProto.wrapDescriptorProto(descriptor));
+    Map<String, com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type> roundTripTypes =
+        roundtripSchema.getFieldsList().stream()
+            .collect(
+                Collectors.toMap(
+                    com.google.cloud.bigquery.storage.v1.TableFieldSchema::getName,
+                    com.google.cloud.bigquery.storage.v1.TableFieldSchema::getType));
+
+    Map<String, com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type> roundTripExpectedTypes =
+        BASE_TABLE_PROTO_SCHEMA.getFieldsList().stream()
+            .collect(
+                Collectors.toMap(
+                    com.google.cloud.bigquery.storage.v1.TableFieldSchema::getName,
+                    com.google.cloud.bigquery.storage.v1.TableFieldSchema::getType));
+
+    assertEquals(roundTripExpectedTypes, roundTripTypes);
Review Comment:
That would help make the test more complete, but it can also be a separate
effort. I don't think that's a blocker for this PR.
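
Purely for illustration, here is a sketch of one way that separate effort could make the round-trip comparison more thorough, e.g. by also comparing field modes. It reuses the names from the diff above (`roundtripSchema`, `BASE_TABLE_PROTO_SCHEMA`) and is not part of this PR:

```java
// Illustrative fragment only: in addition to comparing field types, compare
// the NULLABLE/REQUIRED/REPEATED mode of each top-level field after the
// TableSchema -> DescriptorProto -> TableSchema round trip.
Map<String, com.google.cloud.bigquery.storage.v1.TableFieldSchema.Mode> roundTripModes =
    roundtripSchema.getFieldsList().stream()
        .collect(
            Collectors.toMap(
                com.google.cloud.bigquery.storage.v1.TableFieldSchema::getName,
                com.google.cloud.bigquery.storage.v1.TableFieldSchema::getMode));

Map<String, com.google.cloud.bigquery.storage.v1.TableFieldSchema.Mode> expectedModes =
    BASE_TABLE_PROTO_SCHEMA.getFieldsList().stream()
        .collect(
            Collectors.toMap(
                com.google.cloud.bigquery.storage.v1.TableFieldSchema::getName,
                com.google.cloud.bigquery.storage.v1.TableFieldSchema::getMode));

assertEquals(expectedModes, roundTripModes);
```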