This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 52ab49e7255 Fix BQ storage stream split (#32376)
52ab49e7255 is described below

commit 52ab49e72559a70af910bf61e33eb02bb245ee46
Author: Michel Davit <[email protected]>
AuthorDate: Fri Aug 30 16:57:14 2024 +0200

    Fix BQ storage stream split (#32376)
    
    A variable shadowing was introduced, preventing modification of the
    stream reader source after splitting.
---
 .../beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java     | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java
index e583bdb4817..adc0933defe 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java
@@ -347,7 +347,7 @@ class BigQueryStorageStreamSource<T> extends 
BoundedSource<T> {
       // Because superclass cannot have preconditions around these variables, 
cannot use
       // @RequiresNonNull
       Preconditions.checkStateNotNull(responseStream);
-      BigQueryServerStream<ReadRowsResponse> responseStream = 
this.responseStream;
+      final BigQueryServerStream<ReadRowsResponse> responseStream = 
this.responseStream;
       totalSplitCalls.inc();
       LOG.debug(
           "Received BigQuery Storage API split request for stream {} at 
fraction {}.",
@@ -433,9 +433,9 @@ class BigQueryStorageStreamSource<T> extends 
BoundedSource<T> {
 
         // Cancels the parent stream before replacing it with the primary 
stream.
         responseStream.cancel();
-        source = source.fromExisting(splitResponse.getPrimaryStream());
-        responseStream = newResponseStream;
-        responseIterator = newResponseIterator;
+        this.source = source.fromExisting(splitResponse.getPrimaryStream());
+        this.responseStream = newResponseStream;
+        this.responseIterator = newResponseIterator;
         reader.resetBuffer();
       }
 

Reply via email to