ahmedabu98 commented on code in PR #27866:
URL: https://github.com/apache/beam/pull/27866#discussion_r1286048822
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -2170,6 +2174,27 @@ public static Write<GenericRecord> writeGenericRecords() {
.withAvroFormatFunction(GENERIC_RECORD_IDENTITY_FORMATTER);
}
+ /**
+ * A {@link PTransform} that writes a {@link PCollection} containing protocol buffer objects to a
+ * BigQuery table. If using one of the storage-api write methods, these protocol buffers must
+ * match the schema of the table.
+ *
+ * <p>If a Schema is provided using {@link Write#withSchema}, that schema will be used for
+ * creating the table if necessary. If no schema is provided, one will be inferred from the
+ * protocol buffer's descriptor. Note that inferring a schema from the protocol buffer may not
+ * always provide the intended schema as multiple BigQuery types can map to the same protocol
+ * buffer type. For example, a protocol buffer field of type INT64 may be an INT64 BigQuery type,
+ * but it might also represent a TIME, DATETIME, or a TIMESTAMP type.
+ */
+ public static <T extends Message> Write<T> writeProtos(Class<T> protoMessageClass) {
Review Comment:
Should we add a check that `protoMessageClass` has a `getDescriptor`
method? Or maybe narrow the class down by having it extend
`com.google.protobuf.Message`?
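For illustration, an eager reflective check could look roughly like the sketch below. `DescriptorMethodCheck`, `hasStaticGetDescriptor`, and `FakeProto` are hypothetical names, not Beam or protobuf APIs; `FakeProto` only stands in for a generated message class so the sketch compiles without protobuf on the classpath.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

/** Hypothetical helper (not part of Beam): validates a class up front instead of at first use. */
final class DescriptorMethodCheck {
  private DescriptorMethodCheck() {}

  /** Returns true if {@code clazz} exposes a public static no-arg method named getDescriptor. */
  static boolean hasStaticGetDescriptor(Class<?> clazz) {
    try {
      // getMethod only finds public methods, including inherited ones.
      Method method = clazz.getMethod("getDescriptor");
      return Modifier.isStatic(method.getModifiers());
    } catch (NoSuchMethodException e) {
      return false;
    }
  }

  /** Stand-in for a generated protobuf class; the return type here is an assumption. */
  static final class FakeProto {
    public static String getDescriptor() {
      return "fake-descriptor";
    }
  }
}
```

A bound such as `<T extends Message>` covers the type at compile time, while a check like this would only confirm at pipeline construction that the static method is present.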
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsProto.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+import org.checkerframework.checker.nullness.qual.NonNull;
+
+/** Storage API DynamicDestinations used when the input is a Beam Row. */
+class StorageApiDynamicDestinationsProto<T extends Message, DestinationT extends @NonNull Object>
+ extends StorageApiDynamicDestinations<T, DestinationT> {
+ private final Class<T> protoClass;
+ Supplier<Descriptors.Descriptor> getDescriptor;
+
+ @SuppressWarnings({"unchecked", "nullness"})
+ StorageApiDynamicDestinationsProto(
+ DynamicDestinations<T, DestinationT> inner, Class<T> protoClass) {
+ super(inner);
+ this.protoClass = protoClass;
+ this.getDescriptor =
+ Suppliers.memoize(
+ (Supplier<Descriptors.Descriptor> & Serializable)
+ () -> {
+ try {
+ return (Descriptors.Descriptor)
+ Preconditions.checkStateNotNull(this.protoClass.getMethod("getDescriptor"))
+ .invoke(null);
+ } catch (IllegalAccessException
+ | InvocationTargetException
+ | NoSuchMethodException e) {
+ throw new IllegalArgumentException(e);
+ }
+ });
+ }
+
+ @Override
+ public MessageConverter<T> getMessageConverter(
+ DestinationT destination, DatasetService datasetService) throws Exception {
+ return new Converter(
+ TableRowToStorageApiProto.schemaToProtoTableSchema(
+ Preconditions.checkStateNotNull(getSchema(destination))));
+ }
+
+ class Converter implements MessageConverter<T> {
+ TableSchema tableSchema;
+
+ Converter(TableSchema tableSchema) {
+ this.tableSchema = tableSchema;
+ }
+
+ @Override
+ public TableSchema getTableSchema() {
+ return tableSchema;
+ }
+
+ @Override
+ public Descriptors.Descriptor getDescriptor(boolean includeCdcColumns) throws Exception {
+ org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(
+ !includeCdcColumns);
Review Comment:
It would be helpful to add a comment that row mutations are not supported yet.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java:
##########
@@ -633,6 +658,63 @@ public static DynamicMessage messageFromTableRow(
}
}
+ @SuppressWarnings({
+ "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+ })
Review Comment:
FYI there's been an effort to remove these suppressions from our codebase
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java:
##########
@@ -412,6 +413,26 @@ static SchemaInformation fromTableSchema(
.put(TableFieldSchema.Type.JSON, Type.TYPE_STRING)
.build();
+ static final Map<Descriptors.FieldDescriptor.Type, TableFieldSchema.Type>
+ PRIMITIVE_TYPES_PROTO_TO_BQ =
+ ImmutableMap.<Descriptors.FieldDescriptor.Type, TableFieldSchema.Type>builder()
+ .put(Descriptors.FieldDescriptor.Type.INT32, TableFieldSchema.Type.INT64)
+ .put(FieldDescriptor.Type.FIXED32, TableFieldSchema.Type.INT64)
+ .put(FieldDescriptor.Type.UINT32, TableFieldSchema.Type.INT64)
+ .put(FieldDescriptor.Type.SFIXED32, TableFieldSchema.Type.INT64)
+ .put(FieldDescriptor.Type.SINT32, TableFieldSchema.Type.INT64)
+ .put(Descriptors.FieldDescriptor.Type.INT64, TableFieldSchema.Type.INT64)
Review Comment:
nit
```suggestion
.put(FieldDescriptor.Type.INT64, TableFieldSchema.Type.INT64)
```
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java:
##########
@@ -566,48 +892,71 @@ public class TableRowToStorageApiProtoTest {
.add(
new TableFieldSchema()
.setType("STRUCT")
- .setName("nestedValue1")
+ .setName("nestedvalue1")
+ .setMode("NULLABLE")
.setFields(BASE_TABLE_SCHEMA.getFields()))
.add(
new TableFieldSchema()
.setType("RECORD")
- .setName("nestedValue2")
+ .setName("nestedvalue2")
+ .setMode("NULLABLE")
.setFields(BASE_TABLE_SCHEMA.getFields()))
.add(
new TableFieldSchema()
.setType("STRUCT")
- .setName("nestedValueNoF1")
+ .setName("nestedvaluenof1")
+ .setMode("NULLABLE")
.setFields(BASE_TABLE_SCHEMA_NO_F.getFields()))
.add(
new TableFieldSchema()
.setType("RECORD")
- .setName("nestedValueNoF2")
+ .setName("nestedvaluenof2")
+ .setMode("NULLABLE")
.setFields(BASE_TABLE_SCHEMA_NO_F.getFields()))
.build());
@Rule public transient ExpectedException thrown = ExpectedException.none();
@Test
- public void testDescriptorFromTableSchema() {
+ public void testDescriptorFromTableSchema() throws Exception {
DescriptorProto descriptor =
TableRowToStorageApiProto.descriptorSchemaFromTableSchema(BASE_TABLE_SCHEMA, true, false);
Map<String, Type> types =
descriptor.getFieldList().stream()
.collect(
Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType));
Map<String, Type> expectedTypes =
- BASE_TABLE_SCHEMA_PROTO.getFieldList().stream()
+ BASE_TABLE_SCHEMA_PROTO_DESCRIPTOR.getFieldList().stream()
.collect(
Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType));
assertEquals(expectedTypes, types);
+
+ com.google.cloud.bigquery.storage.v1.TableSchema roundtripSchema =
+ TableRowToStorageApiProto.tableSchemaFromDescriptor(
+ TableRowToStorageApiProto.wrapDescriptorProto(descriptor));
+ Map<String, com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type> roundTripTypes =
+ roundtripSchema.getFieldsList().stream()
+ .collect(
+ Collectors.toMap(
+ com.google.cloud.bigquery.storage.v1.TableFieldSchema::getName,
+ com.google.cloud.bigquery.storage.v1.TableFieldSchema::getType));
+
+ Map<String, com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type> roundTripExpectedTypes =
+ BASE_TABLE_PROTO_SCHEMA.getFieldsList().stream()
+ .collect(
+ Collectors.toMap(
+ com.google.cloud.bigquery.storage.v1.TableFieldSchema::getName,
+ com.google.cloud.bigquery.storage.v1.TableFieldSchema::getType));
+
+ assertEquals(roundTripExpectedTypes, roundTripTypes);
Review Comment:
This test doesn't check that the field mode survives a round trip.
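A mode assertion could mirror the existing type comparison by collecting a name-to-mode map on both sides. The sketch below uses stand-in types (`ModeRoundTripSketch`, `Field`, `Mode` are hypothetical) so it compiles on its own; in the real test the assumption is that `TableFieldSchema::getMode` would take the place of `Field::getMode`.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Illustrative only: stand-in types replace the BigQuery storage TableFieldSchema. */
final class ModeRoundTripSketch {
  enum Mode { NULLABLE, REQUIRED, REPEATED }

  /** Minimal stand-in carrying just the name and mode of a field. */
  static final class Field {
    private final String name;
    private final Mode mode;

    Field(String name, Mode mode) {
      this.name = name;
      this.mode = mode;
    }

    String getName() {
      return name;
    }

    Mode getMode() {
      return mode;
    }
  }

  private ModeRoundTripSketch() {}

  /** Collects field modes keyed by field name, the same shape as the type maps in the test. */
  static Map<String, Mode> modesByName(List<Field> fields) {
    return fields.stream().collect(Collectors.toMap(Field::getName, Field::getMode));
  }
}
```

Comparing `modesByName(expected)` with `modesByName(roundTripped)` via `assertEquals` would then fail if, for example, a REPEATED field came back as NULLABLE.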
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java:
##########
@@ -359,12 +362,22 @@ String getOrCreateStreamName() throws Exception {
}
AppendClientInfo generateClient(@Nullable TableSchema updatedSchema) throws Exception {
- TableSchema tableSchema =
- (updatedSchema != null) ? updatedSchema : getCurrentTableSchema(streamName);
Review Comment:
Do we lose anything by not calling `getCurrentTableSchema` anymore? It looks
like it has some logic that fetches write streams to potentially get an updated
schema.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -3252,11 +3286,34 @@ private <DestinationT> WriteResult expandTyped(
formatFunction = BigQueryUtils.toTableRow(input.getToRowFunction());
}
// Infer the TableSchema from the input Beam schema.
+ // TODO: If the user provided a schema, we should use that. There are things that can be
+ // specified in a
+ // BQ schema that don't have exact matches in a Beam schema (e.g. GEOGRAPHY types).
TableSchema tableSchema = BigQueryUtils.toTableSchema(input.getSchema());
dynamicDestinations =
new ConstantSchemaDestinations<>(
dynamicDestinations,
StaticValueProvider.of(BigQueryHelpers.toJsonString(tableSchema)));
+ } else if (writeProtoClass != null) {
+ if (!hasSchema) {
Review Comment:
These can be combined into one expression: `writeProtoClass != null && !hasSchema`
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java:
##########
@@ -633,6 +658,63 @@ public static DynamicMessage messageFromTableRow(
}
}
+ @SuppressWarnings({
+ "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+ })
+ static TableSchema tableSchemaFromDescriptor(Descriptor descriptor) {
Review Comment:
`protoTableSchemaFromDescriptor`/ `protoSchemaFromDescriptor`?
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsProto.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+import org.checkerframework.checker.nullness.qual.NonNull;
+
+/** Storage API DynamicDestinations used when the input is a Beam Row. */
+class StorageApiDynamicDestinationsProto<T extends Message, DestinationT extends @NonNull Object>
+ extends StorageApiDynamicDestinations<T, DestinationT> {
+ private final Class<T> protoClass;
+ Supplier<Descriptors.Descriptor> getDescriptor;
+
+ @SuppressWarnings({"unchecked", "nullness"})
+ StorageApiDynamicDestinationsProto(
+ DynamicDestinations<T, DestinationT> inner, Class<T> protoClass) {
+ super(inner);
+ this.protoClass = protoClass;
+ this.getDescriptor =
+ Suppliers.memoize(
+ (Supplier<Descriptors.Descriptor> & Serializable)
+ () -> {
+ try {
+ return (Descriptors.Descriptor)
+ Preconditions.checkStateNotNull(this.protoClass.getMethod("getDescriptor"))
+ .invoke(null);
+ } catch (IllegalAccessException
+ | InvocationTargetException
+ | NoSuchMethodException e) {
+ throw new IllegalArgumentException(e);
+ }
+ });
+ }
+
+ @Override
+ public MessageConverter<T> getMessageConverter(
+ DestinationT destination, DatasetService datasetService) throws Exception {
+ return new Converter(
+ TableRowToStorageApiProto.schemaToProtoTableSchema(
+ Preconditions.checkStateNotNull(getSchema(destination))));
+ }
+
+ class Converter implements MessageConverter<T> {
+ TableSchema tableSchema;
+
+ Converter(TableSchema tableSchema) {
+ this.tableSchema = tableSchema;
+ }
+
+ @Override
+ public TableSchema getTableSchema() {
+ return tableSchema;
+ }
+
+ @Override
+ public Descriptors.Descriptor getDescriptor(boolean includeCdcColumns) throws Exception {
+ org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(
+ !includeCdcColumns);
+ return getDescriptor.get();
+ }
+
+ @Override
+ @SuppressWarnings("nullness")
+ public StorageApiWritePayload toMessage(
+ T element, @Nullable RowMutationInformation rowMutationInformation) throws Exception {
+ // NB: What makes this path efficient is that the storage API directly understands protos, so
Review Comment:
Remove suppression and assert that `rowMutationInformation` is null?
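A minimal sketch of such an assertion, assuming the proto path rejects any non-null mutation information. `RowMutationGuard` and `checkNoRowMutation` are hypothetical names, and the parameter is typed as `Object` only so the example compiles without Beam's `RowMutationInformation`.

```java
/** Hypothetical guard (not Beam code) that fails fast on unsupported row mutations. */
final class RowMutationGuard {
  private RowMutationGuard() {}

  /** Throws if mutation information was supplied; a null argument passes silently. */
  static void checkNoRowMutation(Object rowMutationInformation) {
    if (rowMutationInformation != null) {
      throw new IllegalArgumentException(
          "Row mutations are not supported yet when writing protocol buffers");
    }
  }
}
```

An explicit message like this would also document the limitation at the call site.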
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsProto.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+import org.checkerframework.checker.nullness.qual.NonNull;
+
+/** Storage API DynamicDestinations used when the input is a Beam Row. */
Review Comment:
```suggestion
/** Storage API DynamicDestinations used when the input is a protocol buffer. */
```
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java:
##########
@@ -63,111 +63,293 @@
/** Unit tests for {@link org.apache.beam.sdk.io.gcp.bigquery.TableRowToStorageApiProto}. */
public class TableRowToStorageApiProtoTest {
// Schemas we test.
- // The TableRow class has special semantics for fields named "f". To ensure we handel them
+ // The TableRow class has special semantics for fields named "f". To ensure we handle them
// properly, we test schemas
// both with and without a field named "f".
private static final TableSchema BASE_TABLE_SCHEMA =
new TableSchema()
.setFields(
ImmutableList.<TableFieldSchema>builder()
- .add(new TableFieldSchema().setType("STRING").setName("stringValue"))
- .add(new TableFieldSchema().setType("STRING").setName("f"))
- .add(new TableFieldSchema().setType("BYTES").setName("bytesValue"))
- .add(new TableFieldSchema().setType("INT64").setName("int64Value"))
- .add(new TableFieldSchema().setType("INTEGER").setName("intValue"))
- .add(new TableFieldSchema().setType("FLOAT64").setName("float64Value"))
- .add(new TableFieldSchema().setType("FLOAT").setName("floatValue"))
- .add(new TableFieldSchema().setType("BOOL").setName("boolValue"))
- .add(new TableFieldSchema().setType("BOOLEAN").setName("booleanValue"))
- .add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampValue"))
- .add(new TableFieldSchema().setType("TIME").setName("timeValue"))
- .add(new TableFieldSchema().setType("DATETIME").setName("datetimeValue"))
- .add(new TableFieldSchema().setType("DATE").setName("dateValue"))
- .add(new TableFieldSchema().setType("NUMERIC").setName("numericValue"))
- .add(new TableFieldSchema().setType("BIGNUMERIC").setName("bigNumericValue"))
- .add(new TableFieldSchema().setType("NUMERIC").setName("numericValue2"))
- .add(new TableFieldSchema().setType("BIGNUMERIC").setName("bigNumericValue2"))
+ .add(
Review Comment:
Should we keep some of these without a mode, in order to test default
behavior?
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java:
##########
@@ -353,7 +353,7 @@ private static Object convertRequiredField(
case "BOOL":
case "BOOLEAN":
verify(v instanceof Boolean, "Expected Boolean, got %s", v.getClass());
- return v;
+ return v.toString();
Review Comment:
Are there any advantages to using the string representation for these values?
Just curious, for future reference.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java:
##########
@@ -709,7 +694,7 @@ static TableFieldSchema tableFieldSchemaFromDescriptorField(
if (fieldDescriptor.isRequired()) {
tableFieldSchemaBuilder =
tableFieldSchemaBuilder.setMode(TableFieldSchema.Mode.REQUIRED);
}
- if (fieldDescriptor.isOptional()) {
+ if (fieldDescriptor.isOptional() || !fieldDescriptor.isRequired()) {
tableFieldSchemaBuilder =
tableFieldSchemaBuilder.setMode(TableFieldSchema.Mode.NULLABLE);
}
Review Comment:
Wouldn't this overwrite a REPEATED mode to NULLABLE? Maybe we don't need
this extra check, according to
[docs](https://cloud.google.com/java/docs/reference/google-cloud-bigquerystorage/latest/com.google.cloud.bigquery.storage.v1beta2.TableFieldSchema.Builder#com_google_cloud_bigquery_storage_v1beta2_TableFieldSchema_Builder_setMode_com_google_cloud_bigquery_storage_v1beta2_TableFieldSchema_Mode_),
the mode already defaults to nullable.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java:
##########
@@ -709,7 +694,7 @@ static TableFieldSchema tableFieldSchemaFromDescriptorField(
if (fieldDescriptor.isRequired()) {
tableFieldSchemaBuilder =
tableFieldSchemaBuilder.setMode(TableFieldSchema.Mode.REQUIRED);
}
- if (fieldDescriptor.isOptional()) {
+ if (fieldDescriptor.isOptional() || !fieldDescriptor.isRequired()) {
tableFieldSchemaBuilder =
tableFieldSchemaBuilder.setMode(TableFieldSchema.Mode.NULLABLE);
}
Review Comment:
Alternatively, we could keep this check but turn these into else-if
conditions
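To make the concern concrete, here is a small sketch contrasting independent `if` checks with an else-if chain. The `Label` and `Mode` enums are stand-ins for protobuf's field labels and BigQuery's `TableFieldSchema.Mode`, and it is an assumption that REPEATED is assigned before the two checks shown in the diff.

```java
/** Illustrative only: stand-in enums replace the protobuf label and BigQuery mode types. */
final class FieldModeSketch {
  enum Label { OPTIONAL, REQUIRED, REPEATED }

  enum Mode { NULLABLE, REQUIRED, REPEATED }

  private FieldModeSketch() {}

  /** Independent checks: every matching branch runs, so the last one wins. */
  static Mode independentChecks(Label label) {
    Mode mode = Mode.NULLABLE;
    // Assumption: the repeated case is handled before the checks under review.
    if (label == Label.REPEATED) {
      mode = Mode.REPEATED;
    }
    if (label == Label.REQUIRED) {
      mode = Mode.REQUIRED;
    }
    if (label == Label.OPTIONAL || label != Label.REQUIRED) {
      // This condition is also true for REPEATED, so it overwrites the earlier assignment.
      mode = Mode.NULLABLE;
    }
    return mode;
  }

  /** Else-if chain: exactly one branch runs, so REPEATED is preserved. */
  static Mode elseIfChain(Label label) {
    if (label == Label.REPEATED) {
      return Mode.REPEATED;
    } else if (label == Label.REQUIRED) {
      return Mode.REQUIRED;
    } else {
      return Mode.NULLABLE;
    }
  }
}
```

Under these assumptions, a repeated field comes out as NULLABLE with the independent checks and as REPEATED with the chain.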