dorotaao opened a new issue, #38007:
URL: https://github.com/apache/beam/issues/38007
### What happened?
## Summary
When using `BigQueryIO.read().withMethod(Method.DIRECT_READ)` to read from
an **empty BigQuery table** or a table that returns zero rows after applying
`RowRestriction`, the BigQuery Storage API server returns a `ReadSession` with
**zero streams**. `BigQueryStorageSourceBase.split()` handles this by
returning `ImmutableList.of()` (an empty list of sources).
While this works correctly for purely bounded reads, it **breaks any
pipeline that wraps this bounded source into an unbounded one** (via
`UnboundedReadFromBoundedSource`).
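The zero-stream behavior of `split()` can be modeled without the Beam or BigQuery client libraries. The following is a minimal sketch, not Beam code. `ModelReadSession`, `ModelStreamSource` and `ModelSplitter` are hypothetical stand-ins for `ReadSession`, `BigQueryStorageStreamSource` and `split()`, and only the list of streams is modeled.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for the Storage API ReadSession: only the stream names are modeled. */
final class ModelReadSession {
  final List<String> streamNames;

  ModelReadSession(List<String> streamNames) {
    this.streamNames = streamNames;
  }
}

/** Hypothetical stand-in for BigQueryStorageStreamSource: one sub-source per stream. */
final class ModelStreamSource {
  final String streamName;

  ModelStreamSource(String streamName) {
    this.streamName = streamName;
  }
}

final class ModelSplitter {
  /** Simplified model of split(): one sub-source per stream, so zero streams gives an empty list. */
  static List<ModelStreamSource> split(ModelReadSession session) {
    if (session.streamNames.isEmpty()) {
      // Mirrors the reported behavior of returning ImmutableList.of() for a zero-stream session.
      return Collections.emptyList();
    }
    List<ModelStreamSource> sources = new ArrayList<>();
    for (String name : session.streamNames) {
      sources.add(new ModelStreamSource(name));
    }
    return sources;
  }
}
```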
When `split()` returns an empty list, `UnboundedReadFromBoundedSource` falls
back to wrapping the original unsplit source directly
(`ImmutableList.of(boundedSource)`). However,
`BigQueryStorageSourceBase.createReader()` does not support direct reads: it
unconditionally throws `UnsupportedOperationException("BigQuery storage source
must be split before reading")`, because the source is designed to be read only
through its per-stream sub-sources (`BigQueryStorageStreamSource`). As a result,
the pipeline gets stuck, hitting this exception over and over.
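A similarly simplified model shows why the fallback cannot succeed. The class names below are hypothetical stand-ins for `BigQueryStorageSourceBase` and the adapter in `UnboundedReadFromBoundedSource`; the real adapter also deals with checkpoints and residual sources, which this sketch omits.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

/** Hypothetical minimal source contract: split into sub-sources, or read directly. */
abstract class ModelBoundedSource {
  abstract List<ModelBoundedSource> split();

  abstract Iterator<String> createReader();
}

/** Models BigQueryStorageSourceBase: readable only through its per-stream sub-sources. */
final class ModelStorageSource extends ModelBoundedSource {
  @Override
  List<ModelBoundedSource> split() {
    // Empty table, or no rows after RowRestriction: the session has zero streams.
    return Collections.emptyList();
  }

  @Override
  Iterator<String> createReader() {
    // Mirrors the unconditional throw seen in the stack trace.
    throw new UnsupportedOperationException(
        "BigQuery storage source must be split before reading");
  }
}

/** Models the adapter's fallback from an empty split to the original unsplit source. */
final class ModelAdapter {
  static List<ModelBoundedSource> sourcesToRead(ModelBoundedSource source) {
    List<ModelBoundedSource> splits = source.split();
    return splits.isEmpty() ? Collections.singletonList(source) : splits;
  }
}
```

Because the fallback hands back the very source whose `createReader()` always throws, retrying the same work cannot make progress.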
## Environment
- **Apache Beam SDK version:** 2.70.0 (reproduced; likely affects all
versions with DIRECT_READ support)
- **Runner:** Dataflow Runner v1
## Steps to Reproduce
1. Create a Beam pipeline that uses
`BigQueryIO.read().withMethod(Method.DIRECT_READ)` to read from a BigQuery
table.
2. Use this read in a streaming pipeline.
3. Ensure the target BigQuery table is **empty**, or returns zero rows after
`RowRestriction` is applied.
4. Run the pipeline.
## Observed Behavior
- The pipeline **gets stuck** and the following error appears repeatedly in
logs:
```
Caused by: java.lang.UnsupportedOperationException: BigQuery storage source must be split before reading
org.apache.beam.sdk.io.gcp.bigquery.BigQueryStorageSourceBase.createReader(BigQueryStorageSourceBase.java:200)
org.apache.beam.sdk.io.gcp.bigquery.BigQueryStorageTableSource.createReader(BigQueryStorageTableSource.java:42)
org.apache.beam.sdk.util.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$ResidualSource.advance(UnboundedReadFromBoundedSource.java:497)
```
## Expected Behavior
The pipeline should not stall; downstream operations should proceed normally,
simply processing zero elements from this branch.
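At the reader level, "processing zero elements" means a reader whose `start()` returns `false`, which is how Beam's `Source.Reader` contract signals that there is no input. The sketch below uses hypothetical types (`ModelReader`, `EmptyModelReader`, `ModelDrain`) that only follow the shape of that contract. It illustrates the behavior a zero-stream session would need to produce and is one possible direction, not the upstream fix.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical reader with the start()/advance()/getCurrent() shape of Beam's Source.Reader. */
interface ModelReader<T> {
  boolean start();

  boolean advance();

  T getCurrent();
}

/** A reader over no elements: start() reports immediately that there is nothing to read. */
final class EmptyModelReader<T> implements ModelReader<T> {
  @Override
  public boolean start() {
    return false;
  }

  @Override
  public boolean advance() {
    return false;
  }

  @Override
  public T getCurrent() {
    throw new NoSuchElementException("empty source has no current element");
  }
}

final class ModelDrain {
  /** Reads the way a runner would: start() once, then advance() until it returns false. */
  static <T> List<T> readAll(ModelReader<T> reader) {
    List<T> out = new ArrayList<>();
    for (boolean more = reader.start(); more; more = reader.advance()) {
      out.add(reader.getCurrent());
    }
    return out;
  }
}
```

With such a reader, the source is exhausted on the first call, so there is nothing to retry and nothing holding the watermark back.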
## Additional Context
### Chain of events
1. `BigQueryIO.read().withMethod(Method.DIRECT_READ)` uses
`BigQueryStorageSourceBase` as the underlying `BoundedSource`.
2. `split()` builds a `CreateReadSessionRequest` and sends it to the server.
3. The server returns a `ReadSession` with `streams_count = 0` for an empty
table.
4. `split()` returns `ImmutableList.of()`.
5. `UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter` receives
an empty list of sub-sources and falls back to wrapping the original unsplit
source directly (`ImmutableList.of(boundedSource)`). When it calls
`createReader()` on this `BigQueryStorageSourceBase`, the method throws
`UnsupportedOperationException("BigQuery storage source must be split before
reading")` because `BigQueryStorageSourceBase` does not support direct reads
through `createReader()`; it is designed to be read only through its per-stream
sub-sources.
6. The job gets stuck in a loop, repeatedly hitting this exception without
making progress. The watermark never advances and the pipeline must be manually
cancelled.
### Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
### Issue Components
- [ ] Component: Python SDK
- [x] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [x] Component: IO connector
- [ ] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Infrastructure
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [x] Component: Google Cloud Dataflow Runner