This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 52ab49e7255 Fix BQ storage stream split (#32376)
52ab49e7255 is described below
commit 52ab49e72559a70af910bf61e33eb02bb245ee46
Author: Michel Davit <[email protected]>
AuthorDate: Fri Aug 30 16:57:14 2024 +0200
Fix BQ storage stream split (#32376)
A variable shadowing was introduced, preventing modification of the
stream reader source after splitting.
---
.../beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java
index e583bdb4817..adc0933defe 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java
@@ -347,7 +347,7 @@ class BigQueryStorageStreamSource<T> extends
BoundedSource<T> {
// Because superclass cannot have preconditions around these variables,
cannot use
// @RequiresNonNull
Preconditions.checkStateNotNull(responseStream);
- BigQueryServerStream<ReadRowsResponse> responseStream =
this.responseStream;
+ final BigQueryServerStream<ReadRowsResponse> responseStream =
this.responseStream;
totalSplitCalls.inc();
LOG.debug(
"Received BigQuery Storage API split request for stream {} at
fraction {}.",
@@ -433,9 +433,9 @@ class BigQueryStorageStreamSource<T> extends
BoundedSource<T> {
// Cancels the parent stream before replacing it with the primary
stream.
responseStream.cancel();
- source = source.fromExisting(splitResponse.getPrimaryStream());
- responseStream = newResponseStream;
- responseIterator = newResponseIterator;
+ this.source = source.fromExisting(splitResponse.getPrimaryStream());
+ this.responseStream = newResponseStream;
+ this.responseIterator = newResponseIterator;
reader.resetBuffer();
}