This is an automated email from the ASF dual-hosted git repository.
reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 21d9f6d Merge pull request #16347: fix: move connector to use v1 BigQuery Storage Write API
21d9f6d is described below
commit 21d9f6d4ebf3e1c8b80c249fcb32e3e820eeb12b
Author: Yiru Tang <[email protected]>
AuthorDate: Wed Jan 5 16:21:23 2022 -0800
Merge pull request #16347: fix: move connector to use v1 BigQuery Storage Write API
---
.../org/apache/beam/gradle/BeamModulePlugin.groovy | 1 -
sdks/java/io/google-cloud-platform/build.gradle | 1 -
.../beam/sdk/io/gcp/bigquery/BigQueryServices.java | 12 ++++----
.../sdk/io/gcp/bigquery/BigQueryServicesImpl.java | 35 ++++++++++++----------
.../sdk/io/gcp/bigquery/SplittingIterable.java | 2 +-
.../gcp/bigquery/StorageApiFinalizeWritesDoFn.java | 8 ++---
.../bigquery/StorageApiFlushAndFinalizeDoFn.java | 4 +--
.../bigquery/StorageApiWriteUnshardedRecords.java | 6 ++--
.../bigquery/StorageApiWritesShardedRecords.java | 6 ++--
.../sdk/io/gcp/testing/FakeDatasetService.java | 14 ++++-----
.../apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java | 1 -
11 files changed, 45 insertions(+), 45 deletions(-)
diff --git
a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 3479446..6538958 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -646,7 +646,6 @@ class BeamModulePlugin implements Plugin<Project> {
protobuf_java_util :
"com.google.protobuf:protobuf-java-util:$protobuf_version",
proto_google_cloud_bigquery_storage_v1 :
"com.google.api.grpc:proto-google-cloud-bigquerystorage-v1", //
google_cloud_platform_libraries_bom sets version
proto_google_cloud_bigtable_admin_v2 :
"com.google.api.grpc:proto-google-cloud-bigtable-admin-v2", //
google_cloud_platform_libraries_bom sets version
- proto_google_cloud_bigquery_storage_v1beta2 : "com.google.api.grpc:proto-google-cloud-bigquerystorage-v1beta2", // google_cloud_platform_libraries_bom sets version
proto_google_cloud_bigtable_v2 :
"com.google.api.grpc:proto-google-cloud-bigtable-v2", //
google_cloud_platform_libraries_bom sets version
proto_google_cloud_datacatalog_v1beta1 :
"com.google.api.grpc:proto-google-cloud-datacatalog-v1beta1", //
google_cloud_platform_libraries_bom sets version
proto_google_cloud_datastore_v1 :
"com.google.api.grpc:proto-google-cloud-datastore-v1", //
google_cloud_platform_libraries_bom sets version
diff --git a/sdks/java/io/google-cloud-platform/build.gradle
b/sdks/java/io/google-cloud-platform/build.gradle
index 6d7d911..efe4fb9 100644
--- a/sdks/java/io/google-cloud-platform/build.gradle
+++ b/sdks/java/io/google-cloud-platform/build.gradle
@@ -111,7 +111,6 @@ dependencies {
implementation library.java.netty_tcnative_boringssl_static
permitUnusedDeclared library.java.netty_tcnative_boringssl_static // BEAM-11761
implementation library.java.proto_google_cloud_bigquery_storage_v1
- implementation library.java.proto_google_cloud_bigquery_storage_v1beta2
implementation library.java.proto_google_cloud_bigtable_admin_v2
implementation library.java.proto_google_cloud_bigtable_v2
implementation library.java.proto_google_cloud_datastore_v1
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
index 29d5c00..927b7b3 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
@@ -29,18 +29,18 @@ import com.google.api.services.bigquery.model.JobStatistics;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
+import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
+import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsResponse;
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
+import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamResponse;
+import com.google.cloud.bigquery.storage.v1.FlushRowsResponse;
+import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
import com.google.cloud.bigquery.storage.v1.ReadSession;
import com.google.cloud.bigquery.storage.v1.SplitReadStreamRequest;
import com.google.cloud.bigquery.storage.v1.SplitReadStreamResponse;
-import com.google.cloud.bigquery.storage.v1beta2.AppendRowsResponse;
-import com.google.cloud.bigquery.storage.v1beta2.BatchCommitWriteStreamsResponse;
-import com.google.cloud.bigquery.storage.v1beta2.FinalizeWriteStreamResponse;
-import com.google.cloud.bigquery.storage.v1beta2.FlushRowsResponse;
-import com.google.cloud.bigquery.storage.v1beta2.ProtoRows;
-import com.google.cloud.bigquery.storage.v1beta2.WriteStream;
+import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.protobuf.Descriptors.Descriptor;
import java.io.IOException;
import java.io.Serializable;
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 5a270d4..f607598 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -58,28 +58,28 @@ import
com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.auth.Credentials;
import com.google.auth.http.HttpCredentialsAdapter;
+import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
+import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsRequest;
+import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryReadClient;
import com.google.cloud.bigquery.storage.v1.BigQueryReadSettings;
+import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
+import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
+import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
+import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamRequest;
+import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamResponse;
+import com.google.cloud.bigquery.storage.v1.FlushRowsRequest;
+import com.google.cloud.bigquery.storage.v1.FlushRowsResponse;
+import com.google.cloud.bigquery.storage.v1.ProtoRows;
+import com.google.cloud.bigquery.storage.v1.ProtoSchema;
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
import com.google.cloud.bigquery.storage.v1.ReadSession;
import com.google.cloud.bigquery.storage.v1.SplitReadStreamRequest;
import com.google.cloud.bigquery.storage.v1.SplitReadStreamResponse;
-import com.google.cloud.bigquery.storage.v1beta2.AppendRowsResponse;
-import com.google.cloud.bigquery.storage.v1beta2.BatchCommitWriteStreamsRequest;
-import com.google.cloud.bigquery.storage.v1beta2.BatchCommitWriteStreamsResponse;
-import com.google.cloud.bigquery.storage.v1beta2.BigQueryWriteClient;
-import com.google.cloud.bigquery.storage.v1beta2.BigQueryWriteSettings;
-import com.google.cloud.bigquery.storage.v1beta2.CreateWriteStreamRequest;
-import com.google.cloud.bigquery.storage.v1beta2.FinalizeWriteStreamRequest;
-import com.google.cloud.bigquery.storage.v1beta2.FinalizeWriteStreamResponse;
-import com.google.cloud.bigquery.storage.v1beta2.FlushRowsRequest;
-import com.google.cloud.bigquery.storage.v1beta2.FlushRowsResponse;
-import com.google.cloud.bigquery.storage.v1beta2.ProtoRows;
-import com.google.cloud.bigquery.storage.v1beta2.ProtoSchema;
-import com.google.cloud.bigquery.storage.v1beta2.StreamWriterV2;
-import com.google.cloud.bigquery.storage.v1beta2.WriteStream;
+import com.google.cloud.bigquery.storage.v1.StreamWriter;
+import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
import com.google.protobuf.Descriptors.Descriptor;
@@ -1194,8 +1194,11 @@ class BigQueryServicesImpl implements BigQueryServices {
throws Exception {
ProtoSchema protoSchema =
ProtoSchema.newBuilder().setProtoDescriptor(descriptor.toProto()).build();
- StreamWriterV2 streamWriter =
- StreamWriterV2.newBuilder(streamName).setWriterSchema(protoSchema).build();
+ StreamWriter streamWriter =
+ StreamWriter.newBuilder(streamName)
+ .setWriterSchema(protoSchema)
+ .setTraceId("Dataflow")
+ .build();
return new StreamAppendClient() {
private int pins = 0;
private boolean closed = false;
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SplittingIterable.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SplittingIterable.java
index cf367f3..7152284 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SplittingIterable.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SplittingIterable.java
@@ -17,7 +17,7 @@
*/
package org.apache.beam.sdk.io.gcp.bigquery;
-import com.google.cloud.bigquery.storage.v1beta2.ProtoRows;
+import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.protobuf.ByteString;
import java.util.Iterator;
import java.util.NoSuchElementException;
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFinalizeWritesDoFn.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFinalizeWritesDoFn.java
index 7dd3485..2744603 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFinalizeWritesDoFn.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFinalizeWritesDoFn.java
@@ -17,10 +17,10 @@
*/
package org.apache.beam.sdk.io.gcp.bigquery;
-import com.google.cloud.bigquery.storage.v1beta2.BatchCommitWriteStreamsResponse;
-import com.google.cloud.bigquery.storage.v1beta2.FinalizeWriteStreamResponse;
-import com.google.cloud.bigquery.storage.v1beta2.StorageError;
-import com.google.cloud.bigquery.storage.v1beta2.StorageError.StorageErrorCode;
+import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsResponse;
+import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamResponse;
+import com.google.cloud.bigquery.storage.v1.StorageError;
+import com.google.cloud.bigquery.storage.v1.StorageError.StorageErrorCode;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.java
index e614ede..1c3686a 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.java
@@ -19,8 +19,8 @@ package org.apache.beam.sdk.io.gcp.bigquery;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode.Code;
-import com.google.cloud.bigquery.storage.v1beta2.FinalizeWriteStreamResponse;
-import com.google.cloud.bigquery.storage.v1beta2.FlushRowsResponse;
+import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamResponse;
+import com.google.cloud.bigquery.storage.v1.FlushRowsResponse;
import java.io.IOException;
import java.io.Serializable;
import java.time.Instant;
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index 21c7a46..a157236 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -20,9 +20,9 @@ package org.apache.beam.sdk.io.gcp.bigquery;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
import com.google.api.services.bigquery.model.TableSchema;
-import com.google.cloud.bigquery.storage.v1beta2.AppendRowsResponse;
-import com.google.cloud.bigquery.storage.v1beta2.ProtoRows;
-import com.google.cloud.bigquery.storage.v1beta2.WriteStream.Type;
+import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
+import com.google.cloud.bigquery.storage.v1.ProtoRows;
+import com.google.cloud.bigquery.storage.v1.WriteStream.Type;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.List;
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
index 3d6e33c..15982a3 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
@@ -21,9 +21,9 @@ import static
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec
import com.google.api.core.ApiFuture;
import com.google.api.services.bigquery.model.TableSchema;
-import com.google.cloud.bigquery.storage.v1beta2.AppendRowsResponse;
-import com.google.cloud.bigquery.storage.v1beta2.ProtoRows;
-import com.google.cloud.bigquery.storage.v1beta2.WriteStream.Type;
+import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
+import com.google.cloud.bigquery.storage.v1.ProtoRows;
+import com.google.cloud.bigquery.storage.v1.WriteStream.Type;
import com.google.protobuf.Descriptors.Descriptor;
import io.grpc.Status;
import io.grpc.Status.Code;
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
index 239c2ad..7701874 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
@@ -29,13 +29,13 @@ import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
-import com.google.cloud.bigquery.storage.v1beta2.AppendRowsResponse;
-import com.google.cloud.bigquery.storage.v1beta2.BatchCommitWriteStreamsResponse;
-import com.google.cloud.bigquery.storage.v1beta2.FinalizeWriteStreamResponse;
-import com.google.cloud.bigquery.storage.v1beta2.FlushRowsResponse;
-import com.google.cloud.bigquery.storage.v1beta2.ProtoRows;
-import com.google.cloud.bigquery.storage.v1beta2.WriteStream;
-import com.google.cloud.bigquery.storage.v1beta2.WriteStream.Type;
+import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
+import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsResponse;
+import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamResponse;
+import com.google.cloud.bigquery.storage.v1.FlushRowsResponse;
+import com.google.cloud.bigquery.storage.v1.ProtoRows;
+import com.google.cloud.bigquery.storage.v1.WriteStream;
+import com.google.cloud.bigquery.storage.v1.WriteStream.Type;
import com.google.errorprone.annotations.FormatMethod;
import com.google.errorprone.annotations.FormatString;
import com.google.protobuf.ByteString;
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
index 118210e..6e89115 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
@@ -69,7 +69,6 @@ public class GcpApiSurfaceTest {
classesInPackage("com.google.cloud"),
classesInPackage("com.google.common.collect"),
classesInPackage("com.google.cloud.bigquery.storage.v1"),
- classesInPackage("com.google.cloud.bigquery.storage.v1beta2"),
classesInPackage("com.google.cloud.bigtable.config"),
classesInPackage("com.google.iam.v1"),
classesInPackage("com.google.spanner.v1"),