This is an automated email from the ASF dual-hosted git repository.
reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ba04f6098f7 Merge pull request #27866: Allow writing protos directly
to the storage API without conversion
ba04f6098f7 is described below
commit ba04f6098f7472a123c6cd7b8d4a991ce16d1958
Author: Reuven Lax <[email protected]>
AuthorDate: Thu Aug 10 09:15:58 2023 -0700
Merge pull request #27866: Allow writing protos directly to the storage API
without conversion
---
.../beam/sdk/io/gcp/bigquery/AppendClientInfo.java | 32 +-
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 96 +++++
.../beam/sdk/io/gcp/bigquery/BigQueryServices.java | 5 +-
.../sdk/io/gcp/bigquery/BigQueryServicesImpl.java | 8 +-
.../bigquery/StorageApiDynamicDestinations.java | 3 +
.../StorageApiDynamicDestinationsBeamRow.java | 6 +
...StorageApiDynamicDestinationsGenericRecord.java | 7 +
.../StorageApiDynamicDestinationsProto.java | 100 +++++
.../StorageApiDynamicDestinationsTableRow.java | 7 +
.../bigquery/StorageApiWriteUnshardedRecords.java | 66 +++-
.../bigquery/StorageApiWritesShardedRecords.java | 29 +-
.../io/gcp/bigquery/TableRowToStorageApiProto.java | 83 +++-
.../sdk/io/gcp/testing/FakeDatasetService.java | 7 +-
.../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java | 111 +++++-
.../bigquery/StorageApiDirectWriteProtosIT.java | 197 ++++++++++
.../bigquery/TableRowToStorageApiProtoTest.java | 418 ++++++++++++++++++++-
16 files changed, 1109 insertions(+), 66 deletions(-)
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
index b8a7fc760bc..46c25d47e7a 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
@@ -22,9 +22,9 @@ import com.google.auto.value.AutoValue;
import com.google.auto.value.extension.memoized.Memoized;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.protobuf.ByteString;
+import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
-import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import java.util.function.Consumer;
import java.util.function.Supplier;
@@ -49,7 +49,7 @@ abstract class AppendClientInfo {
abstract @Nullable String getStreamName();
- abstract Descriptors.Descriptor getDescriptor();
+ abstract DescriptorProtos.DescriptorProto getDescriptor();
@AutoValue.Builder
abstract static class Builder {
@@ -63,7 +63,7 @@ abstract class AppendClientInfo {
abstract Builder
setSchemaInformation(TableRowToStorageApiProto.SchemaInformation value);
- abstract Builder setDescriptor(Descriptors.Descriptor value);
+ abstract Builder setDescriptor(DescriptorProtos.DescriptorProto value);
abstract Builder setStreamName(@Nullable String name);
@@ -74,8 +74,8 @@ abstract class AppendClientInfo {
static AppendClientInfo of(
TableSchema tableSchema,
- Consumer<BigQueryServices.StreamAppendClient> closeAppendClient,
- boolean includeCdcColumns)
+ DescriptorProtos.DescriptorProto descriptor,
+ Consumer<BigQueryServices.StreamAppendClient> closeAppendClient)
throws Exception {
return new AutoValue_AppendClientInfo.Builder()
.setTableSchema(tableSchema)
@@ -83,12 +83,22 @@ abstract class AppendClientInfo {
.setJsonTableSchema(TableRowToStorageApiProto.protoSchemaToTableSchema(tableSchema))
.setSchemaInformation(
TableRowToStorageApiProto.SchemaInformation.fromTableSchema(tableSchema))
- .setDescriptor(
- TableRowToStorageApiProto.getDescriptorFromTableSchema(
- tableSchema, true, includeCdcColumns))
+ .setDescriptor(descriptor)
.build();
}
+ static AppendClientInfo of(
+ TableSchema tableSchema,
+ Consumer<BigQueryServices.StreamAppendClient> closeAppendClient,
+ boolean includeCdcColumns)
+ throws Exception {
+ return of(
+ tableSchema,
+ TableRowToStorageApiProto.descriptorSchemaFromTableSchema(
+ tableSchema, true, includeCdcColumns),
+ closeAppendClient);
+ }
+
public AppendClientInfo withNoAppendClient() {
return toBuilder().setStreamAppendClient(null).build();
}
@@ -149,8 +159,10 @@ abstract class AppendClientInfo {
public TableRow toTableRow(ByteString protoBytes) {
try {
return TableRowToStorageApiProto.tableRowFromMessage(
- DynamicMessage.parseFrom(getDescriptor(), protoBytes), true);
- } catch (InvalidProtocolBufferException e) {
+ DynamicMessage.parseFrom(
+ TableRowToStorageApiProto.wrapDescriptorProto(getDescriptor()),
protoBytes),
+ true);
+ } catch (Exception e) {
throw new RuntimeException(e);
}
}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 57f77cbcd2f..fd445bcfc39 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -44,8 +44,12 @@ import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.Message;
import java.io.IOException;
import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -2131,6 +2135,7 @@ public class BigQueryIO {
.setDeterministicRecordIdFn(null)
.setMaxRetryJobs(1000)
.setPropagateSuccessfulStorageApiWrites(false)
+ .setDirectWriteProtos(true)
.build();
}
@@ -2170,6 +2175,27 @@ public class BigQueryIO {
.withAvroFormatFunction(GENERIC_RECORD_IDENTITY_FORMATTER);
}
+ /**
+ * A {@link PTransform} that writes a {@link PCollection} containing
protocol buffer objects to a
+ * BigQuery table. If using one of the storage-api write methods, these
protocol buffers must
+ * match the schema of the table.
+ *
+ * <p>If a Schema is provided using {@link Write#withSchema}, that schema
will be used for
+ * creating the table if necessary. If no schema is provided, one will be
inferred from the
+ * protocol buffer's descriptor. Note that inferring a schema from the
protocol buffer may not
+ * always provide the intended schema as multiple BigQuery types can map to
the same protocol
+ * buffer type. For example, a protocol buffer field of type INT64 may be an
INT64 BigQuery type,
+ * but it might also represent a TIME, DATETIME, or a TIMESTAMP type.
+ */
+ public static <T extends Message> Write<T> writeProtos(Class<T>
protoMessageClass) {
+ if (DynamicMessage.class.equals(protoMessageClass)) {
+ throw new IllegalArgumentException("DynamicMessage is not supported.");
+ }
+ return BigQueryIO.<T>write()
+ .withFormatFunction(m ->
TableRowToStorageApiProto.tableRowFromMessage(m, false))
+ .withWriteProtosClass(protoMessageClass);
+ }
+
/** Implementation of {@link #write}. */
@AutoValue
public abstract static class Write<T> extends PTransform<PCollection<T>,
WriteResult> {
@@ -2302,6 +2328,10 @@ public class BigQueryIO {
abstract Boolean getAutoSchemaUpdate();
+ abstract @Nullable Class<T> getWriteProtosClass();
+
+ abstract Boolean getDirectWriteProtos();
+
abstract @Nullable SerializableFunction<T, String>
getDeterministicRecordIdFn();
abstract @Nullable String getWriteTempDataset();
@@ -2400,6 +2430,10 @@ public class BigQueryIO {
abstract Builder<T> setAutoSchemaUpdate(Boolean autoSchemaUpdate);
+ abstract Builder<T> setWriteProtosClass(@Nullable Class<T> clazz);
+
+ abstract Builder<T> setDirectWriteProtos(Boolean direct);
+
abstract Builder<T> setDeterministicRecordIdFn(
SerializableFunction<T, String> toUniqueIdFunction);
@@ -2780,6 +2814,20 @@ public class BigQueryIO {
return toBuilder().setRowMutationInformationFn(updateFn).build();
}
+ Write<T> withWriteProtosClass(Class<T> clazz) {
+ return toBuilder().setWriteProtosClass(clazz).build();
+ }
+
+ /*
+ When using {@link Write.Method#STORAGE_API} or {@link
Write.Method#STORAGE_API_AT_LEAST_ONCE} along with
+ {@link BigQueryIO.writeProtos}, the sink will try to write the protos
directly to BigQuery without modification.
+ In some cases this is not supported or BigQuery cannot directly interpret
the proto. In these cases, the direct
+ proto write can be disabled by calling withDirectWriteProtos(false).
+ */
+ public Write<T> withDirectWriteProtos(boolean directWriteProtos) {
+ return toBuilder().setDirectWriteProtos(directWriteProtos).build();
+ }
+
/**
* Set the project the BigQuery load job will be initiated from. This is
only applicable when
* the write method is set to {@link Method#FILE_LOADS}. If omitted, the
project of the
@@ -3281,6 +3329,7 @@ public class BigQueryIO {
|| getDynamicDestinations() != null
|| getSchemaFromView() != null;
+ Class<T> writeProtoClass = getWriteProtosClass();
if (getUseBeamSchema()) {
checkArgument(input.hasSchema(), "The input doesn't has a schema");
optimizeWrites = true;
@@ -3296,11 +3345,34 @@ public class BigQueryIO {
formatFunction = BigQueryUtils.toTableRow(input.getToRowFunction());
}
// Infer the TableSchema from the input Beam schema.
+ // TODO: If the user provided a schema, we should use that. There are
things that can be
+ // specified in a
+ // BQ schema that don't have exact matches in a Beam schema (e.g.
GEOGRAPHY types).
TableSchema tableSchema =
BigQueryUtils.toTableSchema(input.getSchema());
dynamicDestinations =
new ConstantSchemaDestinations<>(
dynamicDestinations,
StaticValueProvider.of(BigQueryHelpers.toJsonString(tableSchema)));
+ } else if (writeProtoClass != null) {
+ if (!hasSchema) {
+ try {
+ @SuppressWarnings({"unchecked", "nullness"})
+ Descriptors.Descriptor descriptor =
+ (Descriptors.Descriptor)
+ org.apache.beam.sdk.util.Preconditions.checkStateNotNull(
+ writeProtoClass.getMethod("getDescriptor"))
+ .invoke(null);
+ TableSchema tableSchema =
+ TableRowToStorageApiProto.protoSchemaToTableSchema(
+
TableRowToStorageApiProto.tableSchemaFromDescriptor(descriptor));
+ dynamicDestinations =
+ new ConstantSchemaDestinations<>(
+ dynamicDestinations,
+
StaticValueProvider.of(BigQueryHelpers.toJsonString(tableSchema)));
+ } catch (IllegalAccessException | InvocationTargetException |
NoSuchMethodException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
} else {
// Require a schema if creating one or more tables.
checkArgument(
@@ -3379,6 +3451,7 @@ public class BigQueryIO {
method);
}
+ @SuppressWarnings("rawtypes")
private <DestinationT> WriteResult continueExpandTyped(
PCollection<KV<DestinationT, T>> input,
Coder<T> elementCoder,
@@ -3501,6 +3574,28 @@ public class BigQueryIO {
elementSchema,
elementToRowFunction,
getRowMutationInformationFn() != null);
+ } else if (getWriteProtosClass() != null && getDirectWriteProtos()) {
+ // We could support both of these by falling back to
+ // StorageApiDynamicDestinationsTableRow. This
+ // would defeat the optimization (we would be forced to create a new
dynamic proto message
+ // and copy the data over). For now, we simply give the user a way
to disable the
+ // optimization themselves.
+ checkArgument(
+ getRowMutationInformationFn() == null,
+ "Row upserts and deletes are not for direct proto writes. "
+ + "Try setting withDirectWriteProtos(false)");
+ checkArgument(
+ !getAutoSchemaUpdate(),
+ "withAutoSchemaUpdate not supported when using writeProtos."
+ + " Try setting withDirectWriteProtos(false)");
+ checkArgument(
+ !getIgnoreUnknownValues(),
+ "ignoreUnknownValues not supported when using writeProtos."
+ + " Try setting withDirectWriteProtos(false)");
+ storageApiDynamicDestinations =
+ (StorageApiDynamicDestinations<T, DestinationT>)
+ new StorageApiDynamicDestinationsProto(
+ dynamicDestinations, getWriteProtosClass());
} else if (getAvroRowWriterFactory() != null) {
// we can configure the avro to storage write api proto converter
for this
// assuming the format function returns an Avro GenericRecord
@@ -3537,6 +3632,7 @@ public class BigQueryIO {
getIgnoreUnknownValues(),
getAutoSchemaUpdate());
}
+
int numShards = getStorageApiNumStreams(bqOptions);
boolean enableAutoSharding = getAutoSharding();
if (numShards == 0) {
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
index 6f178bd6150..1cc9049a542 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
@@ -43,7 +43,7 @@ import
com.google.cloud.bigquery.storage.v1.SplitReadStreamRequest;
import com.google.cloud.bigquery.storage.v1.SplitReadStreamResponse;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.cloud.bigquery.storage.v1.WriteStream;
-import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.DescriptorProtos;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
@@ -213,7 +213,8 @@ public interface BigQueryServices extends Serializable {
* first.
*/
StreamAppendClient getStreamAppendClient(
- String streamName, Descriptor descriptor, boolean useConnectionPool)
throws Exception;
+ String streamName, DescriptorProtos.DescriptorProto descriptor,
boolean useConnectionPool)
+ throws Exception;
/** Flush a given stream up to the given offset. The stream must have type
BUFFERED. */
ApiFuture<FlushRowsResponse> flush(String streamName, long flushOffset)
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index dd423294f6c..17b5c5ebd99 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -86,7 +86,7 @@ import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
-import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Int64Value;
import com.google.rpc.RetryInfo;
import io.grpc.Metadata;
@@ -1352,9 +1352,9 @@ class BigQueryServicesImpl implements BigQueryServices {
@Override
public StreamAppendClient getStreamAppendClient(
- String streamName, Descriptor descriptor, boolean useConnectionPool)
throws Exception {
- ProtoSchema protoSchema =
-
ProtoSchema.newBuilder().setProtoDescriptor(descriptor.toProto()).build();
+ String streamName, DescriptorProtos.DescriptorProto descriptor,
boolean useConnectionPool)
+ throws Exception {
+ ProtoSchema protoSchema =
ProtoSchema.newBuilder().setProtoDescriptor(descriptor).build();
TransportChannelProvider transportChannelProvider =
BigQueryWriteSettings.defaultGrpcTransportProviderBuilder()
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinations.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinations.java
index b3085b15e95..fdf330d378f 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinations.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinations.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.gcp.bigquery;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
+import com.google.protobuf.DescriptorProtos;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
@@ -33,6 +34,8 @@ abstract class StorageApiDynamicDestinations<T, DestinationT>
public interface MessageConverter<T> {
com.google.cloud.bigquery.storage.v1.TableSchema getTableSchema();
+ DescriptorProtos.DescriptorProto getDescriptor(boolean includeCdcColumns)
throws Exception;
+
StorageApiWritePayload toMessage(
T element, @Nullable RowMutationInformation rowMutationInformation)
throws Exception;
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsBeamRow.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsBeamRow.java
index e0964135be3..7821a7d9199 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsBeamRow.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsBeamRow.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.gcp.bigquery;
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.bigquery.storage.v1.TableSchema;
+import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Message;
import javax.annotation.Nullable;
@@ -75,6 +76,11 @@ class StorageApiDynamicDestinationsBeamRow<T, DestinationT
extends @NonNull Obje
return tableSchema;
}
+ @Override
+ public DescriptorProtos.DescriptorProto getDescriptor(boolean
includeCdcColumns) {
+ return cdcDescriptor != null ? cdcDescriptor.toProto() :
descriptor.toProto();
+ }
+
@Override
@SuppressWarnings("nullness")
public StorageApiWritePayload toMessage(
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsGenericRecord.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsGenericRecord.java
index 53f3b137bf3..79141d73a39 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsGenericRecord.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsGenericRecord.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.gcp.bigquery;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
+import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Message;
import org.apache.avro.Schema;
@@ -109,5 +110,11 @@ class StorageApiDynamicDestinationsGenericRecord<T,
DestinationT extends @NonNul
public com.google.cloud.bigquery.storage.v1.TableSchema getTableSchema() {
return protoTableSchema;
}
+
+ @Override
+ public DescriptorProtos.DescriptorProto getDescriptor(boolean
includeCdcColumns)
+ throws Exception {
+ return cdcDescriptor != null ? cdcDescriptor.toProto() :
descriptor.toProto();
+ }
}
}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsProto.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsProto.java
new file mode 100644
index 00000000000..57dbdc9d1e7
--- /dev/null
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsProto.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.cloud.bigquery.storage.v1.ProtoSchemaConverter;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
+import com.google.protobuf.DescriptorProtos;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import java.lang.reflect.InvocationTargetException;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
+import org.apache.beam.sdk.util.Preconditions;
+import org.checkerframework.checker.nullness.qual.NonNull;
+
+/** Storage API DynamicDestinations used when the input is a compiled protocol
buffer. */
+class StorageApiDynamicDestinationsProto<T extends Message, DestinationT
extends @NonNull Object>
+ extends StorageApiDynamicDestinations<T, DestinationT> {
+ DescriptorProtos.DescriptorProto descriptorProto;
+
+ @SuppressWarnings({"unchecked", "nullness"})
+ StorageApiDynamicDestinationsProto(
+ DynamicDestinations<T, DestinationT> inner, Class<T> protoClass) {
+ super(inner);
+ try {
+ this.descriptorProto =
+ fixNestedTypes(
+ (Descriptors.Descriptor)
+
Preconditions.checkStateNotNull(protoClass.getMethod("getDescriptor"))
+ .invoke(null));
+ } catch (IllegalAccessException | InvocationTargetException |
NoSuchMethodException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ @Override
+ public MessageConverter<T> getMessageConverter(
+ DestinationT destination, DatasetService datasetService) throws
Exception {
+ return new Converter(
+ TableRowToStorageApiProto.schemaToProtoTableSchema(
+ Preconditions.checkStateNotNull(getSchema(destination))));
+ }
+
+ class Converter implements MessageConverter<T> {
+ TableSchema tableSchema;
+
+ Converter(TableSchema tableSchema) {
+ this.tableSchema = tableSchema;
+ }
+
+ @Override
+ public TableSchema getTableSchema() {
+ return tableSchema;
+ }
+
+ @Override
+ public DescriptorProtos.DescriptorProto getDescriptor(boolean
includeCdcColumns)
+ throws Exception {
+
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument(
+ !includeCdcColumns);
+ return descriptorProto;
+ }
+
+ @Override
+ public StorageApiWritePayload toMessage(
+ T element, @Nullable RowMutationInformation rowMutationInformation)
throws Exception {
+ // NB: What makes this path efficient is that the storage API directly
understands protos, so
+ // we can forward
+ // them through directly. This means that we don't currently support
ignoreUnknownValues or
+ // autoUpdateSchema.
+ return StorageApiWritePayload.of(element.toByteArray(), null);
+ }
+
+ @Override
+ public TableRow toTableRow(T element) {
+ throw new RuntimeException("Not implemented!");
+ }
+ };
+
+ private static DescriptorProtos.DescriptorProto fixNestedTypes(
+ Descriptors.Descriptor descriptor) {
+ return ProtoSchemaConverter.convert(descriptor).getProtoDescriptor();
+ }
+}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
index 19569fe068a..eb93d7c398f 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.gcp.bigquery;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
+import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Message;
import java.util.concurrent.ExecutionException;
@@ -143,6 +144,12 @@ public class StorageApiDynamicDestinationsTableRow<T,
DestinationT extends @NonN
return protoTableSchema;
}
+ @Override
+ public DescriptorProtos.DescriptorProto getDescriptor(boolean
includeCdcColumns)
+ throws Exception {
+ return cdcDescriptor != null ? cdcDescriptor.toProto() :
descriptor.toProto();
+ }
+
@Override
public TableRow toTableRow(T element) {
return formatFunction.apply(element);
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index 5c6584fa32c..27a5b30c156 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -30,8 +30,8 @@ import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.cloud.bigquery.storage.v1.WriteStream.Type;
import com.google.protobuf.ByteString;
+import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.DynamicMessage;
-import com.google.protobuf.InvalidProtocolBufferException;
import io.grpc.Status;
import java.io.IOException;
import java.time.Instant;
@@ -46,6 +46,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@@ -278,6 +279,7 @@ public class StorageApiWriteUnshardedRecords<DestinationT,
ElementT>
private final boolean useDefaultStream;
private TableSchema initialTableSchema;
+ private DescriptorProtos.DescriptorProto initialDescriptor;
private Instant nextCacheTickle = Instant.MAX;
private final int clientNumber;
private final boolean usingMultiplexing;
@@ -302,6 +304,7 @@ public class StorageApiWriteUnshardedRecords<DestinationT,
ElementT>
this.maybeDatasetService = datasetService;
this.useDefaultStream = useDefaultStream;
this.initialTableSchema = messageConverter.getTableSchema();
+ this.initialDescriptor =
messageConverter.getDescriptor(includeCdcColumns);
this.clientNumber = new Random().nextInt(streamAppendClientCount);
this.usingMultiplexing = usingMultiplexing;
this.maxRequestSize = maxRequestSize;
@@ -359,12 +362,13 @@ public class
StorageApiWriteUnshardedRecords<DestinationT, ElementT>
}
AppendClientInfo generateClient(@Nullable TableSchema updatedSchema)
throws Exception {
- TableSchema tableSchema =
- (updatedSchema != null) ? updatedSchema :
getCurrentTableSchema(streamName);
+ SchemaAndDescriptor schemaAndDescriptor =
getCurrentTableSchema(streamName, updatedSchema);
+
AtomicReference<AppendClientInfo> appendClientInfo =
new AtomicReference<>(
AppendClientInfo.of(
- tableSchema,
+ schemaAndDescriptor.tableSchema,
+ schemaAndDescriptor.descriptor,
// Make sure that the client is always closed in a
different thread to avoid
// blocking.
client ->
@@ -376,8 +380,7 @@ public class StorageApiWriteUnshardedRecords<DestinationT,
ElementT>
client.unpin();
client.close();
}
- }),
- includeCdcColumns));
+ })));
CreateTableHelpers.createTableWrapper(
() -> {
@@ -398,8 +401,28 @@ public class StorageApiWriteUnshardedRecords<DestinationT,
ElementT>
return appendClientInfo.get();
}
- TableSchema getCurrentTableSchema(String stream) throws Exception {
+ private class SchemaAndDescriptor {
+ private final TableSchema tableSchema;
+ private final DescriptorProtos.DescriptorProto descriptor;
+
+ private SchemaAndDescriptor(
+ TableSchema tableSchema, DescriptorProtos.DescriptorProto
descriptor) {
+ this.tableSchema = tableSchema;
+ this.descriptor = descriptor;
+ }
+ }
+
+ SchemaAndDescriptor getCurrentTableSchema(String stream, @Nullable
TableSchema updatedSchema)
+ throws Exception {
+ if (updatedSchema != null) {
+ return new SchemaAndDescriptor(
+ updatedSchema,
+ TableRowToStorageApiProto.descriptorSchemaFromTableSchema(
+ updatedSchema, true, includeCdcColumns));
+ }
+
AtomicReference<TableSchema> currentSchema = new
AtomicReference<>(initialTableSchema);
+ AtomicBoolean updated = new AtomicBoolean();
CreateTableHelpers.createTableWrapper(
() -> {
if (autoUpdateSchema) {
@@ -408,12 +431,23 @@ public class
StorageApiWriteUnshardedRecords<DestinationT, ElementT>
Preconditions.checkStateNotNull(maybeDatasetService).getWriteStream(streamName);
if (writeStream != null && writeStream.hasTableSchema()) {
currentSchema.set(writeStream.getTableSchema());
+ updated.set(true);
}
}
return null;
},
tryCreateTable);
- return currentSchema.get();
+ // Note: While it may appear that these two branches are the same,
it's important to return
+ // the actual
+ // initial descriptor if the schema has not changed. Simply converting
the schema back into
+ // a descriptor isn't
+ // the same, and would break the direct-from-proto ingestion path.
+ DescriptorProtos.DescriptorProto descriptor =
+ updated.get()
+ ? TableRowToStorageApiProto.descriptorSchemaFromTableSchema(
+ currentSchema.get(), true, includeCdcColumns)
+ : initialDescriptor;
+ return new SchemaAndDescriptor(currentSchema.get(), descriptor);
}
AppendClientInfo getAppendClientInfo(
@@ -555,7 +589,9 @@ public class StorageApiWriteUnshardedRecords<DestinationT,
ElementT>
TableRow failedRow =
TableRowToStorageApiProto.tableRowFromMessage(
DynamicMessage.parseFrom(
- getAppendClientInfo(true, null).getDescriptor(),
rowBytes),
+ TableRowToStorageApiProto.wrapDescriptorProto(
+ getAppendClientInfo(true, null).getDescriptor()),
+ rowBytes),
true);
failedRowsReceiver.outputWithTimestamp(
new BigQueryStorageApiInsertError(
@@ -618,14 +654,16 @@ public class
StorageApiWriteUnshardedRecords<DestinationT, ElementT>
TableRow failedRow =
TableRowToStorageApiProto.tableRowFromMessage(
DynamicMessage.parseFrom(
-
Preconditions.checkStateNotNull(appendClientInfo).getDescriptor(),
+ TableRowToStorageApiProto.wrapDescriptorProto(
+
Preconditions.checkStateNotNull(appendClientInfo)
+ .getDescriptor()),
protoBytes),
true);
failedRowsReceiver.outputWithTimestamp(
new BigQueryStorageApiInsertError(
failedRow,
error.getRowIndexToErrorMessage().get(failedIndex)),
timestamp);
- } catch (InvalidProtocolBufferException e) {
+ } catch (Exception e) {
LOG.error("Failed to insert row and could not parse the
result!", e);
}
}
@@ -716,12 +754,14 @@ public class
StorageApiWriteUnshardedRecords<DestinationT, ElementT>
TableRow row =
TableRowToStorageApiProto.tableRowFromMessage(
DynamicMessage.parseFrom(
-
Preconditions.checkStateNotNull(appendClientInfo).getDescriptor(),
+ TableRowToStorageApiProto.wrapDescriptorProto(
+
Preconditions.checkStateNotNull(appendClientInfo)
+ .getDescriptor()),
rowBytes),
true);
org.joda.time.Instant timestamp = c.timestamps.get(i);
successfulRowsReceiver.outputWithTimestamp(row, timestamp);
- } catch (InvalidProtocolBufferException e) {
+ } catch (Exception e) {
LOG.warn("Failure parsing TableRow", e);
}
}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
index d5bd27f5efe..cc7f221e32e 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
@@ -29,6 +29,7 @@ import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.cloud.bigquery.storage.v1.WriteStream.Type;
import com.google.protobuf.ByteString;
+import com.google.protobuf.DescriptorProtos;
import io.grpc.Status;
import io.grpc.Status.Code;
import java.io.IOException;
@@ -458,21 +459,28 @@ public class StorageApiWritesShardedRecords<DestinationT
extends @NonNull Object
Callable<AppendClientInfo> getAppendClientInfo =
() -> {
@Nullable TableSchema tableSchema;
- if (autoUpdateSchema && updatedSchema.read() != null) {
- // We've seen an updated schema, so we use that.
- tableSchema = updatedSchema.read();
+ DescriptorProtos.DescriptorProto descriptor;
+ TableSchema updatedSchemaValue = updatedSchema.read();
+ if (autoUpdateSchema && updatedSchemaValue != null) {
+ // We've seen an updated schema, so we use that instead of
querying the
+ // MessageConverter.
+ tableSchema = updatedSchemaValue;
+ descriptor =
+ TableRowToStorageApiProto.descriptorSchemaFromTableSchema(
+ tableSchema, true, false);
} else {
// Start off with the base schema. As we get notified of schema
updates, we
- // will update the
- // descriptor.
- tableSchema =
- messageConverters
- .get(element.getKey().getKey(), dynamicDestinations,
datasetService)
- .getTableSchema();
+ // will update the descriptor.
+ StorageApiDynamicDestinations.MessageConverter<?> converter =
+ messageConverters.get(
+ element.getKey().getKey(), dynamicDestinations,
datasetService);
+ tableSchema = converter.getTableSchema();
+ descriptor = converter.getDescriptor(false);
}
AppendClientInfo info =
AppendClientInfo.of(
Preconditions.checkStateNotNull(tableSchema),
+ descriptor,
// Make sure that the client is always closed in a
different thread
// to
// avoid blocking.
@@ -483,8 +491,7 @@ public class StorageApiWritesShardedRecords<DestinationT
extends @NonNull Object
// Remove the pin that is "owned" by the
cache.
client.unpin();
client.close();
- }),
- false)
+ }))
.withAppendClient(datasetService, getOrCreateStream,
false);
// This pin is "owned" by the cache.
Preconditions.checkStateNotNull(info.getStreamAppendClient()).pin();
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
index 3a20bc764d5..c31886da614 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
@@ -30,6 +30,7 @@ import
com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Label;
import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Type;
import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
+import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import com.google.protobuf.Descriptors.FieldDescriptor;
@@ -216,7 +217,7 @@ public class TableRowToStorageApiProto {
return Optional.ofNullable(mode)
.map(Mode::valueOf)
.map(m -> MODE_MAP_JSON_PROTO.get(m))
- .orElse(TableFieldSchema.Mode.REQUIRED);
+ .orElse(TableFieldSchema.Mode.NULLABLE);
}
public static String protoModeToJsonMode(TableFieldSchema.Mode protoMode) {
@@ -302,16 +303,14 @@ public class TableRowToStorageApiProto {
public static TableFieldSchema tableFieldToProtoTableField(
com.google.api.services.bigquery.model.TableFieldSchema field) {
TableFieldSchema.Builder builder = TableFieldSchema.newBuilder();
- builder.setName(field.getName());
+ builder.setName(field.getName().toLowerCase());
if (field.getDescription() != null) {
builder.setDescription(field.getDescription());
}
if (field.getMaxLength() != null) {
builder.setMaxLength(field.getMaxLength());
}
- if (field.getMode() != null) {
- builder.setMode(modeToProtoMode(field.getMode()));
- }
+ builder.setMode(modeToProtoMode(field.getMode()));
if (field.getPrecision() != null) {
builder.setPrecision(field.getPrecision());
}
@@ -395,7 +394,7 @@ public class TableRowToStorageApiProto {
}
}
- static final Map<TableFieldSchema.Type, Type> PRIMITIVE_TYPES =
+ static final Map<TableFieldSchema.Type, Type> PRIMITIVE_TYPES_BQ_TO_PROTO =
ImmutableMap.<TableFieldSchema.Type, Type>builder()
.put(TableFieldSchema.Type.INT64, Type.TYPE_INT64)
.put(TableFieldSchema.Type.DOUBLE, Type.TYPE_DOUBLE)
@@ -412,6 +411,26 @@ public class TableRowToStorageApiProto {
.put(TableFieldSchema.Type.JSON, Type.TYPE_STRING)
.build();
+ static final Map<Descriptors.FieldDescriptor.Type, TableFieldSchema.Type>
+ PRIMITIVE_TYPES_PROTO_TO_BQ =
+ ImmutableMap.<Descriptors.FieldDescriptor.Type,
TableFieldSchema.Type>builder()
+ .put(Descriptors.FieldDescriptor.Type.INT32,
TableFieldSchema.Type.INT64)
+ .put(FieldDescriptor.Type.FIXED32, TableFieldSchema.Type.INT64)
+ .put(FieldDescriptor.Type.UINT32, TableFieldSchema.Type.INT64)
+ .put(FieldDescriptor.Type.SFIXED32, TableFieldSchema.Type.INT64)
+ .put(FieldDescriptor.Type.SINT32, TableFieldSchema.Type.INT64)
+ .put(FieldDescriptor.Type.INT64, TableFieldSchema.Type.INT64)
+ .put(FieldDescriptor.Type.FIXED64, TableFieldSchema.Type.NUMERIC)
+ .put(FieldDescriptor.Type.UINT64, TableFieldSchema.Type.NUMERIC)
+ .put(FieldDescriptor.Type.SFIXED64, TableFieldSchema.Type.INT64)
+ .put(FieldDescriptor.Type.SINT64, TableFieldSchema.Type.INT64)
+ .put(FieldDescriptor.Type.DOUBLE, TableFieldSchema.Type.DOUBLE)
+ .put(FieldDescriptor.Type.FLOAT, TableFieldSchema.Type.DOUBLE)
+ .put(FieldDescriptor.Type.STRING, TableFieldSchema.Type.STRING)
+ .put(FieldDescriptor.Type.BOOL, TableFieldSchema.Type.BOOL)
+ .put(FieldDescriptor.Type.BYTES, TableFieldSchema.Type.BYTES)
+ .build();
+
public static Descriptor getDescriptorFromTableSchema(
com.google.api.services.bigquery.model.TableSchema jsonSchema,
boolean respectRequired,
@@ -428,8 +447,12 @@ public class TableRowToStorageApiProto {
public static Descriptor getDescriptorFromTableSchema(
TableSchema tableSchema, boolean respectRequired, boolean
includeCdcColumns)
throws DescriptorValidationException {
- DescriptorProto descriptorProto =
- descriptorSchemaFromTableSchema(tableSchema, respectRequired,
includeCdcColumns);
+ return wrapDescriptorProto(
+ descriptorSchemaFromTableSchema(tableSchema, respectRequired,
includeCdcColumns));
+ }
+
+ public static Descriptor wrapDescriptorProto(DescriptorProto descriptorProto)
+ throws DescriptorValidationException {
FileDescriptorProto fileDescriptorProto =
FileDescriptorProto.newBuilder().addMessageType(descriptorProto).build();
FileDescriptor fileDescriptor =
@@ -633,6 +656,44 @@ public class TableRowToStorageApiProto {
}
}
+ static TableSchema tableSchemaFromDescriptor(Descriptor descriptor) {
+ List<TableFieldSchema> tableFields =
+ descriptor.getFields().stream()
+ .map(f -> tableFieldSchemaFromDescriptorField(f))
+ .collect(toList());
+ return TableSchema.newBuilder().addAllFields(tableFields).build();
+ }
+
+ static TableFieldSchema tableFieldSchemaFromDescriptorField(FieldDescriptor
fieldDescriptor) {
+ TableFieldSchema.Builder tableFieldSchemaBuilder =
TableFieldSchema.newBuilder();
+ tableFieldSchemaBuilder =
tableFieldSchemaBuilder.setName(fieldDescriptor.getName());
+
+ switch (fieldDescriptor.getType()) {
+ case MESSAGE:
+ tableFieldSchemaBuilder =
tableFieldSchemaBuilder.setType(TableFieldSchema.Type.STRUCT);
+ TableSchema nestedTableField =
tableSchemaFromDescriptor(fieldDescriptor.getMessageType());
+ tableFieldSchemaBuilder =
+
tableFieldSchemaBuilder.addAllFields(nestedTableField.getFieldsList());
+ break;
+ default:
+ TableFieldSchema.Type type =
PRIMITIVE_TYPES_PROTO_TO_BQ.get(fieldDescriptor.getType());
+ if (type == null) {
+ throw new UnsupportedOperationException(
+ "proto type " + fieldDescriptor.getType() + " is unsupported.");
+ }
+ tableFieldSchemaBuilder = tableFieldSchemaBuilder.setType(type);
+ }
+
+ if (fieldDescriptor.isRepeated()) {
+ tableFieldSchemaBuilder =
tableFieldSchemaBuilder.setMode(TableFieldSchema.Mode.REPEATED);
+ } else if (fieldDescriptor.isRequired()) {
+ tableFieldSchemaBuilder =
tableFieldSchemaBuilder.setMode(TableFieldSchema.Mode.REQUIRED);
+ } else {
+ tableFieldSchemaBuilder =
tableFieldSchemaBuilder.setMode(TableFieldSchema.Mode.NULLABLE);
+ }
+ return tableFieldSchemaBuilder.build();
+ }
+
@VisibleForTesting
static DescriptorProto descriptorSchemaFromTableSchema(
com.google.api.services.bigquery.model.TableSchema tableSchema,
@@ -700,7 +761,7 @@ public class TableRowToStorageApiProto {
fieldDescriptorBuilder.setType(Type.TYPE_MESSAGE).setTypeName(nested.getName());
break;
default:
- @Nullable Type type = PRIMITIVE_TYPES.get(fieldSchema.getType());
+ @Nullable Type type =
PRIMITIVE_TYPES_BQ_TO_PROTO.get(fieldSchema.getType());
if (type == null) {
throw new UnsupportedOperationException(
"Converting BigQuery type " + fieldSchema.getType() + " to Beam
type is unsupported");
@@ -710,9 +771,9 @@ public class TableRowToStorageApiProto {
if (fieldSchema.getMode() == TableFieldSchema.Mode.REPEATED) {
fieldDescriptorBuilder =
fieldDescriptorBuilder.setLabel(Label.LABEL_REPEATED);
- } else if (!respectRequired || fieldSchema.getMode() ==
TableFieldSchema.Mode.NULLABLE) {
+ } else if (!respectRequired || fieldSchema.getMode() !=
TableFieldSchema.Mode.REQUIRED) {
fieldDescriptorBuilder =
fieldDescriptorBuilder.setLabel(Label.LABEL_OPTIONAL);
- } else if (fieldSchema.getMode() == TableFieldSchema.Mode.REQUIRED) {
+ } else {
fieldDescriptorBuilder =
fieldDescriptorBuilder.setLabel(Label.LABEL_REQUIRED);
}
descriptorBuilder.addField(fieldDescriptorBuilder.build());
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
index 64d828899ea..347a3513d89 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
@@ -43,6 +43,7 @@ import com.google.cloud.bigquery.storage.v1.WriteStream.Type;
import com.google.errorprone.annotations.FormatMethod;
import com.google.errorprone.annotations.FormatString;
import com.google.protobuf.ByteString;
+import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.DynamicMessage;
@@ -599,7 +600,8 @@ public class FakeDatasetService implements DatasetService,
Serializable {
@Override
public StreamAppendClient getStreamAppendClient(
- String streamName, Descriptor descriptor, boolean useConnectionPool) {
+ String streamName, DescriptorProtos.DescriptorProto descriptor, boolean
useConnectionPool)
+ throws Exception {
return new StreamAppendClient() {
private Descriptor protoDescriptor;
private TableSchema currentSchema;
@@ -609,7 +611,8 @@ public class FakeDatasetService implements DatasetService,
Serializable {
private boolean usedForUpdate = false;
{
- this.protoDescriptor = descriptor;
+ this.protoDescriptor =
TableRowToStorageApiProto.wrapDescriptorProto(descriptor);
+
synchronized (FakeDatasetService.class) {
Stream stream = writeStreams.get(streamName);
if (stream == null) {
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
index a0e6dafdb38..10abb738f95 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
@@ -48,11 +48,13 @@ import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.api.services.bigquery.model.TimePartitioning;
import com.google.auto.value.AutoValue;
+import com.google.protobuf.ByteString;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
@@ -88,6 +90,7 @@ import org.apache.beam.sdk.coders.ShardedKeyCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroGenericCoder;
+import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
@@ -133,6 +136,7 @@ import
org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
+import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
@@ -150,6 +154,7 @@ import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterab
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Multimap;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.BaseEncoding;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
@@ -1105,7 +1110,7 @@ public class BigQueryIOWriteTest implements Serializable {
new TableRow()
.set("strval", "test2")
.set("longval", "2")
- .set("doubleval", 2.0D)
+ .set("doubleval", 2.0)
.set(
"instantval",
useStorageApi || useStorageApiApproximate
@@ -1182,12 +1187,12 @@ public class BigQueryIOWriteTest implements
Serializable {
new TableRow()
.set("strVal", "test_custom")
.set("longVal", "1")
- .set("doubleVal", 1.0D)
+ .set("doubleVal", 1.0)
.set("instantVal", "2019-01-01 00:00:00 UTC"),
new TableRow()
.set("strVal", "test2_custom")
.set("longVal", "2")
- .set("doubleVal", 2.0D)
+ .set("doubleVal", 2.0)
.set("instantVal", "2019-02-01 00:00:00 UTC")));
}
@@ -3004,6 +3009,106 @@ public class BigQueryIOWriteTest implements
Serializable {
containsInAnyOrder(Iterables.toArray(rows, TableRow.class)));
}
+ @Test
+ public void testWriteProtos() throws Exception {
+ BigQueryIO.Write.Method method =
+ useStreaming
+ ? (useStorageApi
+ ? (useStorageApiApproximate
+ ? Method.STORAGE_API_AT_LEAST_ONCE
+ : Method.STORAGE_WRITE_API)
+ : Method.STREAMING_INSERTS)
+ : useStorageApi ? Method.STORAGE_WRITE_API : Method.FILE_LOADS;
+ Function<Integer, Proto3SchemaMessages.Primitive> getPrimitive =
+ (Integer i) ->
+ Proto3SchemaMessages.Primitive.newBuilder()
+ .setPrimitiveDouble(i)
+ .setPrimitiveFloat(i)
+ .setPrimitiveInt32(i)
+ .setPrimitiveInt64(i)
+ .setPrimitiveUint32(i)
+ .setPrimitiveUint64(i)
+ .setPrimitiveSint32(i)
+ .setPrimitiveSint64(i)
+ .setPrimitiveFixed32(i)
+ .setPrimitiveFixed64(i)
+ .setPrimitiveBool(true)
+ .setPrimitiveString(Integer.toString(i))
+ .setPrimitiveBytes(
+
ByteString.copyFrom(Integer.toString(i).getBytes(StandardCharsets.UTF_8)))
+ .build();
+ Function<Integer, TableRow> getPrimitiveRow =
+ (Integer i) ->
+ new TableRow()
+ .set("primitive_double", Double.valueOf(i))
+ .set("primitive_float", Float.valueOf(i).doubleValue())
+ .set("primitive_int32", i.intValue())
+ .set("primitive_int64", i.toString())
+ .set("primitive_uint32", i.toString())
+ .set("primitive_uint64", i.toString())
+ .set("primitive_sint32", i.toString())
+ .set("primitive_sint64", i.toString())
+ .set("primitive_fixed32", i.toString())
+ .set("primitive_fixed64", i.toString())
+ .set("primitive_bool", true)
+ .set("primitive_string", i.toString())
+ .set(
+ "primitive_bytes",
+ BaseEncoding.base64()
+ .encode(
+
ByteString.copyFrom(i.toString().getBytes(StandardCharsets.UTF_8))
+ .toByteArray()));
+
+ List<Proto3SchemaMessages.Primitive> nestedItems =
+ Lists.newArrayList(getPrimitive.apply(1), getPrimitive.apply(2),
getPrimitive.apply(3));
+
+ Iterable<Proto3SchemaMessages.Nested> items =
+ nestedItems.stream()
+ .map(
+ p ->
+ Proto3SchemaMessages.Nested.newBuilder()
+ .setNested(p)
+ .addAllNestedList(Lists.newArrayList(p, p, p))
+ .build())
+ .collect(Collectors.toList());
+
+ List<TableRow> expectedNestedTableRows =
+ Lists.newArrayList(
+ getPrimitiveRow.apply(1), getPrimitiveRow.apply(2),
getPrimitiveRow.apply(3));
+ Iterable<TableRow> expectedItems =
+ expectedNestedTableRows.stream()
+ .map(
+ p ->
+ new TableRow().set("nested", p).set("nested_list",
Lists.newArrayList(p, p, p)))
+ .collect(Collectors.toList());
+
+ BigQueryIO.Write<Proto3SchemaMessages.Nested> write =
+ BigQueryIO.writeProtos(Proto3SchemaMessages.Nested.class)
+ .to("dataset-id.table-id")
+ .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+ .withMethod(method)
+ .withoutValidation()
+ .withTestServices(fakeBqServices);
+
+ p.apply(Create.of(items)).apply("WriteToBQ", write);
+ p.run();
+
+ // Round trip through the coder to make sure the types match our expected
types.
+ List<TableRow> allRows =
+ fakeDatasetService.getAllRows("project-id", "dataset-id",
"table-id").stream()
+ .map(
+ tr -> {
+ try {
+ byte[] bytes =
CoderUtils.encodeToByteArray(TableRowJsonCoder.of(), tr);
+ return
CoderUtils.decodeFromByteArray(TableRowJsonCoder.of(), bytes);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .collect(Collectors.toList());
+ assertThat(allRows, containsInAnyOrder(Iterables.toArray(expectedItems,
TableRow.class)));
+ }
+
@Test
public void testUpsertAndDeleteTableRows() throws Exception {
assumeTrue(useStorageApi);
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDirectWriteProtosIT.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDirectWriteProtosIT.java
new file mode 100644
index 00000000000..93bc4162409
--- /dev/null
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDirectWriteProtosIT.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages;
+import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.BaseEncoding;
+import org.joda.time.Duration;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/** Integration test that writes protos directly to BigQuery through the Storage Write API. */
+@RunWith(Parameterized.class)
+public class StorageApiDirectWriteProtosIT {
+ @Parameterized.Parameters
+ public static Iterable<Object[]> data() {
+ return ImmutableList.of(
+ new Object[] {true, false, false},
+ new Object[] {false, true, false},
+ new Object[] {false, false, true},
+ new Object[] {true, false, true});
+ }
+
+ @Parameterized.Parameter(0)
+ public boolean useStreamingExactlyOnce;
+
+ @Parameterized.Parameter(1)
+ public boolean useAtLeastOnce;
+
+ @Parameterized.Parameter(2)
+ public boolean useBatch;
+
+ private static final BigqueryClient BQ_CLIENT =
+ new BigqueryClient("StorageApiDirectWriteProtosIT");
+ private static final String PROJECT =
+ TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
+ private static final String BIG_QUERY_DATASET_ID =
+ "storage_api_sink_direct_write_protos" + System.nanoTime();
+
+ private BigQueryIO.Write.Method getMethod() {
+ return useAtLeastOnce
+ ? BigQueryIO.Write.Method.STORAGE_API_AT_LEAST_ONCE
+ : BigQueryIO.Write.Method.STORAGE_WRITE_API;
+ }
+
+ @BeforeClass
+ public static void setUpTestEnvironment() throws IOException,
InterruptedException {
+ // Create one BQ dataset for all test cases.
+ BQ_CLIENT.createNewDataset(PROJECT, BIG_QUERY_DATASET_ID);
+ }
+
+ @AfterClass
+ public static void cleanup() {
+ BQ_CLIENT.deleteDataset(PROJECT, BIG_QUERY_DATASET_ID);
+ }
+
+ @Test
+ public void testDirectWriteProtos() throws Exception {
+ Function<Integer, Proto3SchemaMessages.Primitive> getPrimitiveProto =
+ (Integer i) ->
+ Proto3SchemaMessages.Primitive.newBuilder()
+ .setPrimitiveDouble(i)
+ .setPrimitiveFloat(i)
+ .setPrimitiveInt32(i)
+ .setPrimitiveInt64(i)
+ .setPrimitiveUint32(i)
+ .setPrimitiveUint64(i)
+ .setPrimitiveSint32(i)
+ .setPrimitiveSint64(i)
+ .setPrimitiveFixed32(i)
+ .setPrimitiveFixed64(i)
+ .setPrimitiveBool(true)
+ .setPrimitiveString(Integer.toString(i))
+ .setPrimitiveBytes(
+
ByteString.copyFrom(Integer.toString(i).getBytes(StandardCharsets.UTF_8)))
+ .build();
+ Function<Integer, TableRow> getPrimitiveRow =
+ (Integer i) ->
+ new TableRow()
+ .set("primitive_double", Double.valueOf(i))
+ .set("primitive_float", Float.valueOf(i).doubleValue())
+ .set("primitive_int32", i.toString())
+ .set("primitive_int64", i.toString())
+ .set("primitive_uint32", i.toString())
+ .set("primitive_uint64", i.toString())
+ .set("primitive_sint32", i.toString())
+ .set("primitive_sint64", i.toString())
+ .set("primitive_fixed32", i.toString())
+ .set("primitive_fixed64", i.toString())
+ .set("primitive_bool", true)
+ .set("primitive_string", i.toString())
+ .set(
+ "primitive_bytes",
+ BaseEncoding.base64()
+ .encode(
+
ByteString.copyFrom(i.toString().getBytes(StandardCharsets.UTF_8))
+ .toByteArray()));
+
+ List<Proto3SchemaMessages.Primitive> nestedItems =
+ IntStream.range(1, 2)
+ .mapToObj(i -> getPrimitiveProto.apply(i))
+ .collect(Collectors.toList());
+
+ Iterable<Proto3SchemaMessages.Nested> items =
+ nestedItems.stream()
+ .map(
+ p ->
+ Proto3SchemaMessages.Nested.newBuilder()
+ .setNested(p)
+ .addAllNestedList(Lists.newArrayList(p, p, p))
+ .build())
+ .collect(Collectors.toList());
+
+ List<TableRow> expectedNestedItems =
+ IntStream.range(1,
2).mapToObj(getPrimitiveRow::apply).collect(Collectors.toList());
+
+ Iterable<TableRow> expectedItems =
+ expectedNestedItems.stream()
+ .map(
+ p ->
+ new TableRow()
+ .set("nested_map", Lists.newArrayList())
+ .set("nested", p)
+ .set("nested_list", Lists.newArrayList(p, p, p)))
+ .collect(Collectors.toList());
+
+ String table = "table" + System.nanoTime();
+ String tableSpec = PROJECT + "." + BIG_QUERY_DATASET_ID + "." + table;
+
+ BigQueryIO.Write.Method method = getMethod();
+ BigQueryIO.Write<Proto3SchemaMessages.Nested> write =
+ BigQueryIO.writeProtos(Proto3SchemaMessages.Nested.class)
+ .to(tableSpec)
+
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
+ .withMethod(method);
+ if (method == BigQueryIO.Write.Method.STORAGE_WRITE_API) {
+ write = write.withNumStorageWriteApiStreams(1);
+ if (useStreamingExactlyOnce) {
+ write = write.withTriggeringFrequency(Duration.standardSeconds(1));
+ }
+ }
+
+ Pipeline p = Pipeline.create();
+
+ PCollection<Proto3SchemaMessages.Nested> input = p.apply("Create test
cases", Create.of(items));
+ if (useStreamingExactlyOnce) {
+ input = input.setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED);
+ }
+ input.apply("Write using Storage Write API", write);
+ p.run().waitUntilFinish();
+ assertRowsWritten(tableSpec, expectedItems);
+ }
+
+ void assertRowsWritten(String tableSpec, Iterable<TableRow> expectedItems)
throws Exception {
+ List<TableRow> rows =
+ BQ_CLIENT.queryUnflattened(
+ String.format("SELECT * FROM %s", tableSpec), PROJECT, true, true);
+ assertThat(rows, containsInAnyOrder(Iterables.toArray(expectedItems,
TableRow.class)));
+ }
+}
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
index dc5b4b82936..aae1c2096cc 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
@@ -63,7 +63,7 @@ import org.junit.runners.JUnit4;
/** Unit tests for {@link
org.apache.beam.sdk.io.gcp.bigquery.TableRowToStorageApiProto}. */
public class TableRowToStorageApiProtoTest {
// Schemas we test.
- // The TableRow class has special semantics for fields named "f". To ensure
we handel them
+ // The TableRow class has special semantics for fields named "f". To ensure
we handle them
// properly, we test schemas
// both with and without a field named "f".
private static final TableSchema BASE_TABLE_SCHEMA =
@@ -167,7 +167,7 @@ public class TableRowToStorageApiProtoTest {
.add(new
TableFieldSchema().setType("TIMESTAMP").setName("timestampValueMaximum"))
.build());
- private static final DescriptorProto BASE_TABLE_SCHEMA_PROTO =
+ private static final DescriptorProto BASE_TABLE_SCHEMA_PROTO_DESCRIPTOR =
DescriptorProto.newBuilder()
.addField(
FieldDescriptorProto.newBuilder()
@@ -367,6 +367,150 @@ public class TableRowToStorageApiProtoTest {
.build())
.build();
+ private static final com.google.cloud.bigquery.storage.v1.TableSchema
BASE_TABLE_PROTO_SCHEMA =
+ com.google.cloud.bigquery.storage.v1.TableSchema.newBuilder()
+ .addFields(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+ .setName("stringvalue")
+
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.STRING)
+ .build())
+ .addFields(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+ .setName("f")
+
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.STRING)
+ .build())
+ .addFields(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+ .setName("bytesvalue")
+
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.BYTES)
+ .build())
+ .addFields(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+ .setName("int64value")
+
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+ .build())
+ .addFields(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+ .setName("intvalue")
+
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+ .build())
+ .addFields(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+ .setName("float64value")
+
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.DOUBLE)
+ .build())
+ .addFields(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+ .setName("floatvalue")
+
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.DOUBLE)
+ .build())
+ .addFields(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+ .setName("boolvalue")
+
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.BOOL)
+ .build())
+ .addFields(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+ .setName("booleanvalue")
+
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.BOOL)
+ .build())
+ .addFields(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+ .setName("timestampvalue")
+
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+ .build())
+ .addFields(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+ .setName("timevalue")
+
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+ .build())
+ .addFields(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+ .setName("datetimevalue")
+
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+ .build())
+ .addFields(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+ .setName("datevalue")
+
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+ .build())
+ .addFields(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+ .setName("numericvalue")
+
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.BYTES)
+ .build())
+ .addFields(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+ .setName("bignumericvalue")
+
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.BYTES)
+ .build())
+ .addFields(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+ .setName("numericvalue2")
+
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.BYTES)
+ .build())
+ .addFields(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+ .setName("bignumericvalue2")
+
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.BYTES)
+ .build())
+ .addFields(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+ .setName("arrayvalue")
+
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.BYTES)
+ .build())
+ .addFields(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+ .setName("timestampisovalue")
+
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+ .build())
+ .addFields(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+ .setName("timestampisovalueoffsethh")
+
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+ .build())
+ .addFields(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+ .setName("timestampvaluelong")
+
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+ .build())
+ .addFields(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+ .setName("timestampvaluespace")
+
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+ .build())
+ .addFields(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+ .setName("timestampvaluespaceutc")
+
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+ .build())
+ .addFields(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+ .setName("timestampvaluezoneregion")
+
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+ .build())
+ .addFields(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+ .setName("timestampvaluespacemilli")
+
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+ .build())
+ .addFields(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+ .setName("timestampvaluespacetrailingzero")
+
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+ .build())
+ .addFields(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+ .setName("datetimevaluespace")
+
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+ .build())
+ .addFields(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+ .setName("timestampvaluemaximum")
+
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+ .build())
+ .build();
+
private static final DescriptorProto BASE_TABLE_SCHEMA_NO_F_PROTO =
DescriptorProto.newBuilder()
.addField(
@@ -559,6 +703,146 @@ public class TableRowToStorageApiProtoTest {
.setLabel(Label.LABEL_OPTIONAL)
.build())
.build();
+
+ private static final com.google.cloud.bigquery.storage.v1.TableSchema
+ BASE_TABLE_NO_F_PROTO_SCHEMA =
+ com.google.cloud.bigquery.storage.v1.TableSchema.newBuilder()
+ .addFields(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+ .setName("stringvalue")
+
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.STRING)
+ .build())
+ .addFields(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+ .setName("bytesvalue")
+
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.BYTES)
+ .build())
+ .addFields(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+ .setName("int64value")
+
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+ .build())
+ .addFields(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+ .setName("intvalue")
+
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+ .build())
+ .addFields(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+ .setName("float64value")
+
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.DOUBLE)
+ .build())
+ .addFields(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+ .setName("floatvalue")
+
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.DOUBLE)
+ .build())
+ .addFields(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+ .setName("boolvalue")
+
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.BOOL)
+ .build())
+ .addFields(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+ .setName("booleanvalue")
+
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.BOOL)
+ .build())
+ .addFields(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+ .setName("timestampvalue")
+
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+ .build())
+ .addFields(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+ .setName("timevalue")
+
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+ .build())
+ .addFields(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+ .setName("datetimevalue")
+
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+ .build())
+ .addFields(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+ .setName("datevalue")
+
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+ .build())
+ .addFields(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+ .setName("numericvalue")
+
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.BYTES)
+ .build())
+ .addFields(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+ .setName("bignumericvalue")
+
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.BYTES)
+ .build())
+ .addFields(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+ .setName("numericvalue2")
+
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.BYTES)
+ .build())
+ .addFields(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+ .setName("bignumericvalue2")
+
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.BYTES)
+ .build())
+ .addFields(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+ .setName("arrayvalue")
+
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.BYTES)
+ .build())
+ .addFields(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+ .setName("timestampisovalue")
+
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+ .build())
+ .addFields(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+ .setName("timestampisovalueoffsethh")
+
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+ .build())
+ .addFields(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+ .setName("timestampvaluelong")
+
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+ .build())
+ .addFields(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+ .setName("timestampvaluespace")
+
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+ .build())
+ .addFields(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+ .setName("timestampvaluespaceutc")
+
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+ .build())
+ .addFields(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+ .setName("timestampvaluezoneregion")
+
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+ .build())
+ .addFields(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+ .setName("timestampvaluespacemilli")
+
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+ .build())
+ .addFields(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+ .setName("timestampvaluespacetrailingzero")
+
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+ .build())
+ .addFields(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+ .setName("datetimevaluespace")
+
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+ .build())
+ .addFields(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+ .setName("timestampvaluemaximum")
+
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.INT64)
+ .build())
+ .build();
private static final TableSchema NESTED_TABLE_SCHEMA =
new TableSchema()
.setFields(
@@ -566,29 +850,33 @@ public class TableRowToStorageApiProtoTest {
.add(
new TableFieldSchema()
.setType("STRUCT")
- .setName("nestedValue1")
+ .setName("nestedvalue1")
+ .setMode("NULLABLE")
.setFields(BASE_TABLE_SCHEMA.getFields()))
.add(
new TableFieldSchema()
.setType("RECORD")
- .setName("nestedValue2")
+ .setName("nestedvalue2")
+ .setMode("NULLABLE")
.setFields(BASE_TABLE_SCHEMA.getFields()))
.add(
new TableFieldSchema()
.setType("STRUCT")
- .setName("nestedValueNoF1")
+ .setName("nestedvaluenof1")
+ .setMode("NULLABLE")
.setFields(BASE_TABLE_SCHEMA_NO_F.getFields()))
.add(
new TableFieldSchema()
.setType("RECORD")
- .setName("nestedValueNoF2")
+ .setName("nestedvaluenof2")
+ .setMode("NULLABLE")
.setFields(BASE_TABLE_SCHEMA_NO_F.getFields()))
.build());
@Rule public transient ExpectedException thrown = ExpectedException.none();
@Test
- public void testDescriptorFromTableSchema() {
+ public void testDescriptorFromTableSchema() throws Exception {
DescriptorProto descriptor =
TableRowToStorageApiProto.descriptorSchemaFromTableSchema(BASE_TABLE_SCHEMA,
true, false);
Map<String, Type> types =
@@ -596,18 +884,37 @@ public class TableRowToStorageApiProtoTest {
.collect(
Collectors.toMap(FieldDescriptorProto::getName,
FieldDescriptorProto::getType));
Map<String, Type> expectedTypes =
- BASE_TABLE_SCHEMA_PROTO.getFieldList().stream()
+ BASE_TABLE_SCHEMA_PROTO_DESCRIPTOR.getFieldList().stream()
.collect(
Collectors.toMap(FieldDescriptorProto::getName,
FieldDescriptorProto::getType));
assertEquals(expectedTypes, types);
+
+ com.google.cloud.bigquery.storage.v1.TableSchema roundtripSchema =
+ TableRowToStorageApiProto.tableSchemaFromDescriptor(
+ TableRowToStorageApiProto.wrapDescriptorProto(descriptor));
+ Map<String, com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type>
roundTripTypes =
+ roundtripSchema.getFieldsList().stream()
+ .collect(
+ Collectors.toMap(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema::getName,
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema::getType));
+
+ Map<String, com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type>
roundTripExpectedTypes =
+ BASE_TABLE_PROTO_SCHEMA.getFieldsList().stream()
+ .collect(
+ Collectors.toMap(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema::getName,
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema::getType));
+
+ assertEquals(roundTripExpectedTypes, roundTripTypes);
}
@Test
- public void testNestedFromTableSchema() {
+ public void testNestedFromTableSchema() throws Exception {
DescriptorProto descriptor =
TableRowToStorageApiProto.descriptorSchemaFromTableSchema(NESTED_TABLE_SCHEMA,
true, false);
Map<String, Type> expectedBaseTypes =
- BASE_TABLE_SCHEMA_PROTO.getFieldList().stream()
+ BASE_TABLE_SCHEMA_PROTO_DESCRIPTOR.getFieldList().stream()
.collect(
Collectors.toMap(FieldDescriptorProto::getName,
FieldDescriptorProto::getType));
Map<String, Type> expectedBaseTypesNoF =
@@ -659,6 +966,97 @@ public class TableRowToStorageApiProtoTest {
.collect(
Collectors.toMap(FieldDescriptorProto::getName,
FieldDescriptorProto::getType));
assertEquals(expectedBaseTypesNoF, nestedTypesNoF2);
+
+ Map<String, com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type>
+ roundTripExpectedBaseTypes =
+ BASE_TABLE_PROTO_SCHEMA.getFieldsList().stream()
+ .collect(
+ Collectors.toMap(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema::getName,
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema::getType));
+ Map<String, com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type>
+ roundTripExpectedBaseTypesNoF =
+ BASE_TABLE_NO_F_PROTO_SCHEMA.getFieldsList().stream()
+ .collect(
+ Collectors.toMap(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema::getName,
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema::getType));
+
+ com.google.cloud.bigquery.storage.v1.TableSchema roundtripSchema =
+ TableRowToStorageApiProto.tableSchemaFromDescriptor(
+ TableRowToStorageApiProto.wrapDescriptorProto(descriptor));
+
+ Map<String, com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type>
roundTripTypes =
+ roundtripSchema.getFieldsList().stream()
+ .collect(
+ Collectors.toMap(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema::getName,
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema::getType));
+ assertEquals(4, roundTripTypes.size());
+
+ assertEquals(
+ com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.STRUCT,
+ roundTripTypes.get("nestedvalue1"));
+ com.google.cloud.bigquery.storage.v1.TableFieldSchema nestedType =
+ roundtripSchema.getFieldsList().stream()
+ .filter(f -> f.getName().equals("nestedvalue1"))
+ .findFirst()
+ .get();
+ Map<String, com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type>
nestedRoundTripTypes =
+ nestedType.getFieldsList().stream()
+ .collect(
+ Collectors.toMap(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema::getName,
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema::getType));
+ assertEquals(roundTripExpectedBaseTypes, nestedRoundTripTypes);
+
+ assertEquals(
+ com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.STRUCT,
+ roundTripTypes.get("nestedvalue2"));
+ nestedType =
+ roundtripSchema.getFieldsList().stream()
+ .filter(f -> f.getName().equals("nestedvalue2"))
+ .findFirst()
+ .get();
+ nestedRoundTripTypes =
+ nestedType.getFieldsList().stream()
+ .collect(
+ Collectors.toMap(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema::getName,
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema::getType));
+ assertEquals(roundTripExpectedBaseTypes, nestedRoundTripTypes);
+
+ assertEquals(
+ com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.STRUCT,
+ roundTripTypes.get("nestedvaluenof1"));
+ nestedType =
+ roundtripSchema.getFieldsList().stream()
+ .filter(f -> f.getName().equals("nestedvaluenof1"))
+ .findFirst()
+ .get();
+ nestedRoundTripTypes =
+ nestedType.getFieldsList().stream()
+ .collect(
+ Collectors.toMap(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema::getName,
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema::getType));
+ assertEquals(roundTripExpectedBaseTypesNoF, nestedRoundTripTypes);
+
+ assertEquals(
+ com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.STRUCT,
+ roundTripTypes.get("nestedvaluenof2"));
+ nestedType =
+ roundtripSchema.getFieldsList().stream()
+ .filter(f -> f.getName().equals("nestedvaluenof2"))
+ .findFirst()
+ .get();
+ nestedRoundTripTypes =
+ nestedType.getFieldsList().stream()
+ .collect(
+ Collectors.toMap(
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema::getName,
+
com.google.cloud.bigquery.storage.v1.TableFieldSchema::getType));
+ assertEquals(roundTripExpectedBaseTypesNoF, nestedRoundTripTypes);
}
private static final List<Object> REPEATED_BYTES =